mirror of https://github.com/apache/kafka.git
KAFKA-8058: Fix ConnectClusterStateImpl.connectors() method (#6384)
Fixed the ConnectClusterStateImpl.connectors() method and throw an exception on timeout. Added unit test. Author: Chris Egerton <chrise@confluent.io> Reviewers: Magesh Nandakumar <magesh.n.kumar@gmail.com>, Robert Yokota <rayokota@gmail.com>, Arjun Satish <wicknicks@users.noreply.github.com>, Konstantine Karantasis <konstantine@confluent.io>, Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io> Closes #6384 from C0urante:kafka-8058
This commit is contained in:
parent
db338ef67c
commit
71e721f135
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.kafka.connect.runtime.health;
|
||||
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
import org.apache.kafka.connect.health.ConnectClusterState;
|
||||
import org.apache.kafka.connect.health.ConnectorHealth;
|
||||
import org.apache.kafka.connect.health.ConnectorState;
|
||||
|
@ -24,32 +25,35 @@ import org.apache.kafka.connect.health.ConnectorType;
|
|||
import org.apache.kafka.connect.health.TaskState;
|
||||
import org.apache.kafka.connect.runtime.HerderProvider;
|
||||
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
|
||||
import org.apache.kafka.connect.util.Callback;
|
||||
import org.apache.kafka.connect.util.FutureCallback;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class ConnectClusterStateImpl implements ConnectClusterState {
|
||||
|
||||
private HerderProvider herderProvider;
|
||||
private final long herderRequestTimeoutMs;
|
||||
private final HerderProvider herderProvider;
|
||||
|
||||
public ConnectClusterStateImpl(HerderProvider herderProvider) {
|
||||
public ConnectClusterStateImpl(long connectorsTimeoutMs, HerderProvider herderProvider) {
|
||||
this.herderRequestTimeoutMs = connectorsTimeoutMs;
|
||||
this.herderProvider = herderProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<String> connectors() {
|
||||
final Collection<String> connectors = new ArrayList<>();
|
||||
herderProvider.get().connectors(new Callback<java.util.Collection<String>>() {
|
||||
@Override
|
||||
public void onCompletion(Throwable error, Collection<String> result) {
|
||||
connectors.addAll(result);
|
||||
}
|
||||
});
|
||||
return connectors;
|
||||
FutureCallback<Collection<String>> connectorsCallback = new FutureCallback<>();
|
||||
herderProvider.get().connectors(connectorsCallback);
|
||||
try {
|
||||
return connectorsCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
throw new ConnectException("Failed to retrieve list of connectors", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.kafka.connect.rest.ConnectRestExtension;
|
|||
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
|
||||
import org.apache.kafka.connect.runtime.HerderProvider;
|
||||
import org.apache.kafka.connect.runtime.WorkerConfig;
|
||||
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
|
||||
import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
|
||||
import org.apache.kafka.connect.runtime.isolation.Plugins;
|
||||
import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
|
||||
|
@ -308,10 +309,16 @@ public class RestServer {
|
|||
config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
|
||||
config, ConnectRestExtension.class);
|
||||
|
||||
long herderRequestTimeoutMs = ConnectorsResource.REQUEST_TIMEOUT_MS;
|
||||
Integer rebalanceTimeoutMs = config.getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG);
|
||||
if (rebalanceTimeoutMs != null) {
|
||||
herderRequestTimeoutMs = Math.min(herderRequestTimeoutMs, rebalanceTimeoutMs.longValue());
|
||||
}
|
||||
|
||||
ConnectRestExtensionContext connectRestExtensionContext =
|
||||
new ConnectRestExtensionContextImpl(
|
||||
new ConnectRestConfigurable(resourceConfig),
|
||||
new ConnectClusterStateImpl(provider)
|
||||
new ConnectClusterStateImpl(herderRequestTimeoutMs, provider)
|
||||
);
|
||||
for (ConnectRestExtension connectRestExtension : connectRestExtensions) {
|
||||
connectRestExtension.register(connectRestExtensionContext);
|
||||
|
|
|
@ -66,7 +66,7 @@ public class ConnectorsResource {
|
|||
// session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but
|
||||
// we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases,
|
||||
// but currently a worker simply leaving the group can take this long as well.
|
||||
private static final long REQUEST_TIMEOUT_MS = 90 * 1000;
|
||||
public static final long REQUEST_TIMEOUT_MS = 90 * 1000;
|
||||
|
||||
private final HerderProvider herderProvider;
|
||||
private final WorkerConfig config;
|
||||
|
|
|
@ -0,0 +1,87 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.connect.runtime.health;
|
||||
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
import org.apache.kafka.connect.runtime.Herder;
|
||||
import org.apache.kafka.connect.runtime.HerderProvider;
|
||||
import org.apache.kafka.connect.util.Callback;
|
||||
import org.easymock.Capture;
|
||||
import org.easymock.EasyMock;
|
||||
import org.easymock.IAnswer;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.powermock.api.easymock.annotation.Mock;
|
||||
import org.powermock.modules.junit4.PowerMockRunner;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
|
||||
@RunWith(PowerMockRunner.class)
|
||||
public class ConnectClusterStateImplTest {
|
||||
|
||||
@Mock
|
||||
protected Herder herder;
|
||||
protected HerderProvider herderProvider;
|
||||
protected ConnectClusterStateImpl connectClusterState;
|
||||
protected Collection<String> expectedConnectors;
|
||||
protected long herderRequestTimeoutMs = TimeUnit.SECONDS.toMillis(10);
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
herderProvider = new HerderProvider(herder);
|
||||
connectClusterState = new ConnectClusterStateImpl(herderRequestTimeoutMs, herderProvider);
|
||||
expectedConnectors = Arrays.asList("sink1", "source1", "source2");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void connectors() {
|
||||
Capture<Callback<Collection<String>>> callback = EasyMock.newCapture();
|
||||
herder.connectors(EasyMock.capture(callback));
|
||||
EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
|
||||
@Override
|
||||
public Void answer() {
|
||||
callback.getValue().onCompletion(null, expectedConnectors);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
EasyMock.replay(herder);
|
||||
assertEquals(expectedConnectors, connectClusterState.connectors());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void connectorsFailure() {
|
||||
Capture<Callback<Collection<String>>> callback = EasyMock.newCapture();
|
||||
herder.connectors(EasyMock.capture(callback));
|
||||
EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
|
||||
@Override
|
||||
public Void answer() {
|
||||
Throwable timeout = new TimeoutException();
|
||||
callback.getValue().onCompletion(timeout, null);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
EasyMock.replay(herder);
|
||||
assertThrows(ConnectException.class, connectClusterState::connectors);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue