MINOR: set up temp directories properly in StreamTaskTest

guozhangwang
StreamTaskTest did not set up a temp directory for each test. This occasionally caused interference between tests through state directory locking.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #317 from ymatsuda/fix_StreamTaskTest
This commit is contained in:
Yasuhiro Matsuda 2015-10-15 11:49:44 -07:00 committed by Guozhang Wang
parent a4dbf90107
commit 50a076d1e9
1 changed files with 98 additions and 78 deletions

View File

@ -27,11 +27,14 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.test.MockSourceNode;
import org.junit.Test;
import org.junit.Before;
import java.io.File;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@ -61,22 +64,25 @@ public class StreamTaskTest {
}
});
private final StreamingConfig config = new StreamingConfig(new Properties() {
{
setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
}
});
private StreamingConfig createConfig(final File baseDir) throws Exception {
return new StreamingConfig(new Properties() {
{
setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
}
});
}
private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
private final byte[] recordValue = intSerializer.serialize(null, 10);
private final byte[] recordKey = intSerializer.serialize(null, 1);
@ -88,96 +94,110 @@ public class StreamTaskTest {
@SuppressWarnings("unchecked")
@Test
public void testProcessOrder() {
StreamTask task = new StreamTask(0, consumer, producer, restoreStateConsumer, partitions, topology, config, null);
public void testProcessOrder() throws Exception {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamingConfig config = createConfig(baseDir);
StreamTask task = new StreamTask(0, consumer, producer, restoreStateConsumer, partitions, topology, config, null);
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue)
));
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue)
));
task.addRecords(partition2, records(
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue)
));
task.addRecords(partition2, records(
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue)
));
assertEquals(task.process(), 5);
assertEquals(source1.numReceived, 1);
assertEquals(source2.numReceived, 0);
assertEquals(task.process(), 5);
assertEquals(source1.numReceived, 1);
assertEquals(source2.numReceived, 0);
assertEquals(task.process(), 4);
assertEquals(source1.numReceived, 1);
assertEquals(source2.numReceived, 1);
assertEquals(task.process(), 4);
assertEquals(source1.numReceived, 1);
assertEquals(source2.numReceived, 1);
assertEquals(task.process(), 3);
assertEquals(source1.numReceived, 2);
assertEquals(source2.numReceived, 1);
assertEquals(task.process(), 3);
assertEquals(source1.numReceived, 2);
assertEquals(source2.numReceived, 1);
assertEquals(task.process(), 2);
assertEquals(source1.numReceived, 3);
assertEquals(source2.numReceived, 1);
assertEquals(task.process(), 2);
assertEquals(source1.numReceived, 3);
assertEquals(source2.numReceived, 1);
assertEquals(task.process(), 1);
assertEquals(source1.numReceived, 3);
assertEquals(source2.numReceived, 2);
assertEquals(task.process(), 1);
assertEquals(source1.numReceived, 3);
assertEquals(source2.numReceived, 2);
assertEquals(task.process(), 0);
assertEquals(source1.numReceived, 3);
assertEquals(source2.numReceived, 3);
assertEquals(task.process(), 0);
assertEquals(source1.numReceived, 3);
assertEquals(source2.numReceived, 3);
task.close();
task.close();
} finally {
Utils.delete(baseDir);
}
}
@SuppressWarnings("unchecked")
@Test
public void testPauseResume() {
StreamTask task = new StreamTask(1, consumer, producer, restoreStateConsumer, partitions, topology, config, null);
public void testPauseResume() throws Exception {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamingConfig config = createConfig(baseDir);
StreamTask task = new StreamTask(1, consumer, producer, restoreStateConsumer, partitions, topology, config, null);
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue)
));
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue)
));
task.addRecords(partition2, records(
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, recordKey, recordValue)
));
task.addRecords(partition2, records(
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, recordKey, recordValue)
));
assertEquals(task.process(), 5);
assertEquals(source1.numReceived, 1);
assertEquals(source2.numReceived, 0);
assertEquals(task.process(), 5);
assertEquals(source1.numReceived, 1);
assertEquals(source2.numReceived, 0);
assertEquals(consumer.paused().size(), 1);
assertTrue(consumer.paused().contains(partition2));
assertEquals(consumer.paused().size(), 1);
assertTrue(consumer.paused().contains(partition2));
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, recordKey, recordValue)
));
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, recordKey, recordValue)
));
assertEquals(consumer.paused().size(), 2);
assertTrue(consumer.paused().contains(partition1));
assertTrue(consumer.paused().contains(partition2));
assertEquals(consumer.paused().size(), 2);
assertTrue(consumer.paused().contains(partition1));
assertTrue(consumer.paused().contains(partition2));
assertEquals(task.process(), 7);
assertEquals(source1.numReceived, 1);
assertEquals(source2.numReceived, 1);
assertEquals(task.process(), 7);
assertEquals(source1.numReceived, 1);
assertEquals(source2.numReceived, 1);
assertEquals(consumer.paused().size(), 1);
assertTrue(consumer.paused().contains(partition1));
assertEquals(consumer.paused().size(), 1);
assertTrue(consumer.paused().contains(partition1));
assertEquals(task.process(), 6);
assertEquals(source1.numReceived, 2);
assertEquals(source2.numReceived, 1);
assertEquals(task.process(), 6);
assertEquals(source1.numReceived, 2);
assertEquals(source2.numReceived, 1);
assertEquals(consumer.paused().size(), 0);
assertEquals(consumer.paused().size(), 0);
task.close();
task.close();
} finally {
Utils.delete(baseDir);
}
}
private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {