MINOR: Remove unused ShutdownableThread class and ineffective ThreadedTest class (#12410)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Christo Lolov <christo_lolov@yahoo.com>
This commit is contained in:
Chris Egerton 2022-08-18 06:08:03 -04:00 committed by GitHub
parent 04fce135d3
commit 73e8d5dd5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 19 additions and 281 deletions

View File

@ -1,144 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* <p>
* Thread class with support for triggering graceful and forcible shutdown. In graceful shutdown,
* a flag is set, which the thread should detect and try to exit gracefully from. In forcible
* shutdown, the thread is interrupted. These can be combined to give a thread a chance to exit
* gracefully, but then force it to exit if it takes too long.
* </p>
* <p>
* Implementations should override the {@link #execute} method and check {@link #getRunning} to
* determine whether they should try to gracefully exit.
* </p>
*/
public abstract class ShutdownableThread extends Thread {
private static final Logger log = LoggerFactory.getLogger(ShutdownableThread.class);
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
/**
* An UncaughtExceptionHandler to register on every instance of this class. This is useful for
* testing, where AssertionExceptions in the thread may not cause the test to fail. Since one
* instance is used for all threads, it must be thread-safe.
*/
volatile public static UncaughtExceptionHandler funcaughtExceptionHandler = null;
public ShutdownableThread(String name) {
// The default is daemon=true so that these threads will not prevent shutdown. We use this
// default because threads that are running user code that may not clean up properly, even
// when we attempt to forcibly shut them down.
this(name, true);
}
public ShutdownableThread(String name, boolean daemon) {
super(name);
this.setDaemon(daemon);
if (funcaughtExceptionHandler != null)
this.setUncaughtExceptionHandler(funcaughtExceptionHandler);
}
/**
* Implementations should override this method with the main body for the thread.
*/
public abstract void execute();
/**
* Returns true if the thread hasn't exited yet and none of the shutdown methods have been
* invoked
*/
public boolean getRunning() {
return isRunning.get();
}
@Override
public void run() {
try {
execute();
} catch (Error | RuntimeException e) {
log.error("Thread {} exiting with uncaught exception: ", getName(), e);
throw e;
} finally {
shutdownLatch.countDown();
}
}
/**
* Shutdown the thread, first trying to shut down gracefully using the specified timeout, then
* forcibly interrupting the thread.
* @param gracefulTimeout the maximum time to wait for a graceful exit
* @param unit the time unit of the timeout argument
*/
public void shutdown(long gracefulTimeout, TimeUnit unit)
throws InterruptedException {
boolean success = gracefulShutdown(gracefulTimeout, unit);
if (!success)
forceShutdown();
}
/**
* Attempt graceful shutdown
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return true if successful, false if the timeout elapsed
*/
public boolean gracefulShutdown(long timeout, TimeUnit unit) throws InterruptedException {
startGracefulShutdown();
return awaitShutdown(timeout, unit);
}
/**
* Start shutting down this thread gracefully, but do not block waiting for it to exit.
*/
public void startGracefulShutdown() {
log.info("Starting graceful shutdown of thread {}", getName());
isRunning.set(false);
}
/**
* Awaits shutdown of this thread, waiting up to the timeout.
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return true if successful, false if the timeout elapsed
* @throws InterruptedException
*/
public boolean awaitShutdown(long timeout, TimeUnit unit) throws InterruptedException {
return shutdownLatch.await(timeout, unit);
}
/**
* Immediately tries to force the thread to shut down by interrupting it. This does not try to
* wait for the thread to truly exit because forcible shutdown is not always possible. By
* default, threads are marked as daemon threads so they will not prevent the process from
* exiting.
*/
public void forceShutdown() {
log.info("Forcing shutdown of thread {}", getName());
isRunning.set(false);
interrupt();
}
}

View File

@ -52,7 +52,6 @@ import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ParameterizedTest;
import org.apache.kafka.connect.util.ThreadedTest;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.easymock.Capture;
@ -60,6 +59,7 @@ import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IExpectationSetters;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
@ -110,7 +110,7 @@ import static org.junit.Assert.fail;
"org.apache.log4j.*"})
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(ParameterizedTest.class)
public class ExactlyOnceWorkerSourceTaskTest extends ThreadedTest {
public class ExactlyOnceWorkerSourceTaskTest {
private static final String TOPIC = "topic";
private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12);
@ -175,9 +175,8 @@ public class ExactlyOnceWorkerSourceTaskTest extends ThreadedTest {
this.enableTopicCreation = enableTopicCreation;
}
@Override
@Before
public void setup() {
super.setup();
Map<String, String> workerProps = workerProps();
plugins = new Plugins(workerProps);
config = new StandaloneConfig(workerProps);

View File

@ -19,9 +19,9 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ThreadedTest;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
@ -46,7 +46,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
public class SourceTaskOffsetCommitterTest extends ThreadedTest {
public class SourceTaskOffsetCommitterTest {
private final ConcurrentHashMap<ConnectorTaskId, ScheduledFuture<?>> committers = new ConcurrentHashMap<>();
@ -61,9 +61,8 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
private static final long DEFAULT_OFFSET_COMMIT_INTERVAL_MS = 1000;
@Override
@Before
public void setup() {
super.setup();
Map<String, String> workerProps = new HashMap<>();
workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");

View File

@ -41,12 +41,12 @@ import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.util.ThreadedTest;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
@ -74,7 +74,7 @@ import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
@PrepareForTest(WorkerSinkTask.class)
@PowerMockIgnore("javax.management.*")
public class WorkerSinkTaskThreadedTest extends ThreadedTest {
public class WorkerSinkTaskThreadedTest {
// These are fixed to keep this code simpler. In this example we assume byte[] raw values
// with mix of integer/string in Connect
@ -129,9 +129,8 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
private long recordsReturned;
@Override
@Before
public void setup() {
super.setup();
time = new MockTime();
metrics = new MockConnectMetrics();
Map<String, String> workerProps = new HashMap<>();

View File

@ -50,7 +50,6 @@ import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ParameterizedTest;
import org.apache.kafka.connect.util.ThreadedTest;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.easymock.Capture;
@ -58,6 +57,7 @@ import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IExpectationSetters;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
@ -106,7 +106,7 @@ import static org.junit.Assert.assertTrue;
"org.apache.log4j.*"})
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(ParameterizedTest.class)
public class WorkerSourceTaskTest extends ThreadedTest {
public class WorkerSourceTaskTest {
private static final String TOPIC = "topic";
private static final String OTHER_TOPIC = "other-topic";
private static final Map<String, Object> PARTITION = Collections.singletonMap("key", "partition".getBytes());
@ -168,9 +168,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
this.enableTopicCreation = enableTopicCreation;
}
@Override
@Before
public void setup() {
super.setup();
Map<String, String> workerProps = workerProps();
plugins = new Plugins(workerProps);
config = new StandaloneConfig(workerProps);

View File

@ -63,7 +63,6 @@ import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.ParameterizedTest;
import org.apache.kafka.connect.util.ThreadedTest;
import org.apache.kafka.connect.util.TopicAdmin;
import org.junit.After;
import org.junit.Before;
@ -141,7 +140,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class WorkerTest extends ThreadedTest {
public class WorkerTest {
private static final String CONNECTOR_ID = "test-connector";
private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
@ -217,8 +216,6 @@ public class WorkerTest extends ThreadedTest {
@Before
public void setup() {
super.setup();
// Use strict mode to detect unused mocks
mockitoSession = Mockito.mockitoSession()
.initMocks(this)

View File

@ -66,7 +66,6 @@ import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.ThreadedTest;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
@ -96,6 +95,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -125,7 +125,7 @@ import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
@PrepareForTest({DistributedHerder.class, Plugins.class, RestClient.class})
@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
public class DistributedHerderTest extends ThreadedTest {
public class DistributedHerderTest {
private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
static {
HERDER_CONFIG.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
@ -220,6 +220,7 @@ public class DistributedHerderTest extends ThreadedTest {
private ConfigBackingStore.UpdateListener configUpdateListener;
private WorkerRebalanceListener rebalanceListener;
private ExecutorService herderExecutor;
private Future<?> herderFuture;
private SinkConnectorConfig conn1SinkConfig;
private SinkConnectorConfig conn1SinkConfigUpdated;
@ -3654,13 +3655,14 @@ public class DistributedHerderTest extends ThreadedTest {
private void startBackgroundHerder() {
herderExecutor = Executors.newSingleThreadExecutor();
herderExecutor.submit(herder);
herderFuture = herderExecutor.submit(herder);
}
private void stopBackgroundHerder() throws Exception {
herder.stop();
herderExecutor.shutdown();
herderExecutor.awaitTermination(10, TimeUnit.SECONDS);
assertTrue("herder thread did not finish in time", herderExecutor.awaitTermination(10, TimeUnit.SECONDS));
herderFuture.get();
}
private void expectHerderStartup() {

View File

@ -1,71 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.util;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class ShutdownableThreadTest {
@Test
public void testGracefulShutdown() throws InterruptedException {
ShutdownableThread thread = new ShutdownableThread("graceful") {
@Override
public void execute() {
while (getRunning()) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
// Ignore
}
}
}
};
thread.start();
Thread.sleep(10);
assertTrue(thread.gracefulShutdown(1000, TimeUnit.MILLISECONDS));
}
@Test
public void testForcibleShutdown() throws InterruptedException {
final CountDownLatch startedLatch = new CountDownLatch(1);
ShutdownableThread thread = new ShutdownableThread("forcible") {
@Override
public void execute() {
try {
startedLatch.countDown();
Thread.sleep(100000);
} catch (InterruptedException e) {
// Ignore
}
}
};
thread.start();
startedLatch.await();
thread.forceShutdown();
// Not all threads can be forcibly stopped since interrupt() doesn't work on threads in
// certain conditions, but in this case we know the thread is interruptible so we should be
// able join() it
thread.join(1000);
assertFalse(thread.isAlive());
}
}

View File

@ -1,42 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.util;
import org.junit.After;
import org.junit.Before;
/**
* Base class for tests that use threads. It sets up uncaught exception handlers for all known
* thread classes and checks for errors at the end of the test so that failures in background
* threads will cause the test to fail.
*/
public class ThreadedTest {
protected TestBackgroundThreadExceptionHandler backgroundThreadExceptionHandler;
@Before
public void setup() {
backgroundThreadExceptionHandler = new TestBackgroundThreadExceptionHandler();
ShutdownableThread.funcaughtExceptionHandler = backgroundThreadExceptionHandler;
}
@After
public void teardown() {
backgroundThreadExceptionHandler.verifyNoExceptions();
ShutdownableThread.funcaughtExceptionHandler = null;
}
}