mirror of https://github.com/apache/kafka.git
KAFKA-16898 move TimeIndexTest and TransactionIndexTest to storage module (#16341)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
9ac102596b
commit
a9d71d1312
|
@ -1,147 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.log
|
||||
|
||||
import java.io.File
|
||||
import kafka.utils.TestUtils
|
||||
import org.apache.kafka.common.errors.InvalidOffsetException
|
||||
import org.apache.kafka.storage.internals.log.{CorruptIndexException, TimeIndex, TimestampOffset}
|
||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
|
||||
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
|
||||
|
||||
/**
|
||||
* Unit test for time index.
|
||||
*/
|
||||
class TimeIndexTest {
|
||||
var idx: TimeIndex = _
|
||||
val maxEntries = 30
|
||||
val baseOffset = 45L
|
||||
|
||||
@BeforeEach
|
||||
def setup(): Unit = {
|
||||
this.idx = new TimeIndex(nonExistentTempFile(), baseOffset, maxEntries * 12)
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
def teardown(): Unit = {
|
||||
if (this.idx != null)
|
||||
this.idx.file.delete()
|
||||
}
|
||||
|
||||
@Test
|
||||
def testLookUp(): Unit = {
|
||||
// Empty time index
|
||||
assertEquals(new TimestampOffset(-1L, baseOffset), idx.lookup(100L))
|
||||
|
||||
// Add several time index entries.
|
||||
appendEntries(maxEntries - 1)
|
||||
|
||||
// look for timestamp smaller than the earliest entry
|
||||
assertEquals(new TimestampOffset(-1L, baseOffset), idx.lookup(9))
|
||||
// look for timestamp in the middle of two entries.
|
||||
assertEquals(new TimestampOffset(20L, 65L), idx.lookup(25))
|
||||
// look for timestamp same as the one in the entry
|
||||
assertEquals(new TimestampOffset(30L, 75L), idx.lookup(30))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testEntry(): Unit = {
|
||||
appendEntries(maxEntries - 1)
|
||||
assertEquals(new TimestampOffset(10L, 55L), idx.entry(0))
|
||||
assertEquals(new TimestampOffset(20L, 65L), idx.entry(1))
|
||||
assertEquals(new TimestampOffset(30L, 75L), idx.entry(2))
|
||||
assertEquals(new TimestampOffset(40L, 85L), idx.entry(3))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testEntryOverflow(): Unit = {
|
||||
assertThrows(classOf[IllegalArgumentException], () => idx.entry(0))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testTruncate(): Unit = {
|
||||
appendEntries(maxEntries - 1)
|
||||
idx.truncate()
|
||||
assertEquals(0, idx.entries)
|
||||
|
||||
appendEntries(maxEntries - 1)
|
||||
idx.truncateTo(10 + baseOffset)
|
||||
assertEquals(0, idx.entries)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testAppend(): Unit = {
|
||||
appendEntries(maxEntries - 1)
|
||||
assertThrows(classOf[IllegalArgumentException], () => idx.maybeAppend(10000L, 1000L))
|
||||
assertThrows(classOf[InvalidOffsetException], () => idx.maybeAppend(10000L, (maxEntries - 2) * 10, true))
|
||||
idx.maybeAppend(10000L, 1000L, true)
|
||||
}
|
||||
|
||||
private def appendEntries(numEntries: Int): Unit = {
|
||||
for (i <- 1 to numEntries)
|
||||
idx.maybeAppend(i * 10, i * 10 + baseOffset)
|
||||
}
|
||||
|
||||
def nonExistentTempFile(): File = {
|
||||
val file = TestUtils.tempFile()
|
||||
file.delete()
|
||||
file
|
||||
}
|
||||
|
||||
@Test
|
||||
def testSanityCheck(): Unit = {
|
||||
idx.sanityCheck()
|
||||
appendEntries(5)
|
||||
val firstEntry = idx.entry(0)
|
||||
idx.sanityCheck()
|
||||
idx.close()
|
||||
|
||||
var shouldCorruptOffset = false
|
||||
var shouldCorruptTimestamp = false
|
||||
var shouldCorruptLength = false
|
||||
idx = new TimeIndex(idx.file, baseOffset, maxEntries * 12) {
|
||||
override def lastEntry = {
|
||||
val superLastEntry = super.lastEntry
|
||||
val offset = if (shouldCorruptOffset) this.baseOffset - 1 else superLastEntry.offset
|
||||
val timestamp = if (shouldCorruptTimestamp) firstEntry.timestamp - 1 else superLastEntry.timestamp
|
||||
new TimestampOffset(timestamp, offset)
|
||||
}
|
||||
override def length = {
|
||||
val superLength = super.length
|
||||
if (shouldCorruptLength) superLength - 1 else superLength
|
||||
}
|
||||
}
|
||||
|
||||
shouldCorruptOffset = true
|
||||
assertThrows(classOf[CorruptIndexException], () => idx.sanityCheck())
|
||||
shouldCorruptOffset = false
|
||||
|
||||
shouldCorruptTimestamp = true
|
||||
assertThrows(classOf[CorruptIndexException], () => idx.sanityCheck())
|
||||
shouldCorruptTimestamp = false
|
||||
|
||||
shouldCorruptLength = true
|
||||
assertThrows(classOf[CorruptIndexException], () => idx.sanityCheck())
|
||||
shouldCorruptLength = false
|
||||
|
||||
idx.sanityCheck()
|
||||
idx.close()
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,182 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package kafka.log
|
||||
|
||||
import kafka.utils.TestUtils
|
||||
import org.apache.kafka.common.message.FetchResponseData
|
||||
import org.apache.kafka.storage.internals.log.{AbortedTxn, CorruptIndexException, TransactionIndex}
|
||||
import org.junit.jupiter.api.Assertions._
|
||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
|
||||
|
||||
import scala.jdk.CollectionConverters._
|
||||
import java.io.File
|
||||
import java.util.Collections
|
||||
|
||||
class TransactionIndexTest {
|
||||
var file: File = _
|
||||
var index: TransactionIndex = _
|
||||
val offset = 0L
|
||||
|
||||
@BeforeEach
|
||||
def setup(): Unit = {
|
||||
file = TestUtils.tempFile()
|
||||
index = new TransactionIndex(offset, file)
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
def teardown(): Unit = {
|
||||
index.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
def testPositionSetCorrectlyWhenOpened(): Unit = {
|
||||
val abortedTxns = List(
|
||||
new AbortedTxn(0L, 0, 10, 11),
|
||||
new AbortedTxn(1L, 5, 15, 13),
|
||||
new AbortedTxn(2L, 18, 35, 25),
|
||||
new AbortedTxn(3L, 32, 50, 40))
|
||||
abortedTxns.foreach(index.append)
|
||||
index.close()
|
||||
|
||||
val reopenedIndex = new TransactionIndex(0L, file)
|
||||
val anotherAbortedTxn = new AbortedTxn(3L, 50, 60, 55)
|
||||
reopenedIndex.append(anotherAbortedTxn)
|
||||
assertEquals((abortedTxns ++ List(anotherAbortedTxn)).asJava, reopenedIndex.allAbortedTxns)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testSanityCheck(): Unit = {
|
||||
val abortedTxns = List(
|
||||
new AbortedTxn(0L, 0, 10, 11),
|
||||
new AbortedTxn(1L, 5, 15, 13),
|
||||
new AbortedTxn(2L, 18, 35, 25),
|
||||
new AbortedTxn(3L, 32, 50, 40))
|
||||
abortedTxns.foreach(index.append)
|
||||
index.close()
|
||||
|
||||
// open the index with a different starting offset to fake invalid data
|
||||
val reopenedIndex = new TransactionIndex(100L, file)
|
||||
assertThrows(classOf[CorruptIndexException], () => reopenedIndex.sanityCheck())
|
||||
}
|
||||
|
||||
@Test
|
||||
def testLastOffsetMustIncrease(): Unit = {
|
||||
index.append(new AbortedTxn(1L, 5, 15, 13))
|
||||
assertThrows(classOf[IllegalArgumentException], () => index.append(new AbortedTxn(0L, 0,
|
||||
15, 11)))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testLastOffsetCannotDecrease(): Unit = {
|
||||
index.append(new AbortedTxn(1L, 5, 15, 13))
|
||||
assertThrows(classOf[IllegalArgumentException], () => index.append(new AbortedTxn(0L, 0,
|
||||
10, 11)))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testCollectAbortedTransactions(): Unit = {
|
||||
val abortedTransactions = List(
|
||||
new AbortedTxn(0L, 0, 10, 11),
|
||||
new AbortedTxn(1L, 5, 15, 13),
|
||||
new AbortedTxn(2L, 18, 35, 25),
|
||||
new AbortedTxn(3L, 32, 50, 40))
|
||||
|
||||
abortedTransactions.foreach(index.append)
|
||||
|
||||
var result = index.collectAbortedTxns(0L, 100L)
|
||||
assertEquals(abortedTransactions.asJava, result.abortedTransactions)
|
||||
assertFalse(result.isComplete)
|
||||
|
||||
result = index.collectAbortedTxns(0L, 32)
|
||||
assertEquals(abortedTransactions.take(3).asJava, result.abortedTransactions)
|
||||
assertTrue(result.isComplete)
|
||||
|
||||
result = index.collectAbortedTxns(0L, 35)
|
||||
assertEquals(abortedTransactions.asJava, result.abortedTransactions)
|
||||
assertTrue(result.isComplete)
|
||||
|
||||
result = index.collectAbortedTxns(10, 35)
|
||||
assertEquals(abortedTransactions.asJava, result.abortedTransactions)
|
||||
assertTrue(result.isComplete)
|
||||
|
||||
result = index.collectAbortedTxns(11, 35)
|
||||
assertEquals(abortedTransactions.slice(1, 4).asJava, result.abortedTransactions)
|
||||
assertTrue(result.isComplete)
|
||||
|
||||
result = index.collectAbortedTxns(20, 41)
|
||||
assertEquals(abortedTransactions.slice(2, 4).asJava, result.abortedTransactions)
|
||||
assertFalse(result.isComplete)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testTruncate(): Unit = {
|
||||
val abortedTransactions = List(
|
||||
new AbortedTxn(0L, 0, 10, 2),
|
||||
new AbortedTxn(1L, 5, 15, 16),
|
||||
new AbortedTxn(2L, 18, 35, 25),
|
||||
new AbortedTxn(3L, 32, 50, 40))
|
||||
|
||||
abortedTransactions.foreach(index.append)
|
||||
|
||||
index.truncateTo(51)
|
||||
assertEquals(abortedTransactions.asJava, index.collectAbortedTxns(0L, 100L).abortedTransactions)
|
||||
|
||||
index.truncateTo(50)
|
||||
assertEquals(abortedTransactions.take(3).asJava, index.collectAbortedTxns(0L, 100L).abortedTransactions)
|
||||
|
||||
index.reset()
|
||||
assertEquals(Collections.emptyList[FetchResponseData.AbortedTransaction], index.collectAbortedTxns(0L, 100L).abortedTransactions)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testAbortedTxnSerde(): Unit = {
|
||||
val pid = 983493L
|
||||
val firstOffset = 137L
|
||||
val lastOffset = 299L
|
||||
val lastStableOffset = 200L
|
||||
|
||||
val abortedTxn = new AbortedTxn(pid, firstOffset, lastOffset, lastStableOffset)
|
||||
assertEquals(AbortedTxn.CURRENT_VERSION, abortedTxn.version)
|
||||
assertEquals(pid, abortedTxn.producerId)
|
||||
assertEquals(firstOffset, abortedTxn.firstOffset)
|
||||
assertEquals(lastOffset, abortedTxn.lastOffset)
|
||||
assertEquals(lastStableOffset, abortedTxn.lastStableOffset)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testRenameIndex(): Unit = {
|
||||
val renamed = TestUtils.tempFile()
|
||||
index.append(new AbortedTxn(0L, 0, 10, 2))
|
||||
|
||||
index.renameTo(renamed)
|
||||
index.append(new AbortedTxn(1L, 5, 15, 16))
|
||||
|
||||
val abortedTxns = index.collectAbortedTxns(0L, 100L).abortedTransactions
|
||||
assertEquals(2, abortedTxns.size)
|
||||
assertEquals(0, abortedTxns.get(0).firstOffset)
|
||||
assertEquals(5, abortedTxns.get(1).firstOffset)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testUpdateParentDir(): Unit = {
|
||||
val tmpParentDir = new File(TestUtils.tempDir(), "parent")
|
||||
tmpParentDir.mkdir()
|
||||
assertNotEquals(tmpParentDir, index.file.getParentFile)
|
||||
index.updateParentDir(tmpParentDir)
|
||||
assertEquals(tmpParentDir, index.file.getParentFile)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,205 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.storage.internals.log;
|
||||
|
||||
import org.apache.kafka.common.errors.InvalidOffsetException;
|
||||
import org.apache.kafka.common.record.RecordBatch;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
/**
|
||||
* Unit test for time index.
|
||||
*/
|
||||
public class TimeIndexTest {
|
||||
private final int maxEntries = 30;
|
||||
private final long baseOffset = 45L;
|
||||
private final TimeIndex idx = assertDoesNotThrow(() -> new TimeIndex(nonExistentTempFile(), baseOffset, maxEntries * 12));
|
||||
|
||||
@AfterEach
|
||||
public void teardown() {
|
||||
if (this.idx != null)
|
||||
this.idx.file().delete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLookUp() {
|
||||
// Empty time index
|
||||
assertEquals(new TimestampOffset(-1L, baseOffset), idx.lookup(100L));
|
||||
|
||||
// Add several time index entries.
|
||||
appendEntries(maxEntries - 1);
|
||||
|
||||
// look for timestamp smaller than the earliest entry
|
||||
assertEquals(new TimestampOffset(-1L, baseOffset), idx.lookup(9));
|
||||
// look for timestamp in the middle of two entries.
|
||||
assertEquals(new TimestampOffset(20L, 65L), idx.lookup(25));
|
||||
// look for timestamp same as the one in the entry
|
||||
assertEquals(new TimestampOffset(30L, 75L), idx.lookup(30));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEntry() {
|
||||
appendEntries(maxEntries - 1);
|
||||
assertEquals(new TimestampOffset(10L, 55L), idx.entry(0));
|
||||
assertEquals(new TimestampOffset(20L, 65L), idx.entry(1));
|
||||
assertEquals(new TimestampOffset(30L, 75L), idx.entry(2));
|
||||
assertEquals(new TimestampOffset(40L, 85L), idx.entry(3));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEntryOverflow() {
|
||||
assertThrows(IllegalArgumentException.class, () -> idx.entry(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTruncate() {
|
||||
appendEntries(maxEntries - 1);
|
||||
idx.truncate();
|
||||
assertEquals(0, idx.entries());
|
||||
|
||||
appendEntries(maxEntries - 1);
|
||||
idx.truncateTo(10 + baseOffset);
|
||||
assertEquals(0, idx.entries());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppend() {
|
||||
appendEntries(maxEntries - 1);
|
||||
assertThrows(IllegalArgumentException.class, () -> idx.maybeAppend(10000L, 1000L));
|
||||
assertThrows(InvalidOffsetException.class, () -> idx.maybeAppend(10000L, (maxEntries - 2) * 10, true));
|
||||
idx.maybeAppend(10000L, 1000L, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSanityCheck() throws IOException {
|
||||
idx.sanityCheck();
|
||||
appendEntries(5);
|
||||
TimestampOffset firstEntry = idx.entry(0);
|
||||
idx.sanityCheck();
|
||||
idx.close();
|
||||
|
||||
class MockTimeIndex extends TimeIndex {
|
||||
boolean shouldCorruptOffset = false;
|
||||
boolean shouldCorruptTimestamp = false;
|
||||
boolean shouldCorruptLength = false;
|
||||
|
||||
MockTimeIndex(File file, long baseOffset, int maxEntries) throws IOException {
|
||||
super(file, baseOffset, maxEntries);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimestampOffset lastEntry() {
|
||||
TimestampOffset superLastEntry = super.lastEntry();
|
||||
long offset = shouldCorruptOffset ? this.baseOffset() - 1 : superLastEntry.offset;
|
||||
long timestamp = shouldCorruptTimestamp ? firstEntry.timestamp - 1 : superLastEntry.timestamp;
|
||||
return new TimestampOffset(timestamp, offset);
|
||||
}
|
||||
@Override
|
||||
public long length() {
|
||||
long superLength = super.length();
|
||||
return shouldCorruptLength ? superLength - 1 : superLength;
|
||||
}
|
||||
|
||||
public void setShouldCorruptOffset(boolean shouldCorruptOffset) {
|
||||
this.shouldCorruptOffset = shouldCorruptOffset;
|
||||
}
|
||||
|
||||
public void setShouldCorruptTimestamp(boolean shouldCorruptTimestamp) {
|
||||
this.shouldCorruptTimestamp = shouldCorruptTimestamp;
|
||||
}
|
||||
|
||||
public void setShouldCorruptLength(boolean shouldCorruptLength) {
|
||||
this.shouldCorruptLength = shouldCorruptLength;
|
||||
}
|
||||
}
|
||||
try (MockTimeIndex mockIdx = new MockTimeIndex(idx.file(), baseOffset, maxEntries * 12)) {
|
||||
mockIdx.setShouldCorruptOffset(true);
|
||||
assertThrows(CorruptIndexException.class, mockIdx::sanityCheck);
|
||||
mockIdx.setShouldCorruptOffset(false);
|
||||
|
||||
mockIdx.setShouldCorruptTimestamp(true);
|
||||
assertThrows(CorruptIndexException.class, mockIdx::sanityCheck);
|
||||
mockIdx.setShouldCorruptTimestamp(false);
|
||||
|
||||
mockIdx.setShouldCorruptLength(true);
|
||||
assertThrows(CorruptIndexException.class, mockIdx::sanityCheck);
|
||||
mockIdx.setShouldCorruptLength(false);
|
||||
|
||||
mockIdx.sanityCheck();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsFull() {
|
||||
assertFalse(idx.isFull());
|
||||
appendEntries(maxEntries - 1);
|
||||
assertTrue(idx.isFull());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLastEntry() {
|
||||
assertEquals(new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset), idx.lastEntry());
|
||||
idx.maybeAppend(1, 1 + baseOffset);
|
||||
assertEquals(new TimestampOffset(1, baseOffset + 1), idx.lastEntry());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResize() throws IOException {
|
||||
boolean result = idx.resize(maxEntries * idx.entrySize());
|
||||
assertFalse(result);
|
||||
|
||||
result = idx.resize(maxEntries / 2 * idx.entrySize());
|
||||
assertTrue(result);
|
||||
|
||||
result = idx.resize(maxEntries * 2 * idx.entrySize());
|
||||
assertTrue(result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEntrySize() {
|
||||
assertEquals(12, idx.entrySize());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseEntry() {
|
||||
idx.maybeAppend(1, 1 + baseOffset);
|
||||
assertEquals(new TimestampOffset(1, baseOffset + 1), idx.parseEntry(idx.mmap(), 0));
|
||||
}
|
||||
|
||||
private void appendEntries(Integer numEntries) {
|
||||
IntStream.rangeClosed(1, numEntries).forEach(i -> idx.maybeAppend(i * 10, i * 10 + baseOffset));
|
||||
}
|
||||
|
||||
private File nonExistentTempFile() throws IOException {
|
||||
File file = TestUtils.tempFile();
|
||||
file.delete();
|
||||
return file;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,205 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.storage.internals.log;
|
||||
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class TransactionIndexTest {
|
||||
private final File file = assertDoesNotThrow(() -> TestUtils.tempFile());
|
||||
private final TransactionIndex index = assertDoesNotThrow(() -> new TransactionIndex(0, file));
|
||||
|
||||
@AfterEach
|
||||
public void teardown() throws IOException {
|
||||
index.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPositionSetCorrectlyWhenOpened() throws IOException {
|
||||
List<AbortedTxn> abortedTxns = new ArrayList<>(Arrays.asList(
|
||||
new AbortedTxn(0L, 0, 10, 11),
|
||||
new AbortedTxn(1L, 5, 15, 13),
|
||||
new AbortedTxn(2L, 18, 35, 25),
|
||||
new AbortedTxn(3L, 32, 50, 40)));
|
||||
abortedTxns.forEach(txn -> assertDoesNotThrow(() -> index.append(txn)));
|
||||
index.close();
|
||||
|
||||
TransactionIndex reopenedIndex = new TransactionIndex(0L, file);
|
||||
AbortedTxn anotherAbortedTxn = new AbortedTxn(3L, 50, 60, 55);
|
||||
reopenedIndex.append(anotherAbortedTxn);
|
||||
abortedTxns.add(anotherAbortedTxn);
|
||||
assertEquals(abortedTxns, reopenedIndex.allAbortedTxns());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSanityCheck() throws IOException {
|
||||
List<AbortedTxn> abortedTxns = Arrays.asList(
|
||||
new AbortedTxn(0L, 0, 10, 11),
|
||||
new AbortedTxn(1L, 5, 15, 13),
|
||||
new AbortedTxn(2L, 18, 35, 25),
|
||||
new AbortedTxn(3L, 32, 50, 40));
|
||||
abortedTxns.forEach(txn -> assertDoesNotThrow(() -> index.append(txn)));
|
||||
index.close();
|
||||
|
||||
// open the index with a different starting offset to fake invalid data
|
||||
try (TransactionIndex reopenedIndex = new TransactionIndex(100L, file)) {
|
||||
assertThrows(CorruptIndexException.class, reopenedIndex::sanityCheck);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLastOffsetMustIncrease() throws IOException {
|
||||
index.append(new AbortedTxn(1L, 5, 15, 13));
|
||||
assertThrows(IllegalArgumentException.class, () -> index.append(new AbortedTxn(0L, 0,
|
||||
15, 11)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLastOffsetCannotDecrease() throws IOException {
|
||||
index.append(new AbortedTxn(1L, 5, 15, 13));
|
||||
assertThrows(IllegalArgumentException.class, () -> index.append(new AbortedTxn(0L, 0,
|
||||
10, 11)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCollectAbortedTransactions() {
|
||||
List<AbortedTxn> abortedTransactions = Arrays.asList(
|
||||
new AbortedTxn(0L, 0, 10, 11),
|
||||
new AbortedTxn(1L, 5, 15, 13),
|
||||
new AbortedTxn(2L, 18, 35, 25),
|
||||
new AbortedTxn(3L, 32, 50, 40));
|
||||
|
||||
abortedTransactions.forEach(txn -> assertDoesNotThrow(() -> index.append(txn)));
|
||||
|
||||
TxnIndexSearchResult result = index.collectAbortedTxns(0L, 100L);
|
||||
assertEquals(abortedTransactions, result.abortedTransactions);
|
||||
assertFalse(result.isComplete);
|
||||
|
||||
result = index.collectAbortedTxns(0L, 32);
|
||||
assertEquals(abortedTransactions.subList(0, 3), result.abortedTransactions);
|
||||
assertTrue(result.isComplete);
|
||||
|
||||
result = index.collectAbortedTxns(0L, 35);
|
||||
assertEquals(abortedTransactions, result.abortedTransactions);
|
||||
assertTrue(result.isComplete);
|
||||
|
||||
result = index.collectAbortedTxns(10, 35);
|
||||
assertEquals(abortedTransactions, result.abortedTransactions);
|
||||
assertTrue(result.isComplete);
|
||||
|
||||
result = index.collectAbortedTxns(11, 35);
|
||||
assertEquals(abortedTransactions.subList(1, 4), result.abortedTransactions);
|
||||
assertTrue(result.isComplete);
|
||||
|
||||
result = index.collectAbortedTxns(20, 41);
|
||||
assertEquals(abortedTransactions.subList(2, 4), result.abortedTransactions);
|
||||
assertFalse(result.isComplete);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTruncate() throws IOException {
|
||||
List<AbortedTxn> abortedTransactions = Arrays.asList(
|
||||
new AbortedTxn(0L, 0, 10, 2),
|
||||
new AbortedTxn(1L, 5, 15, 16),
|
||||
new AbortedTxn(2L, 18, 35, 25),
|
||||
new AbortedTxn(3L, 32, 50, 40));
|
||||
|
||||
abortedTransactions.forEach(txn -> assertDoesNotThrow(() -> index.append(txn)));
|
||||
|
||||
index.truncateTo(51);
|
||||
assertEquals(abortedTransactions, index.collectAbortedTxns(0L, 100L).abortedTransactions);
|
||||
|
||||
index.truncateTo(50);
|
||||
assertEquals(abortedTransactions.subList(0, 3), index.collectAbortedTxns(0L, 100L).abortedTransactions);
|
||||
|
||||
index.reset();
|
||||
assertEquals(Collections.emptyList(), index.collectAbortedTxns(0L, 100L).abortedTransactions);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbortedTxnSerde() {
|
||||
long pid = 983493L;
|
||||
long firstOffset = 137L;
|
||||
long lastOffset = 299L;
|
||||
long lastStableOffset = 200L;
|
||||
|
||||
AbortedTxn abortedTxn = new AbortedTxn(pid, firstOffset, lastOffset, lastStableOffset);
|
||||
assertEquals(AbortedTxn.CURRENT_VERSION, abortedTxn.version());
|
||||
assertEquals(pid, abortedTxn.producerId());
|
||||
assertEquals(firstOffset, abortedTxn.firstOffset());
|
||||
assertEquals(lastOffset, abortedTxn.lastOffset());
|
||||
assertEquals(lastStableOffset, abortedTxn.lastStableOffset());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRenameIndex() throws IOException {
|
||||
File renamed = TestUtils.tempFile();
|
||||
index.append(new AbortedTxn(0L, 0, 10, 2));
|
||||
|
||||
index.renameTo(renamed);
|
||||
index.append(new AbortedTxn(1L, 5, 15, 16));
|
||||
|
||||
List<AbortedTxn> abortedTxns = index.collectAbortedTxns(0L, 100L).abortedTransactions;
|
||||
assertEquals(2, abortedTxns.size());
|
||||
assertEquals(0, abortedTxns.get(0).firstOffset());
|
||||
assertEquals(5, abortedTxns.get(1).firstOffset());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateParentDir() {
|
||||
File tmpParentDir = new File(TestUtils.tempDirectory(), "parent");
|
||||
tmpParentDir.mkdir();
|
||||
assertNotEquals(tmpParentDir, index.file().getParentFile());
|
||||
index.updateParentDir(tmpParentDir);
|
||||
assertEquals(tmpParentDir, index.file().getParentFile());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlush() throws IOException {
|
||||
File nonExistentFile = TestUtils.tempFile();
|
||||
assertTrue(nonExistentFile.delete());
|
||||
try (TransactionIndex testIndex = new TransactionIndex(0, nonExistentFile)) {
|
||||
testIndex.flush();
|
||||
testIndex.append(new AbortedTxn(0L, 0, 10, 2));
|
||||
testIndex.flush();
|
||||
assertNotEquals(0, testIndex.file().length());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteIfExists() throws IOException {
|
||||
assertTrue(file.exists());
|
||||
index.deleteIfExists();
|
||||
assertFalse(file.exists());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue