mirror of https://github.com/apache/kafka.git
KAFKA-5562; execute state dir cleanup on single thread

Use a single `StateDirectory` per streams instance. Use threadId to determine which thread owns the lock. Only allow the owning thread to unlock. Execute cleanup on a scheduled thread in `KafkaStreams`.

Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes #3516 from dguy/kafka-5562
commit 9f3f8b4de6
parent 89faed8d30
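The commit message describes two mechanisms: state-directory locks that remember which thread acquired them, so that only the owning thread may unlock, and a scheduled background thread in `KafkaStreams` that periodically cleans up stale task directories. The two sketches below are minimal, self-contained illustrations of those patterns, not the actual Kafka Streams code; the class and method names (`TaskDirLocks`, `CleanupScheduler`, `tryLock`) are invented for this example.

import java.util.HashMap;
import java.util.Map;

// Sketch of a thread-owned lock registry, loosely modeled on the LockAndOwner
// bookkeeping this commit adds to StateDirectory. Names are illustrative only.
public class TaskDirLocks {

    private static class LockAndOwner {
        final String owningThread;

        LockAndOwner(final String owningThread) {
            this.owningThread = owningThread;
        }
    }

    private final Map<String, LockAndOwner> locks = new HashMap<>();

    // Returns true if the calling thread holds (or has just acquired) the lock for taskId.
    public synchronized boolean tryLock(final String taskId) {
        final LockAndOwner existing = locks.get(taskId);
        if (existing != null) {
            // Re-entrant for the owner, refused for every other thread.
            return existing.owningThread.equals(Thread.currentThread().getName());
        }
        locks.put(taskId, new LockAndOwner(Thread.currentThread().getName()));
        return true;
    }

    // Only the owning thread may release; calls from other threads are ignored.
    public synchronized void unlock(final String taskId) {
        final LockAndOwner existing = locks.get(taskId);
        if (existing != null && existing.owningThread.equals(Thread.currentThread().getName())) {
            locks.remove(taskId);
        }
    }
}

The second sketch shows the scheduling side under the same caveat: a single-threaded daemon executor runs the cleanup at a fixed delay and should be shut down when the instance closes. The cleanup Runnable stands in for what the real code does via StateDirectory#cleanRemovedTasks.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Sketch of a daemon, single-threaded scheduled cleaner.
public class CleanupScheduler {

    public static ScheduledExecutorService start(final long cleanupDelayMs, final Runnable cleanup) {
        final ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(final Runnable r) {
                final Thread thread = new Thread(r, "state-dir-cleanup");
                thread.setDaemon(true); // never keeps the JVM alive on its own
                return thread;
            }
        });
        cleaner.scheduleAtFixedRate(cleanup, cleanupDelayMs, cleanupDelayMs, TimeUnit.MILLISECONDS);
        return cleaner; // callers should shutdownNow() this executor on close
    }
}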
build.gradle
@@ -857,6 +857,7 @@ project(':streams') {
     testCompile project(':core')
     testCompile project(':core').sourceSets.test.output
     testCompile libs.junit
+    testCompile libs.easymock

     testRuntime libs.slf4jlog4j
   }
KafkaStreams.java
@@ -57,7 +57,6 @@ import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -70,6 +69,9 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;

 import static org.apache.kafka.common.utils.Utils.getHost;
@@ -128,6 +130,7 @@ public class KafkaStreams {
     private static final int DEFAULT_CLOSE_TIMEOUT = 0;
     private GlobalStreamThread globalStreamThread;

+    private final ScheduledExecutorService stateDirCleaner;
     private final StreamThread[] threads;
     private final Metrics metrics;
     private final QueryableStoreProvider queryableStoreProvider;
@@ -140,6 +143,7 @@
     private final String logPrefix;
     private final StreamsMetadataState streamsMetadataState;
     private final StreamsConfig config;
+    private final StateDirectory stateDirectory;

     // container states
     /**
@@ -471,12 +475,13 @@
        final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
                (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + (globalTaskTopology == null ? 0 : 1)));

+        stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
        if (globalTaskTopology != null) {
            final String globalThreadId = clientId + "-GlobalStreamThread";
            globalStreamThread = new GlobalStreamThread(globalTaskTopology,
                                                        config,
                                                        clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")),
-                                                        new StateDirectory(applicationId, globalThreadId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time),
+                                                        stateDirectory,
                                                        metrics,
                                                        time,
                                                        globalThreadId);
@@ -493,7 +498,8 @@
                                          metrics,
                                          time,
                                          streamsMetadataState,
-                                          cacheSizeBytes);
+                                          cacheSizeBytes,
+                                          stateDirectory);
            threadState.put(threads[i].getId(), threads[i].state());
            storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
        }
@@ -507,6 +513,15 @@

        final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(builder.globalStateStores());
        queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
+        final String cleanupThreadName = clientId + "-CleanupThread";
+        stateDirCleaner = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread thread = new Thread(r, cleanupThreadName);
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
    }

    private static HostInfo parseHostInfo(final String endPoint) {
@@ -577,6 +592,18 @@
            thread.start();
        }

+        final Long cleanupDelay = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
+        stateDirCleaner.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                synchronized (stateLock) {
+                    if (state == State.RUNNING) {
+                        stateDirectory.cleanRemovedTasks(cleanupDelay);
+                    }
+                }
+            }
+        }, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);
+
        log.info("{} Started Kafka Stream process", logPrefix);
    }

@@ -631,6 +658,7 @@
            return true;
        }

+        stateDirCleaner.shutdownNow();
        // save the current thread so that if it is a stream thread
        // we don't attempt to join it and cause a deadlock
        final Thread shutdown = new Thread(new Runnable() {
@@ -724,17 +752,6 @@
        if (isRunning()) {
            throw new IllegalStateException("Cannot clean up while running.");
        }

-        final String appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
-        final String stateDir = config.getString(StreamsConfig.STATE_DIR_CONFIG);
-
-        final String localApplicationDir = stateDir + File.separator + appId;
-        log.debug("{} Removing local Kafka Streams application data in {} for application {}.",
-                  logPrefix,
-                  localApplicationDir,
-                  appId);
-
-        final StateDirectory stateDirectory = new StateDirectory(appId, "cleanup", stateDir, Time.SYSTEM);
        stateDirectory.cleanRemovedTasks(0);
    }

StateDirectory.java
@@ -45,21 +45,25 @@ public class StateDirectory {
    private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);

    private final File stateDir;
-    private final String logPrefix;
    private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
-    private final HashMap<TaskId, FileLock> locks = new HashMap<>();
+    private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>();
    private final Time time;

    private FileChannel globalStateChannel;
    private FileLock globalStateLock;

-    public StateDirectory(final String applicationId, final String stateDirConfig, final Time time) {
-        this(applicationId, "", stateDirConfig, time);
+    private static class LockAndOwner {
+        final FileLock lock;
+        final String owningThread;
+
+        LockAndOwner(final String owningThread, final FileLock lock) {
+            this.owningThread = owningThread;
+            this.lock = lock;
+        }
    }

-    public StateDirectory(final String applicationId, final String threadId, final String stateDirConfig, final Time time) {
+    public StateDirectory(final String applicationId, final String stateDirConfig, final Time time) {
        this.time = time;
-        this.logPrefix = String.format("stream-thread [%s]", threadId);
        final File baseDir = new File(stateDirConfig);
        if (!baseDir.exists() && !baseDir.mkdirs()) {
            throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created",
@@ -95,6 +99,9 @@ public class StateDirectory {
        return dir;
    }

+    private String logPrefix() {
+        return String.format("stream-thread [%s]", Thread.currentThread().getName());
+    }
    /**
     * Get the lock for the {@link TaskId}s directory if it is available
     * @param taskId
@@ -102,13 +109,19 @@
     * @return true if successful
     * @throws IOException
     */
-    boolean lock(final TaskId taskId, int retry) throws IOException {
+    synchronized boolean lock(final TaskId taskId, int retry) throws IOException {

        final File lockFile;
        // we already have the lock so bail out here
-        if (locks.containsKey(taskId)) {
-            log.trace("{} Found cached state dir lock for task {}", logPrefix, taskId);
+        final LockAndOwner lockAndOwner = locks.get(taskId);
+        if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
+            log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
            return true;
+        } else if (lockAndOwner != null) {
+            // another thread owns the lock
+            return false;
        }

        try {
            lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
        } catch (ProcessorStateException e) {
@@ -130,16 +143,16 @@

        final FileLock lock = tryLock(retry, channel);
        if (lock != null) {
-            locks.put(taskId, lock);
+            locks.put(taskId, new LockAndOwner(Thread.currentThread().getName(), lock));

-            log.debug("{} Acquired state dir lock for task {}", logPrefix, taskId);
+            log.debug("{} Acquired state dir lock for task {}", logPrefix(), taskId);
        }
        return lock != null;
    }

-    boolean lockGlobalState(final int retry) throws IOException {
+    synchronized boolean lockGlobalState(final int retry) throws IOException {
        if (globalStateLock != null) {
-            log.trace("{} Found cached state dir lock for the global task", logPrefix);
+            log.trace("{} Found cached state dir lock for the global task", logPrefix());
            return true;
        }

@@ -161,12 +174,12 @@
        globalStateChannel = channel;
        globalStateLock = fileLock;

-        log.debug("{} Acquired global state dir lock", logPrefix);
+        log.debug("{} Acquired global state dir lock", logPrefix());

        return true;
    }

-    void unlockGlobalState() throws IOException {
+    synchronized void unlockGlobalState() throws IOException {
        if (globalStateLock == null) {
            return;
        }
@@ -175,7 +188,7 @@
        globalStateLock = null;
        globalStateChannel = null;

-        log.debug("{} Released global state dir lock", logPrefix);
+        log.debug("{} Released global state dir lock", logPrefix());
    }

    /**
@@ -183,12 +196,12 @@
     * @param taskId
     * @throws IOException
     */
-    void unlock(final TaskId taskId) throws IOException {
-        final FileLock lock = locks.remove(taskId);
-        if (lock != null) {
-            lock.release();
-            log.debug("{} Released state dir lock for task {}", logPrefix, taskId);
+    synchronized void unlock(final TaskId taskId) throws IOException {
+        final LockAndOwner lockAndOwner = locks.get(taskId);
+        if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
+            locks.remove(taskId);
+            lockAndOwner.lock.release();
+            log.debug("{} Released state dir lock for task {}", logPrefix(), taskId);

            final FileChannel fileChannel = channels.remove(taskId);
            if (fileChannel != null) {
@@ -204,12 +217,11 @@
     * @param cleanupDelayMs only remove directories if they haven't been modified for at least
     *                       this amount of time (milliseconds)
     */
-    public void cleanRemovedTasks(final long cleanupDelayMs) {
+    public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
        final File[] taskDirs = listTaskDirectories();
        if (taskDirs == null || taskDirs.length == 0) {
            return; // nothing to do
        }

        for (File taskDir : taskDirs) {
            final String dirName = taskDir.getName();
            TaskId id = TaskId.parse(dirName);
@@ -219,19 +231,19 @@
                    long now = time.milliseconds();
                    long lastModifiedMs = taskDir.lastModified();
                    if (now > lastModifiedMs + cleanupDelayMs) {
-                        log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix, dirName, id, now - lastModifiedMs, cleanupDelayMs);
+                        log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
                        Utils.delete(taskDir);
                    }
                }
            } catch (OverlappingFileLockException e) {
                // locked by another thread
            } catch (IOException e) {
-                log.error("{} Failed to lock the state directory due to an unexpected exception", logPrefix, e);
+                log.error("{} Failed to lock the state directory due to an unexpected exception", logPrefix(), e);
            } finally {
                try {
                    unlock(id);
                } catch (IOException e) {
-                    log.error("{} Failed to release the state directory lock", logPrefix);
+                    log.error("{} Failed to release the state directory lock", logPrefix());
                }
            }
        }
StreamThread.java
@@ -449,7 +449,8 @@
                        final Metrics metrics,
                        final Time time,
                        final StreamsMetadataState streamsMetadataState,
-                        final long cacheSizeBytes) {
+                        final long cacheSizeBytes,
+                        final StateDirectory stateDirectory) {
        super(clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
        this.applicationId = applicationId;
        this.config = config;
@@ -499,7 +500,7 @@
        // standby KTables
        standbyRecords = new HashMap<>();

-        stateDirectory = new StateDirectory(applicationId, threadClientId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
+        this.stateDirectory = stateDirectory;
        final Object maxPollInterval = consumerConfigs.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
        rebalanceTimeoutMs = (Integer) ConfigDef.parseType(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval, Type.INT);
        pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
@@ -568,7 +569,6 @@
                maybePunctuateSystemTime();
                maybeCommit(timerStartedMs);
                maybeUpdateStandbyTasks(timerStartedMs);
-                maybeClean(timerStartedMs);
            }
            log.info("{} Shutting down at user request", logPrefix);
        }
@@ -911,16 +911,6 @@
        }
    }

-    /**
-     * Cleanup any states of the tasks that have been removed from this thread
-     */
-    protected void maybeClean(final long now) {
-        if (now > lastCleanMs + cleanTimeMs) {
-            stateDirectory.cleanRemovedTasks(cleanTimeMs);
-            lastCleanMs = now;
-        }
-    }
-
    /**
     * Compute the latency based on the current marked timestamp, and update the marked timestamp
     * with the current system timestamp.
KafkaStreamsTest.java
@@ -40,6 +40,7 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;

+import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -449,6 +450,58 @@
        Assert.assertNotNull("streamString contains non-null appId", appId);
    }

+    @Test
+    public void shouldCleanupOldStateDirs() throws InterruptedException {
+        final Properties props = new Properties();
+        final String appId = "cleanupOldStateDirs";
+        final String stateDir = TestUtils.tempDirectory().getPath();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+
+
+        final String topic = "topic";
+        CLUSTER.createTopic(topic);
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        builder.stream(Serdes.String(), Serdes.String(), topic);
+
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final CountDownLatch latch = new CountDownLatch(1);
+        streams.setStateListener(new KafkaStreams.StateListener() {
+            @Override
+            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
+                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+                    latch.countDown();
+                }
+            }
+        });
+        final String appDir = stateDir + File.separator + appId;
+        final File oldTaskDir = new File(appDir, "10_1");
+        assertTrue(oldTaskDir.mkdirs());
+        try {
+            streams.start();
+            latch.await(30, TimeUnit.SECONDS);
+            verifyCleanupStateDir(appDir, oldTaskDir);
+            assertTrue(oldTaskDir.mkdirs());
+            verifyCleanupStateDir(appDir, oldTaskDir);
+        } finally {
+            streams.close();
+        }
+    }
+
+    private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException {
+        final File taskDir = new File(appDir, "0_0");
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return !oldTaskDir.exists() && taskDir.exists();
+            }
+        }, 30000, "cleanup has not successfully run");
+        assertTrue(taskDir.exists());
+    }
+
    public static class StateListenerStub implements KafkaStreams.StateListener {
        int numChanges = 0;
        KafkaStreams.State oldState;
RegexSourceIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
@@ -465,7 +466,7 @@

        public TestStreamThread(final InternalTopologyBuilder builder, final StreamsConfig config, final KafkaClientSupplier clientSupplier, final String applicationId, final String clientId, final UUID processId, final Metrics metrics, final Time time) {
            super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                  0);
+                  0, new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time));
        }

        @Override
StateDirectoryTest.java
@@ -32,9 +32,13 @@ import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;

 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

@@ -266,4 +270,57 @@
        }
    }

+    @Test
+    public void shouldNotLockStateDirLockedByAnotherThread() throws IOException, InterruptedException {
+        final TaskId taskId = new TaskId(0, 0);
+        final AtomicReference<IOException> exceptionOnThread = new AtomicReference<>();
+        final Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    directory.lock(taskId, 1);
+                } catch (final IOException e) {
+                    exceptionOnThread.set(e);
+                }
+            }
+        });
+        thread.start();
+        thread.join(30000);
+        assertNull("should not have had an exception during locking on other thread", exceptionOnThread.get());
+        assertFalse(directory.lock(taskId, 1));
+    }
+
+    @Test
+    public void shouldNotUnLockStateDirLockedByAnotherThread() throws IOException, InterruptedException {
+        final TaskId taskId = new TaskId(0, 0);
+        final CountDownLatch lockLatch = new CountDownLatch(1);
+        final CountDownLatch unlockLatch = new CountDownLatch(1);
+        final AtomicReference<Exception> exceptionOnThread = new AtomicReference<>();
+        final Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    directory.lock(taskId, 1);
+                    lockLatch.countDown();
+                    unlockLatch.await();
+                    directory.unlock(taskId);
+                } catch (final Exception e) {
+                    exceptionOnThread.set(e);
+                }
+            }
+        });
+        thread.start();
+        lockLatch.await(5, TimeUnit.SECONDS);
+
+        assertNull("should not have had an exception on other thread", exceptionOnThread.get());
+        directory.unlock(taskId);
+        assertFalse(directory.lock(taskId, 1));
+
+        unlockLatch.countDown();
+        thread.join(30000);
+
+        assertNull("should not have had an exception on other thread", exceptionOnThread.get());
+        assertTrue(directory.lock(taskId, 1));
+    }
+
 }
StreamPartitionAssignorTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -109,11 +110,12 @@ public class StreamPartitionAssignorTest {
    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
    private final StreamsConfig config = new StreamsConfig(configProps());
+    private final StateDirectory stateDirectory = new StateDirectory("appId", TestUtils.tempDirectory().getPath(), new MockTime());
    private final StreamThread mockStreamThread = new StreamThread(builder, config,
                                                                   mockClientSupplier, "appID",
                                                                   "clientId", UUID.randomUUID(),
                                                                   new Metrics(), new MockTime(),
-                                                                   null, 1L);
+                                                                   null, 1L, stateDirectory);
    private final Map<String, Object> configurationMap = new HashMap<>();

    private Properties configProps() {
@@ -159,7 +161,8 @@ public class StreamPartitionAssignorTest {
            Time.SYSTEM,
            new StreamsMetadataState(builder,
                StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {

            @Override
            public Set<TaskId> prevActiveTasks() {
@@ -215,8 +218,8 @@
            new Metrics(),
            Time.SYSTEM,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
@@ -290,7 +293,8 @@
            new Metrics(),
            Time.SYSTEM,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
@@ -339,7 +343,8 @@
            new Metrics(),
            Time.SYSTEM,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));

@@ -407,7 +412,8 @@
            new Metrics(),
            Time.SYSTEM,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
@@ -484,7 +490,8 @@
            new Metrics(),
            Time.SYSTEM,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

        partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
@@ -577,7 +584,8 @@
            new Metrics(),
            Time.SYSTEM,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
@@ -646,7 +654,8 @@
            new Metrics(),
            Time.SYSTEM,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));

@@ -682,7 +691,6 @@
        UUID uuid1 = UUID.randomUUID();
        String client1 = "client1";

-
        StreamThread thread10 = new StreamThread(
            builder,
            config,
@@ -693,7 +701,8 @@
            new Metrics(),
            Time.SYSTEM,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

        partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer);
@@ -731,7 +740,7 @@
        String client1 = "client1";

        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                0);
+                0, stateDirectory);

        partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer);
@@ -764,7 +773,7 @@
        final String client1 = "client1";

        final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                0);
+                0, stateDirectory);

        partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
        final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
@@ -799,7 +808,8 @@
            new Metrics(),
            Time.SYSTEM,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
@@ -840,7 +850,8 @@
            new Metrics(),
            Time.SYSTEM,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config, mockClientSupplier.restoreConsumer));

@@ -863,7 +874,6 @@
        final String applicationId = "application-id";
        builder.setApplicationId(applicationId);

-
        final StreamThread streamThread = new StreamThread(
            builder,
            config,
@@ -874,7 +884,8 @@
            new Metrics(),
            Time.SYSTEM,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

        try {
            partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
@@ -996,7 +1007,8 @@
            new Metrics(),
            Time.SYSTEM,
            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

        partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client));
        final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
@@ -1087,7 +1099,8 @@
            new Metrics(),
            Time.SYSTEM,
            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

        partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client));
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(
@ -36,6 +36,7 @@ import org.apache.kafka.streams.StreamsConfig;
|
||||||
import org.apache.kafka.streams.StreamsMetrics;
|
import org.apache.kafka.streams.StreamsMetrics;
|
||||||
import org.apache.kafka.streams.kstream.KStreamBuilder;
|
import org.apache.kafka.streams.kstream.KStreamBuilder;
|
||||||
import org.apache.kafka.streams.processor.TaskId;
|
import org.apache.kafka.streams.processor.TaskId;
|
||||||
|
import org.apache.kafka.streams.processor.TopologyBuilder;
|
||||||
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
|
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
|
||||||
import org.apache.kafka.streams.state.HostInfo;
|
import org.apache.kafka.streams.state.HostInfo;
|
||||||
import org.apache.kafka.streams.state.Stores;
|
import org.apache.kafka.streams.state.Stores;
|
||||||
|
@ -46,6 +47,7 @@ import org.apache.kafka.test.MockStateStoreSupplier;
|
||||||
import org.apache.kafka.test.MockTimestampExtractor;
|
import org.apache.kafka.test.MockTimestampExtractor;
|
||||||
import org.apache.kafka.test.TestCondition;
|
import org.apache.kafka.test.TestCondition;
|
||||||
import org.apache.kafka.test.TestUtils;
|
import org.apache.kafka.test.TestUtils;
|
||||||
|
import org.easymock.EasyMock;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -91,7 +93,8 @@ public class StreamThreadTest {
|
||||||
private UUID processId = UUID.randomUUID();
|
private UUID processId = UUID.randomUUID();
|
||||||
final KStreamBuilder builder = new KStreamBuilder();
|
final KStreamBuilder builder = new KStreamBuilder();
|
||||||
private final StreamsConfig config = new StreamsConfig(configProps(false));
|
private final StreamsConfig config = new StreamsConfig(configProps(false));
|
||||||
|
private final String stateDir = TestUtils.tempDirectory().getPath();
|
||||||
|
private final StateDirectory stateDirectory = new StateDirectory("applicationId", stateDir, mockTime);
|
||||||
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -450,8 +453,10 @@ public class StreamThreadTest {
|
||||||
processId,
|
processId,
|
||||||
metrics,
|
metrics,
|
||||||
Time.SYSTEM,
|
Time.SYSTEM,
|
||||||
|
|
||||||
new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
|
new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
|
||||||
0);
|
0,
|
||||||
|
stateDirectory);
|
||||||
|
|
||||||
final StateListenerStub stateListener = new StateListenerStub();
|
final StateListenerStub stateListener = new StateListenerStub();
|
||||||
thread.setStateListener(stateListener);
|
thread.setStateListener(stateListener);
|
||||||
|
@ -503,7 +508,8 @@ public class StreamThreadTest {
|
||||||
metrics,
|
metrics,
|
||||||
Time.SYSTEM,
|
Time.SYSTEM,
|
||||||
new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
|
new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
|
||||||
0);
|
0,
|
||||||
|
stateDirectory);
|
||||||
final StreamThread thread2 = new StreamThread(
|
final StreamThread thread2 = new StreamThread(
|
||||||
builder.internalTopologyBuilder,
|
builder.internalTopologyBuilder,
|
||||||
config,
|
config,
|
||||||
|
@ -514,7 +520,8 @@ public class StreamThreadTest {
|
||||||
metrics,
|
metrics,
|
||||||
Time.SYSTEM,
|
Time.SYSTEM,
|
||||||
new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
|
new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
|
||||||
0);
|
0,
|
||||||
|
stateDirectory);
|
||||||
|
|
||||||
final Map<TaskId, Set<TopicPartition>> task0 = Collections.singletonMap(new TaskId(0, 0), task0Assignment);
|
final Map<TaskId, Set<TopicPartition>> task0 = Collections.singletonMap(new TaskId(0, 0), task0Assignment);
|
||||||
final Map<TaskId, Set<TopicPartition>> task1 = Collections.singletonMap(new TaskId(0, 1), task1Assignment);
|
final Map<TaskId, Set<TopicPartition>> task1 = Collections.singletonMap(new TaskId(0, 1), task1Assignment);
|
||||||
|
@ -617,16 +624,16 @@ public class StreamThreadTest {
|
||||||
@Test
|
@Test
|
||||||
public void testMetrics() {
|
public void testMetrics() {
|
||||||
final StreamThread thread = new StreamThread(
|
final StreamThread thread = new StreamThread(
|
||||||
builder.internalTopologyBuilder,
|
builder.internalTopologyBuilder,
|
||||||
config,
|
config,
|
||||||
clientSupplier,
|
clientSupplier,
|
||||||
applicationId,
|
applicationId,
|
||||||
clientId,
|
clientId,
|
||||||
processId,
|
processId,
|
||||||
metrics,
|
metrics,
|
||||||
mockTime,
|
mockTime,
|
||||||
new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
|
new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
|
||||||
0);
|
0, stateDirectory);
|
||||||
final String defaultGroupName = "stream-metrics";
|
final String defaultGroupName = "stream-metrics";
|
||||||
final String defaultPrefix = "thread." + thread.threadClientId();
|
final String defaultPrefix = "thread." + thread.threadClientId();
|
||||||
final Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.threadClientId());
|
final Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.threadClientId());
|
||||||
|
@ -657,150 +664,6 @@ public class StreamThreadTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testMaybeClean() throws IOException, InterruptedException, java.nio.file.NoSuchFileException {
|
|
||||||
final File baseDir = Files.createTempDirectory("test").toFile();
|
|
||||||
try {
|
|
||||||
final long cleanupDelay = 1000L;
|
|
||||||
final Properties props = configProps(false);
|
|
||||||
props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(cleanupDelay));
|
|
||||||
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
|
|
||||||
final StreamsConfig config = new StreamsConfig(props);
|
|
||||||
final File applicationDir = new File(baseDir, applicationId);
|
|
||||||
applicationDir.mkdir();
|
|
||||||
final File stateDir1 = new File(applicationDir, task1.toString());
|
|
||||||
final File stateDir2 = new File(applicationDir, task2.toString());
|
|
||||||
final File stateDir3 = new File(applicationDir, task3.toString());
|
|
||||||
final File extraDir = new File(applicationDir, applicationId);
|
|
||||||
stateDir1.mkdir();
|
|
||||||
stateDir2.mkdir();
|
|
||||||
stateDir3.mkdir();
|
|
||||||
extraDir.mkdir();
|
|
||||||
builder.addSource("source1", "topic1");
|
|
||||||
final StreamThread thread = new StreamThread(
|
|
||||||
builder.internalTopologyBuilder,
|
|
||||||
config,
|
|
||||||
clientSupplier,
|
|
||||||
applicationId,
|
|
||||||
clientId,
|
|
||||||
processId,
|
|
||||||
metrics,
|
|
||||||
mockTime,
|
|
||||||
new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
|
|
||||||
0) {
|
|
||||||
@Override
|
|
||||||
public void maybeClean(final long now) {
|
|
||||||
super.maybeClean(now);
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {
|
|
||||||
final ProcessorTopology topology = builder.build(id.topicGroupId);
|
|
||||||
return new TestStreamTask(
|
|
||||||
id,
|
|
||||||
applicationId,
|
|
||||||
partitionsForTask,
|
|
||||||
topology,
|
|
||||||
consumer,
|
|
||||||
clientSupplier.getProducer(new HashMap<String, Object>()),
|
|
||||||
restoreConsumer,
|
|
||||||
config,
|
|
||||||
new MockStreamsMetrics(new Metrics()),
|
|
||||||
stateDirectory);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
initPartitionGrouper(config, thread, clientSupplier);
|
|
||||||
assertTrue(thread.tasks().isEmpty());
|
|
||||||
mockTime.sleep(cleanupDelay);
|
|
||||||
|
|
||||||
// all directories exist since an assignment didn't happen
|
|
||||||
assertTrue(stateDir1.exists());
|
|
||||||
assertTrue(stateDir2.exists());
|
|
||||||
assertTrue(stateDir3.exists());
|
|
||||||
assertTrue(extraDir.exists());
|
|
||||||
|
|
||||||
List<TopicPartition> revokedPartitions;
|
|
||||||
List<TopicPartition> assignedPartitions;
|
|
||||||
Map<TaskId, StreamTask> prevTasks;
|
|
||||||
|
|
||||||
// Assign t1p1 and t1p2. This should create task1 & task2
|
|
||||||
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
|
|
||||||
activeTasks.put(task1, Collections.singleton(t1p1));
|
|
||||||
activeTasks.put(task2, Collections.singleton(t1p2));
|
|
||||||
thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
|
|
||||||
|
|
||||||
revokedPartitions = Collections.emptyList();
|
|
||||||
assignedPartitions = Arrays.asList(t1p1, t1p2);
|
|
||||||
prevTasks = new HashMap<>(thread.tasks());
|
|
||||||
|
|
||||||
final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
|
|
||||||
thread.setState(StreamThread.State.RUNNING);
|
|
||||||
rebalanceListener.onPartitionsRevoked(revokedPartitions);
|
|
||||||
rebalanceListener.onPartitionsAssigned(assignedPartitions);
|
|
||||||
|
|
||||||
// there shouldn't be any previous task
|
|
||||||
assertTrue(prevTasks.isEmpty());
|
|
||||||
|
|
||||||
// task 1 & 2 are created
|
|
||||||
assertEquals(2, thread.tasks().size());
|
|
||||||
|
|
||||||
// all directories should still exit before the cleanup delay time
|
|
||||||
mockTime.sleep(cleanupDelay - 10L);
|
|
||||||
thread.maybeClean(mockTime.milliseconds());
|
|
||||||
assertTrue(stateDir1.exists());
|
|
||||||
assertTrue(stateDir2.exists());
|
|
||||||
assertTrue(stateDir3.exists());
|
|
||||||
assertTrue(extraDir.exists());
|
|
||||||
|
|
||||||
// all state directories except for task task2 & task3 will be removed. the extra directory should still exists
|
|
||||||
mockTime.sleep(11L);
|
|
||||||
thread.maybeClean(mockTime.milliseconds());
|
|
||||||
assertTrue(stateDir1.exists());
|
|
||||||
assertTrue(stateDir2.exists());
|
|
||||||
assertFalse(stateDir3.exists());
|
|
||||||
assertTrue(extraDir.exists());
|
|
||||||
|
|
||||||
// Revoke t1p1 and t1p2. This should remove task1 & task2
|
|
||||||
activeTasks.clear();
|
|
||||||
revokedPartitions = assignedPartitions;
|
|
||||||
assignedPartitions = Collections.emptyList();
|
|
||||||
prevTasks = new HashMap<>(thread.tasks());
|
|
||||||
|
|
||||||
rebalanceListener.onPartitionsRevoked(revokedPartitions);
|
|
||||||
rebalanceListener.onPartitionsAssigned(assignedPartitions);
|
|
||||||
|
|
||||||
// previous tasks should be committed
|
|
||||||
assertEquals(2, prevTasks.size());
|
|
||||||
for (final StreamTask task : prevTasks.values()) {
|
|
||||||
assertTrue(((TestStreamTask) task).committed);
|
|
||||||
((TestStreamTask) task).committed = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// no task
|
|
||||||
assertTrue(thread.tasks().isEmpty());
|
|
||||||
|
|
||||||
// all state directories for task task1 & task2 still exist before the cleanup delay time
|
|
||||||
mockTime.sleep(cleanupDelay - 10L);
|
|
||||||
thread.maybeClean(mockTime.milliseconds());
|
|
||||||
assertTrue(stateDir1.exists());
|
|
||||||
assertTrue(stateDir2.exists());
|
|
||||||
assertFalse(stateDir3.exists());
|
|
||||||
assertTrue(extraDir.exists());
|
|
||||||
|
|
||||||
// all state directories for task task1 & task2 are removed
|
|
||||||
mockTime.sleep(11L);
|
|
||||||
thread.maybeClean(mockTime.milliseconds());
|
|
||||||
assertFalse(stateDir1.exists());
|
|
||||||
assertFalse(stateDir2.exists());
|
|
||||||
assertFalse(stateDir3.exists());
|
|
||||||
assertTrue(extraDir.exists());
|
|
||||||
} finally {
|
|
||||||
// note that this call can throw a java.nio.file.NoSuchFileException
|
|
||||||
// since some of the subdirs might be deleted already during cleanup
|
|
||||||
Utils.delete(baseDir);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
     @Test
     public void testMaybeCommit() throws IOException, InterruptedException {
         final File baseDir = Files.createTempDirectory("test").toFile();

@@ -815,16 +678,16 @@ public class StreamThreadTest {
         builder.addSource("source1", "topic1");

         final StreamThread thread = new StreamThread(
             builder.internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
             clientId,
             processId,
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0, stateDirectory) {

             @Override
             public void maybeCommit(final long now) {
@@ -916,7 +779,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
@@ -952,7 +816,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
@@ -991,7 +856,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
@@ -1024,7 +890,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
@@ -1055,7 +922,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

         thread.setPartitionAssignor(new StreamPartitionAssignor() {
             @Override
@@ -1095,7 +963,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {

             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {
@@ -1146,7 +1015,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
         restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
@@ -1203,7 +1073,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
         restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
             Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog",
@@ -1275,7 +1146,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {

             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -1355,7 +1227,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

         final MockConsumer consumer = clientSupplier.consumer;
         consumer.updatePartitions(TOPIC, Collections.singletonList(new PartitionInfo(TOPIC, 0, null, null, null)));
@@ -1451,7 +1324,8 @@ public class StreamThreadTest {
             new Metrics(),
             new MockTime(),
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         activeTasks.put(task1, task0Assignment);
@@ -1505,7 +1379,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {

             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -1559,7 +1434,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {

             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -1625,7 +1501,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {

             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -1683,7 +1560,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0, stateDirectory) {

             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -1727,7 +1604,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);

         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         final Map<String, Object> configurationMap = new HashMap<>();
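Every hunk above threads the same `stateDirectory` argument through the `StreamThread` constructor. The field itself is created outside this excerpt, so the sketch below only illustrates the pattern of building one `StateDirectory` up front and handing it to every thread a test creates. The three-argument constructor mirrors the calls visible in the deleted test code; the application id, directory path, and use of `Time.SYSTEM` are placeholders, not the test's actual values.

```java
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.internals.StateDirectory;

public class SharedStateDirectorySketch {

    // One StateDirectory instance, reused by every StreamThread the caller constructs.
    public static StateDirectory createSharedStateDirectory() {
        final String applicationId = "applicationId";                  // placeholder
        final String stateDir = System.getProperty("java.io.tmpdir");  // placeholder state.dir
        return new StateDirectory(applicationId, stateDir, Time.SYSTEM);
    }
}
```

Passing the same instance everywhere keeps directory locking in one object, which is also what lets the tests later in this diff substitute an EasyMock mock for it.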
@@ -1800,53 +1678,40 @@ public class StreamThreadTest {
     public void shouldReleaseStateDirLockIfFailureOnTaskSuspend() throws Exception {
         final TaskId taskId = new TaskId(0, 0);

-        final StreamThread thread = setupTest(taskId);
-        final StateDirectory testStateDir = new StateDirectory(
-            applicationId,
-            config.getString(StreamsConfig.STATE_DIR_CONFIG),
-            mockTime);
-
-        assertFalse(testStateDir.lock(taskId, 0));
+        final StateDirectory stateDirMock = mockStateDirInteractions(taskId);
+        final StreamThread thread = setupTest(taskId, stateDirMock);
+
         try {
             thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
             fail("Should have thrown exception");
         } catch (final Exception e) {
-            assertTrue(testStateDir.lock(taskId, 0));
+            // ok
         } finally {
             thread.close();
-            testStateDir.unlock(taskId);
         }

+        EasyMock.verify(stateDirMock);
     }

     @Test
     public void shouldReleaseStateDirLockIfFailureOnTaskCloseForSuspendedTask() throws Exception {
         final TaskId taskId = new TaskId(0, 0);

-        final StreamThread thread = setupTest(taskId);
+        final StateDirectory stateDirMock = mockStateDirInteractions(taskId);

-        final StateDirectory testStateDir = new StateDirectory(
-            applicationId,
-            config.getString(StreamsConfig.STATE_DIR_CONFIG),
-            mockTime);
-
-        assertFalse(testStateDir.lock(taskId, 0));
-        try {
-            thread.close();
-            thread.join();
-            assertTrue(testStateDir.lock(taskId, 0));
-        } finally {
-            testStateDir.unlock(taskId);
-        }
+        final StreamThread thread = setupTest(taskId, stateDirMock);
+        thread.close();
+        thread.join();
+        EasyMock.verify(stateDirMock);
     }

-    private StreamThread setupTest(final TaskId taskId) throws InterruptedException {
+    private StreamThread setupTest(final TaskId taskId, final StateDirectory stateDirectory) throws InterruptedException {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.setApplicationId(applicationId);
         builder.addSource("source", "topic");

         final MockClientSupplier clientSupplier = new MockClientSupplier();
-        final StateDirectory stateDirectory = new StateDirectory(
-            applicationId,
-            config.getString(StreamsConfig.STATE_DIR_CONFIG),
-            mockTime);

         final TestStreamTask testStreamTask = new TestStreamTask(taskId,
             applicationId,
@@ -1865,6 +1730,7 @@ public class StreamThreadTest {
             }
         };

+
         final StreamThread thread = new StreamThread(
             builder.internalTopologyBuilder,
             config,
@@ -1875,7 +1741,7 @@ public class StreamThreadTest {
             new Metrics(),
             new MockTime(),
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0, stateDirectory) {

             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -1904,22 +1770,20 @@ public class StreamThreadTest {
     public void shouldReleaseStateDirLockIfFailureOnStandbyTaskSuspend() throws Exception {
         final TaskId taskId = new TaskId(0, 0);

-        final StreamThread thread = setupStandbyTest(taskId);
-        startThreadAndRebalance(thread);
-        final StateDirectory testStateDir = new StateDirectory(applicationId,
-            config.getString(StreamsConfig.STATE_DIR_CONFIG),
-            mockTime);
-
-        assertFalse(testStateDir.lock(taskId, 0));
+        final StateDirectory stateDirMock = mockStateDirInteractions(taskId);
+        final StreamThread thread = setupStandbyTest(taskId, stateDirMock);
+
+        startThreadAndRebalance(thread);
+
         try {
             thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
             fail("Should have thrown exception");
         } catch (final Exception e) {
-            assertTrue(testStateDir.lock(taskId, 0));
+            // ok
         } finally {
             thread.close();
-            testStateDir.unlock(taskId);
         }
+        EasyMock.verify(stateDirMock);
     }

     private void startThreadAndRebalance(final StreamThread thread) throws InterruptedException {
@@ -1938,24 +1802,31 @@ public class StreamThreadTest {
     public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSuspendedStandbyTask() throws Exception {
         final TaskId taskId = new TaskId(0, 0);

-        final StreamThread thread = setupStandbyTest(taskId);
+        final StateDirectory stateDirMock = mockStateDirInteractions(taskId);
+        final StreamThread thread = setupStandbyTest(taskId, stateDirMock);
         startThreadAndRebalance(thread);
-        final StateDirectory testStateDir = new StateDirectory(applicationId,
-            config.getString(StreamsConfig.STATE_DIR_CONFIG),
-            mockTime);

-        assertFalse(testStateDir.lock(taskId, 0));
         try {
             thread.close();
             thread.join();
-            assertTrue(testStateDir.lock(taskId, 0));
         } finally {
             thread.close();
-            testStateDir.unlock(taskId);
         }
+        EasyMock.verify(stateDirMock);
     }

-    private StreamThread setupStandbyTest(final TaskId taskId) {
+    private StateDirectory mockStateDirInteractions(final TaskId taskId) throws IOException {
+        final StateDirectory stateDirMock = EasyMock.createNiceMock(StateDirectory.class);
+        EasyMock.expect(stateDirMock.lock(EasyMock.eq(taskId), EasyMock.anyInt())).andReturn(true);
+        EasyMock.expect(stateDirMock.directoryForTask(taskId)).andReturn(new File(stateDir));
+        stateDirMock.unlock(taskId);
+        EasyMock.expectLastCall();
+        EasyMock.replay(stateDirMock);
+        return stateDirMock;
+    }
+
+    private StreamThread setupStandbyTest(final TaskId taskId, final StateDirectory stateDirectory) {
         final String storeName = "store";
         final String changelogTopic = applicationId + "-" + storeName + "-changelog";

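The `mockStateDirInteractions` helper added above follows EasyMock's record, replay, verify cycle: expectations are recorded on a nice mock (which returns defaults for any call that was not recorded), `replay` switches the mock into replay mode, and the later `EasyMock.verify` fails the test if a recorded expectation such as `unlock(taskId)` was never invoked. The self-contained sketch below walks through the same cycle against a hypothetical `TaskLock` interface; the interface and its methods are inventions for illustration, not part of the Kafka code.

```java
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;

import org.junit.Test;

public class RecordReplayVerifySketch {

    /** Hypothetical interface, used only for this example. */
    public interface TaskLock {
        boolean lock(int taskId);
        void unlock(int taskId);
    }

    @Test
    public void shouldVerifyThatLockIsReleased() {
        final TaskLock lockMock = createNiceMock(TaskLock.class);

        // record phase: declare the calls we expect the code under test to make
        expect(lockMock.lock(eq(0))).andReturn(true);
        lockMock.unlock(0);
        expectLastCall();

        // replay phase: the mock now answers calls according to the recorded script
        replay(lockMock);

        // stand-in for the code under test, which would normally acquire and release the lock
        if (lockMock.lock(0)) {
            lockMock.unlock(0);
        }

        // verify phase: fails if lock(0) or unlock(0) was never called
        verify(lockMock);
    }
}
```

Using a nice mock keeps the test focused: incidental calls on the mocked object do not fail the test, while the explicit `unlock` expectation still proves the lock was released.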
@@ -1985,7 +1856,8 @@ public class StreamThreadTest {
             new Metrics(),
             new MockTime(),
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {

             @Override
             protected StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -2066,7 +1938,8 @@ public class StreamThreadTest {
             metrics,
             Time.SYSTEM,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {

             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {

@@ -113,16 +113,16 @@ public class StreamThreadStateStoreProviderTest {
         storesAvailable = true;
         provider = new StreamThreadStateStoreProvider(
             new StreamThread(
                 builder.internalTopologyBuilder,
                 streamsConfig,
                 clientSupplier,
                 applicationId,
                 "clientId",
                 UUID.randomUUID(),
                 new Metrics(),
                 Time.SYSTEM,
                 new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-                0) {
+                0, stateDirectory) {

                 @Override
                 public Map<TaskId, StreamTask> tasks() {