From c7f99bc2bd9af5eb6ca9e20a02d5806c52d434b3 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 11 Feb 2019 22:06:14 -0800 Subject: [PATCH] MINOR: Update JUnit to 4.13 and annotate log cleaner integration test (#6248) JUnit 4.13 fixes the issue where `Category` and `Parameterized` annotations could not be used together. It also deprecates `ExpectedException` and `assertThat`. Given this, we: - Replace `ExpectedException` with the newly introduced `assertThrows`. - Replace `Assert.assertThat` with `MatcherAssert.assertThat`. - Annotate `AbstractLogCleanerIntegrationTest` with `IntegrationTest` category. Reviewers: Ewen Cheslack-Postava , David Arthur --- .../clients/consumer/KafkaConsumerTest.java | 11 +- .../clients/consumer/MockConsumerTest.java | 2 +- .../kafka/common/metrics/KafkaMbeanTest.java | 2 +- .../kafka/common/record/KafkaLZ4Test.java | 49 +++--- .../record/MemoryRecordsBuilderTest.java | 161 ++++++++++-------- .../common/record/MemoryRecordsTest.java | 140 ++++++++------- .../apache/kafka/connect/data/StructTest.java | 36 ++-- .../AbstractLogCleanerIntegrationTest.scala | 3 + gradle/dependencies.gradle | 2 +- .../kafka/streams/StreamsConfigTest.java | 2 +- .../FineGrainedAutoResetIntegrationTest.java | 2 +- .../GlobalKTableIntegrationTest.java | 2 +- .../RegexSourceIntegrationTest.java | 2 +- .../RepartitionOptimizingIntegrationTest.java | 2 +- .../kafka/streams/internals/ApiUtilsTest.java | 4 +- .../kafka/streams/kstream/PrintedTest.java | 2 +- .../kstream/RepartitionTopicNamingTest.java | 2 +- .../internals/InternalStreamsBuilderTest.java | 2 +- .../internals/KStreamFlatTransformTest.java | 9 +- .../kstream/internals/KStreamImplTest.java | 18 +- .../internals/KStreamKStreamJoinTest.java | 2 +- .../kstream/internals/KTableSourceTest.java | 2 +- .../TransformerSupplierAdapterTest.java | 2 +- .../graph/GraphGraceSearchUtilTest.java | 2 +- .../CompositeRestoreListenerTest.java | 4 +- .../GlobalProcessorContextImplTest.java | 2 +- .../InternalTopologyBuilderTest.java | 2 +- .../internals/ProcessorContextTest.java | 2 +- .../processor/internals/StreamThreadTest.java | 2 +- .../StreamsPartitionAssignorTest.java | 2 +- .../metrics/StreamsMetricsImplTest.java | 2 +- .../internals/AbstractKeyValueStoreTest.java | 2 +- .../internals/CachingKeyValueStoreTest.java | 4 +- .../CompositeReadOnlyWindowStoreTest.java | 15 +- .../internals/InMemoryLRUCacheStoreTest.java | 2 +- .../internals/MeteredKeyValueStoreTest.java | 4 +- .../internals/MeteredSessionStoreTest.java | 4 +- ...OptionsColumnFamilyOptionsAdapterTest.java | 2 +- .../RocksDBSegmentedBytesStoreTest.java | 7 +- .../state/internals/RocksDBStoreTest.java | 2 +- .../RocksDBTimestampedStoreTest.java | 2 +- .../kafka/streams/TopologyTestDriverTest.java | 8 +- 42 files changed, 252 insertions(+), 277 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 0f367812207..138d20600f3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -78,9 +78,7 @@ import org.apache.kafka.test.MockMetricsReporter; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.Assert; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import java.nio.ByteBuffer; import java.time.Duration; @@ -115,6 +113,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; @@ -143,9 +142,6 @@ public class KafkaConsumerTest { private final String groupId = "mock-group"; - @Rule - public ExpectedException expectedException = ExpectedException.none(); - @Test public void testMetricsReporterAutoGeneratedClientId() { Properties props = new Properties(); @@ -840,13 +836,12 @@ public class KafkaConsumerTest { // interrupt the thread and call poll try { Thread.currentThread().interrupt(); - expectedException.expect(InterruptException.class); - consumer.poll(Duration.ZERO); + assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO)); } finally { // clear interrupted state again since this thread may be reused by JUnit Thread.interrupted(); + consumer.close(Duration.ofMillis(0)); } - consumer.close(Duration.ofMillis(0)); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java index 03013e6a9fa..aad4d2973a0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java @@ -27,9 +27,9 @@ import java.util.HashMap; import java.util.Iterator; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; public class MockConsumerTest { diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java index 963a66abb29..dd494c88778 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java @@ -33,8 +33,8 @@ import java.lang.management.ManagementFactory; import java.util.List; import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; public class KafkaMbeanTest { diff --git a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java index 222599b8b73..5e44c498254 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java +++ b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java @@ -19,9 +19,7 @@ package org.apache.kafka.common.record; import net.jpountz.xxhash.XXHashFactory; import org.hamcrest.CoreMatchers; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; @@ -37,9 +35,11 @@ import java.util.List; import java.util.Random; import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @RunWith(value = Parameterized.class) @@ -71,9 +71,6 @@ public class KafkaLZ4Test { } } - @Rule - public ExpectedException thrown = ExpectedException.none(); - @Parameters(name = "{index} useBrokenFlagDescriptorChecksum={0}, ignoreFlagDescriptorChecksum={1}, blockChecksum={2}, close={3}, payload={4}") public static Collection data() { List payloads = new ArrayList<>(); @@ -111,12 +108,10 @@ public class KafkaLZ4Test { } @Test - public void testHeaderPrematureEnd() throws Exception { - thrown.expect(IOException.class); - thrown.expectMessage(KafkaLZ4BlockInputStream.PREMATURE_EOS); - - final ByteBuffer buffer = ByteBuffer.allocate(2); - makeInputStream(buffer); + public void testHeaderPrematureEnd() { + ByteBuffer buffer = ByteBuffer.allocate(2); + IOException e = assertThrows(IOException.class, () -> makeInputStream(buffer)); + assertEquals(KafkaLZ4BlockInputStream.PREMATURE_EOS, e.getMessage()); } private KafkaLZ4BlockInputStream makeInputStream(ByteBuffer buffer) throws IOException { @@ -125,43 +120,41 @@ public class KafkaLZ4Test { @Test public void testNotSupported() throws Exception { - thrown.expect(IOException.class); - thrown.expectMessage(KafkaLZ4BlockInputStream.NOT_SUPPORTED); - byte[] compressed = compressedBytes(); compressed[0] = 0x00; - - makeInputStream(ByteBuffer.wrap(compressed)); + ByteBuffer buffer = ByteBuffer.wrap(compressed); + IOException e = assertThrows(IOException.class, () -> makeInputStream(buffer)); + assertEquals(KafkaLZ4BlockInputStream.NOT_SUPPORTED, e.getMessage()); } @Test public void testBadFrameChecksum() throws Exception { - if (!ignoreFlagDescriptorChecksum) { - thrown.expect(IOException.class); - thrown.expectMessage(KafkaLZ4BlockInputStream.DESCRIPTOR_HASH_MISMATCH); - } - byte[] compressed = compressedBytes(); compressed[6] = (byte) 0xFF; + ByteBuffer buffer = ByteBuffer.wrap(compressed); - makeInputStream(ByteBuffer.wrap(compressed)); + if (ignoreFlagDescriptorChecksum) { + makeInputStream(buffer); + } else { + IOException e = assertThrows(IOException.class, () -> makeInputStream(buffer)); + assertEquals(KafkaLZ4BlockInputStream.DESCRIPTOR_HASH_MISMATCH, e.getMessage()); + } } @Test public void testBadBlockSize() throws Exception { - if (!close || (useBrokenFlagDescriptorChecksum && !ignoreFlagDescriptorChecksum)) return; - - thrown.expect(IOException.class); - thrown.expectMessage(CoreMatchers.containsString("exceeded max")); + if (!close || (useBrokenFlagDescriptorChecksum && !ignoreFlagDescriptorChecksum)) + return; byte[] compressed = compressedBytes(); - final ByteBuffer buffer = ByteBuffer.wrap(compressed).order(ByteOrder.LITTLE_ENDIAN); + ByteBuffer buffer = ByteBuffer.wrap(compressed).order(ByteOrder.LITTLE_ENDIAN); int blockSize = buffer.getInt(7); blockSize = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) | (1 << 24 & ~LZ4_FRAME_INCOMPRESSIBLE_MASK); buffer.putInt(7, blockSize); - testDecompression(buffer); + IOException e = assertThrows(IOException.class, () -> testDecompression(buffer)); + assertThat(e.getMessage(), CoreMatchers.containsString("exceeded max")); } diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index c3560f9b950..522915f0640 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -20,9 +20,7 @@ import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -30,6 +28,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.function.Supplier; import java.util.List; import java.util.Random; @@ -38,14 +37,13 @@ import static org.apache.kafka.common.utils.Utils.utf8; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; @RunWith(value = Parameterized.class) public class MemoryRecordsBuilderTest { - @Rule - public ExpectedException exceptionRule = ExpectedException.none(); - private final CompressionType compressionType; private final int bufferOffset; private final Time time; @@ -58,17 +56,25 @@ public class MemoryRecordsBuilderTest { @Test public void testWriteEmptyRecordSet() { - expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0); + byte magic = RecordBatch.MAGIC_VALUE_V0; + assumeAtLeastV2OrNotZstd(magic); ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, - TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, - false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); - MemoryRecords records = builder.build(); - assertEquals(0, records.sizeInBytes()); - assertEquals(bufferOffset, buffer.position()); + Supplier builderSupplier = () -> new MemoryRecordsBuilder(buffer, magic, + compressionType, TimestampType.CREATE_TIME, 0L, 0L, + RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + + if (compressionType != CompressionType.ZSTD) { + MemoryRecords records = builderSupplier.get().build(); + assertEquals(0, records.sizeInBytes()); + assertEquals(bufferOffset, buffer.position()); + } else { + Exception e = assertThrows(IllegalArgumentException.class, () -> builderSupplier.get().build()); + assertEquals(e.getMessage(), "ZStandard compression is not supported for magic " + magic); + } } @Test @@ -215,18 +221,19 @@ public class MemoryRecordsBuilderTest { @Test public void testCompressionRateV0() { - expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0); + byte magic = RecordBatch.MAGIC_VALUE_V0; + assumeAtLeastV2OrNotZstd(magic); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.position(bufferOffset); LegacyRecord[] records = new LegacyRecord[] { - LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()), - LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 1L, "b".getBytes(), "2".getBytes()), - LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 2L, "c".getBytes(), "3".getBytes()), + LegacyRecord.create(magic, 0L, "a".getBytes(), "1".getBytes()), + LegacyRecord.create(magic, 1L, "b".getBytes(), "2".getBytes()), + LegacyRecord.create(magic, 2L, "c".getBytes(), "3".getBytes()), }; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); @@ -272,18 +279,19 @@ public class MemoryRecordsBuilderTest { @Test public void testCompressionRateV1() { - expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1); + byte magic = RecordBatch.MAGIC_VALUE_V1; + assumeAtLeastV2OrNotZstd(magic); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.position(bufferOffset); LegacyRecord[] records = new LegacyRecord[] { - LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, 0L, "a".getBytes(), "1".getBytes()), - LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, 1L, "b".getBytes(), "2".getBytes()), - LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, 2L, "c".getBytes(), "3".getBytes()), + LegacyRecord.create(magic, 0L, "a".getBytes(), "1".getBytes()), + LegacyRecord.create(magic, 1L, "b".getBytes(), "2".getBytes()), + LegacyRecord.create(magic, 2L, "c".getBytes(), "3".getBytes()), }; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); @@ -305,13 +313,14 @@ public class MemoryRecordsBuilderTest { @Test public void buildUsingLogAppendTime() { - expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1); + byte magic = RecordBatch.MAGIC_VALUE_V1; + assumeAtLeastV2OrNotZstd(magic); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.position(bufferOffset); long logAppendTime = System.currentTimeMillis(); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.append(0L, "a".getBytes(), "1".getBytes()); @@ -336,13 +345,14 @@ public class MemoryRecordsBuilderTest { @Test public void buildUsingCreateTime() { - expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1); + byte magic = RecordBatch.MAGIC_VALUE_V1; + assumeAtLeastV2OrNotZstd(magic); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.position(bufferOffset); long logAppendTime = System.currentTimeMillis(); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.append(0L, "a".getBytes(), "1".getBytes()); @@ -369,7 +379,8 @@ public class MemoryRecordsBuilderTest { @Test public void testAppendedChecksumConsistency() { - expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0); + assumeAtLeastV2OrNotZstd(RecordBatch.MAGIC_VALUE_V0); + assumeAtLeastV2OrNotZstd(RecordBatch.MAGIC_VALUE_V1); ByteBuffer buffer = ByteBuffer.allocate(512); for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) { @@ -415,13 +426,14 @@ public class MemoryRecordsBuilderTest { @Test public void writePastLimit() { - expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1); + byte magic = RecordBatch.MAGIC_VALUE_V1; + assumeAtLeastV2OrNotZstd(magic); ByteBuffer buffer = ByteBuffer.allocate(64); buffer.position(bufferOffset); long logAppendTime = System.currentTimeMillis(); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.setEstimatedCompressionRatio(0.5f); @@ -462,11 +474,6 @@ public class MemoryRecordsBuilderTest { @Test public void convertV2ToV1UsingMixedCreateAndLogAppendTime() { - if (compressionType == CompressionType.ZSTD) { - exceptionRule.expect(UnsupportedCompressionTypeException.class); - exceptionRule.expectMessage("Down-conversion of zstandard-compressed batches is not supported"); - } - ByteBuffer buffer = ByteBuffer.allocate(512); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType, TimestampType.LOG_APPEND_TIME, 0L); @@ -493,36 +500,44 @@ public class MemoryRecordsBuilderTest { buffer.flip(); - ConvertedRecords convertedRecords = MemoryRecords.readableRecords(buffer) - .downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time); - MemoryRecords records = convertedRecords.records(); + Supplier> convertedRecordsSupplier = () -> + MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time); - // Transactional markers are skipped when down converting to V1, so exclude them from size - verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), + if (compressionType != CompressionType.ZSTD) { + ConvertedRecords convertedRecords = convertedRecordsSupplier.get(); + MemoryRecords records = convertedRecords.records(); + + // Transactional markers are skipped when down converting to V1, so exclude them from size + verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), 3, 3, records.sizeInBytes(), sizeExcludingTxnMarkers); - List batches = Utils.toList(records.batches().iterator()); - if (compressionType != CompressionType.NONE) { - assertEquals(2, batches.size()); - assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType()); - assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType()); - } else { - assertEquals(3, batches.size()); - assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType()); - assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType()); - assertEquals(TimestampType.CREATE_TIME, batches.get(2).timestampType()); - } + List batches = Utils.toList(records.batches().iterator()); + if (compressionType != CompressionType.NONE) { + assertEquals(2, batches.size()); + assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType()); + assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType()); + } else { + assertEquals(3, batches.size()); + assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType()); + assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType()); + assertEquals(TimestampType.CREATE_TIME, batches.get(2).timestampType()); + } - List logRecords = Utils.toList(records.records().iterator()); - assertEquals(3, logRecords.size()); - assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key()); - assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key()); - assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key()); + List logRecords = Utils.toList(records.records().iterator()); + assertEquals(3, logRecords.size()); + assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key()); + assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key()); + assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key()); + } else { + Exception e = assertThrows(UnsupportedCompressionTypeException.class, convertedRecordsSupplier::get); + assertEquals("Down-conversion of zstandard-compressed batches is not supported", e.getMessage()); + } } @Test public void convertToV1WithMixedV0AndV2Data() { - expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0); + assumeAtLeastV2OrNotZstd(RecordBatch.MAGIC_VALUE_V0); + assumeAtLeastV2OrNotZstd(RecordBatch.MAGIC_VALUE_V1); ByteBuffer buffer = ByteBuffer.allocate(512); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, @@ -598,31 +613,28 @@ public class MemoryRecordsBuilderTest { @Test public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() { - expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0); + byte magic = RecordBatch.MAGIC_VALUE_V0; + assumeAtLeastV2OrNotZstd(magic); ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.abort(); - try { - builder.build(); - fail("Should have thrown KafkaException"); - } catch (IllegalStateException e) { - // ok - } + assertThrows(IllegalStateException.class, builder::build); } @Test public void shouldResetBufferToInitialPositionOnAbort() { - expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0); + byte magic = RecordBatch.MAGIC_VALUE_V0; + assumeAtLeastV2OrNotZstd(magic); ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.append(0L, "a".getBytes(), "1".getBytes()); @@ -632,12 +644,13 @@ public class MemoryRecordsBuilderTest { @Test public void shouldThrowIllegalStateExceptionOnCloseWhenAborted() { - expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0); + byte magic = RecordBatch.MAGIC_VALUE_V0; + assumeAtLeastV2OrNotZstd(magic); ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.abort(); @@ -651,12 +664,13 @@ public class MemoryRecordsBuilderTest { @Test public void shouldThrowIllegalStateExceptionOnAppendWhenAborted() { - expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0); + byte magic = RecordBatch.MAGIC_VALUE_V0; + assumeAtLeastV2OrNotZstd(magic); ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.abort(); @@ -734,10 +748,7 @@ public class MemoryRecordsBuilderTest { } } - private void expectExceptionWithZStd(CompressionType compressionType, byte magic) { - if (compressionType == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); - } + private void assumeAtLeastV2OrNotZstd(byte magic) { + assumeTrue(compressionType != CompressionType.ZSTD || magic >= MAGIC_VALUE_V2); } } diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index 5f16acfb9e3..3d5a4f1a498 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -22,9 +22,7 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -32,6 +30,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.function.Supplier; import java.util.List; import static java.util.Arrays.asList; @@ -40,14 +39,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; @RunWith(value = Parameterized.class) public class MemoryRecordsTest { - @Rule - public ExpectedException exceptionRule = ExpectedException.none(); - private CompressionType compression; private byte magic; private long firstOffset; @@ -74,7 +72,7 @@ public class MemoryRecordsTest { @Test public void testIterator() { - expectExceptionWithZStd(compression, magic); + assumeAtLeastV2OrNotZstd(); ByteBuffer buffer = ByteBuffer.allocate(1024); @@ -159,8 +157,7 @@ public class MemoryRecordsTest { @Test public void testHasRoomForMethod() { - expectExceptionWithZStd(compression, magic); - + assumeAtLeastV2OrNotZstd(); MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME, 0L); builder.append(0L, "a".getBytes(), "1".getBytes()); @@ -447,60 +444,59 @@ public class MemoryRecordsTest { @Test public void testFilterToBatchDiscard() { - if (compression != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) { - expectExceptionWithZStd(compression, magic); + assumeAtLeastV2OrNotZstd(); + assumeTrue(compression != CompressionType.NONE || magic >= MAGIC_VALUE_V2); - ByteBuffer buffer = ByteBuffer.allocate(2048); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L); - builder.append(10L, "1".getBytes(), "a".getBytes()); - builder.close(); + ByteBuffer buffer = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L); + builder.append(10L, "1".getBytes(), "a".getBytes()); + builder.close(); - builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L); - builder.append(11L, "2".getBytes(), "b".getBytes()); - builder.append(12L, "3".getBytes(), "c".getBytes()); - builder.close(); + builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L); + builder.append(11L, "2".getBytes(), "b".getBytes()); + builder.append(12L, "3".getBytes(), "c".getBytes()); + builder.close(); - builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L); - builder.append(13L, "4".getBytes(), "d".getBytes()); - builder.append(20L, "5".getBytes(), "e".getBytes()); - builder.append(15L, "6".getBytes(), "f".getBytes()); - builder.close(); + builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L); + builder.append(13L, "4".getBytes(), "d".getBytes()); + builder.append(20L, "5".getBytes(), "e".getBytes()); + builder.append(15L, "6".getBytes(), "f".getBytes()); + builder.close(); - builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 6L); - builder.append(16L, "7".getBytes(), "g".getBytes()); - builder.close(); + builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 6L); + builder.append(16L, "7".getBytes(), "g".getBytes()); + builder.close(); - buffer.flip(); + buffer.flip(); - ByteBuffer filtered = ByteBuffer.allocate(2048); - MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() { - @Override - protected BatchRetention checkBatchRetention(RecordBatch batch) { - // discard the second and fourth batches - if (batch.lastOffset() == 2L || batch.lastOffset() == 6L) - return BatchRetention.DELETE; - return BatchRetention.DELETE_EMPTY; - } + ByteBuffer filtered = ByteBuffer.allocate(2048); + MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() { + @Override + protected BatchRetention checkBatchRetention(RecordBatch batch) { + // discard the second and fourth batches + if (batch.lastOffset() == 2L || batch.lastOffset() == 6L) + return BatchRetention.DELETE; + return BatchRetention.DELETE_EMPTY; + } - @Override - protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { - return true; - } - }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING); + @Override + protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { + return true; + } + }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING); - filtered.flip(); - MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); + filtered.flip(); + MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); - List batches = TestUtils.toList(filteredRecords.batches()); - assertEquals(2, batches.size()); - assertEquals(0L, batches.get(0).lastOffset()); - assertEquals(5L, batches.get(1).lastOffset()); - } + List batches = TestUtils.toList(filteredRecords.batches()); + assertEquals(2, batches.size()); + assertEquals(0L, batches.get(0).lastOffset()); + assertEquals(5L, batches.get(1).lastOffset()); } @Test public void testFilterToAlreadyCompactedLog() { - expectExceptionWithZStd(compression, magic); + assumeAtLeastV2OrNotZstd(); ByteBuffer buffer = ByteBuffer.allocate(2048); @@ -642,7 +638,7 @@ public class MemoryRecordsTest { @Test public void testFilterToWithUndersizedBuffer() { - expectExceptionWithZStd(compression, magic); + assumeAtLeastV2OrNotZstd(); ByteBuffer buffer = ByteBuffer.allocate(1024); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L); @@ -694,7 +690,7 @@ public class MemoryRecordsTest { @Test public void testToString() { - expectExceptionWithZStd(compression, magic); + assumeAtLeastV2OrNotZstd(); long timestamp = 1000000; MemoryRecords memoryRecords = MemoryRecords.withRecords(magic, compression, @@ -726,7 +722,7 @@ public class MemoryRecordsTest { @Test public void testFilterTo() { - expectExceptionWithZStd(compression, magic); + assumeAtLeastV2OrNotZstd(); ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L); @@ -841,7 +837,7 @@ public class MemoryRecordsTest { @Test public void testFilterToPreservesLogAppendTime() { - expectExceptionWithZStd(compression, magic); + assumeAtLeastV2OrNotZstd(); long logAppendTime = System.currentTimeMillis(); @@ -887,7 +883,7 @@ public class MemoryRecordsTest { @Test public void testNextBatchSize() { - expectExceptionWithZStd(compression, magic); + assumeAtLeastV2OrNotZstd(); ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, @@ -902,37 +898,37 @@ public class MemoryRecordsTest { assertEquals(0, buffer.position()); buffer.limit(1); // size not in buffer - assertEquals(null, records.firstBatchSize()); + assertNull(records.firstBatchSize()); buffer.limit(Records.LOG_OVERHEAD); // magic not in buffer - assertEquals(null, records.firstBatchSize()); + assertNull(records.firstBatchSize()); buffer.limit(Records.HEADER_SIZE_UP_TO_MAGIC); // payload not in buffer assertEquals(size, records.firstBatchSize().intValue()); buffer.limit(size); byte magic = buffer.get(Records.MAGIC_OFFSET); buffer.put(Records.MAGIC_OFFSET, (byte) 10); - try { - records.firstBatchSize(); - fail("Did not fail with corrupt magic"); - } catch (CorruptRecordException e) { - // Expected exception - } + assertThrows(CorruptRecordException.class, records::firstBatchSize); buffer.put(Records.MAGIC_OFFSET, magic); buffer.put(Records.SIZE_OFFSET + 3, (byte) 0); - try { - records.firstBatchSize(); - fail("Did not fail with corrupt size"); - } catch (CorruptRecordException e) { - // Expected exception + assertThrows(CorruptRecordException.class, records::firstBatchSize); + } + + @Test + public void testWithRecords() { + Supplier recordsSupplier = () -> MemoryRecords.withRecords(magic, compression, + new SimpleRecord(10L, "key1".getBytes(), "value1".getBytes())); + if (compression != CompressionType.ZSTD || magic >= MAGIC_VALUE_V2) { + MemoryRecords memoryRecords = recordsSupplier.get(); + String key = Utils.utf8(memoryRecords.batches().iterator().next().iterator().next().key()); + assertEquals("key1", key); + } else { + assertThrows(IllegalArgumentException.class, recordsSupplier::get); } } - private void expectExceptionWithZStd(CompressionType compressionType, byte magic) { - if (compressionType == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); - } + private void assumeAtLeastV2OrNotZstd() { + assumeTrue(compression != CompressionType.ZSTD || magic >= MAGIC_VALUE_V2); } @Parameterized.Parameters(name = "{index} magic={0}, firstOffset={1}, compressionType={2}") diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java index a46226dffdc..415b2951816 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java @@ -17,9 +17,7 @@ package org.apache.kafka.connect.data; import org.apache.kafka.connect.errors.DataException; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -30,6 +28,7 @@ import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; public class StructTest { @@ -285,9 +284,6 @@ public class StructTest { assertNotEquals(struct2.hashCode(), struct3.hashCode()); } - @Rule - public ExpectedException thrown = ExpectedException.none(); - @Test public void testValidateStructWithNullValue() { Schema schema = SchemaBuilder.struct() @@ -297,9 +293,9 @@ public class StructTest { .build(); Struct struct = new Struct(schema); - thrown.expect(DataException.class); - thrown.expectMessage("Invalid value: null used for required field: \"one\", schema type: STRING"); - struct.validate(); + Exception e = assertThrows(DataException.class, struct::validate); + assertEquals("Invalid value: null used for required field: \"one\", schema type: STRING", + e.getMessage()); } @Test @@ -307,13 +303,15 @@ public class StructTest { String fieldName = "field"; FakeSchema fakeSchema = new FakeSchema(); - thrown.expect(DataException.class); - thrown.expectMessage("Invalid Java object for schema type null: class java.lang.Object for field: \"field\""); - ConnectSchema.validateValue(fieldName, fakeSchema, new Object()); + Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName, + fakeSchema, new Object())); + assertEquals("Invalid Java object for schema type null: class java.lang.Object for field: \"field\"", + e.getMessage()); - thrown.expect(DataException.class); - thrown.expectMessage("Invalid Java object for schema type INT8: class java.lang.Object for field: \"field\""); - ConnectSchema.validateValue(fieldName, Schema.INT8_SCHEMA, new Object()); + e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName, + Schema.INT8_SCHEMA, new Object())); + assertEquals("Invalid Java object for schema type INT8: class java.lang.Object for field: \"field\"", + e.getMessage()); } @Test @@ -323,9 +321,7 @@ public class StructTest { .field(fieldName, Schema.STRING_SCHEMA); Struct struct = new Struct(testSchema); - thrown.expect(DataException.class); - Field field = null; - struct.put(field, "valid"); + assertThrows(DataException.class, () -> struct.put((Field) null, "valid")); } @Test @@ -335,8 +331,8 @@ public class StructTest { .field(fieldName, Schema.STRING_SCHEMA); Struct struct = new Struct(testSchema); - thrown.expect(DataException.class); - thrown.expectMessage("Invalid value: null used for required field: \"fieldName\", schema type: STRING"); - struct.put(fieldName, null); + Exception e = assertThrows(DataException.class, () -> struct.put(fieldName, null)); + assertEquals("Invalid value: null used for required field: \"fieldName\", schema type: STRING", + e.getMessage()); } } diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala index cc35f1d9ff6..e77833636e3 100644 --- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala @@ -26,12 +26,15 @@ import kafka.utils.Implicits._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch} import org.apache.kafka.common.utils.Utils +import org.apache.kafka.test.IntegrationTest import org.junit.After +import org.junit.experimental.categories.Category import scala.collection.Seq import scala.collection.mutable.ListBuffer import scala.util.Random +@Category(Array(classOf[IntegrationTest])) abstract class AbstractLogCleanerIntegrationTest { var cleaner: LogCleaner = _ diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 9eac3d43084..67b9c3a7e03 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -64,7 +64,7 @@ versions += [ jaxrs: "2.1.1", jfreechart: "1.0.0", jopt: "5.0.4", - junit: "4.12", + junit: "4.13-beta-2", kafka_0100: "0.10.0.1", kafka_0101: "0.10.1.1", kafka_0102: "0.10.2.2", diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 19d5ae0ac57..afebfdb0df4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -51,9 +51,9 @@ import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; import static org.apache.kafka.streams.StreamsConfig.producerPrefix; import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig; import static org.hamcrest.core.IsEqual.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java index 6539168489f..a5984006b41 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java @@ -59,7 +59,7 @@ import kafka.utils.MockTime; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.fail; @Category({IntegrationTest.class}) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index 2190d7071fa..5007fa90f11 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -53,7 +53,7 @@ import java.util.Map; import java.util.Properties; import static org.hamcrest.core.IsEqual.equalTo; -import static org.junit.Assert.assertThat; +import static org.hamcrest.MatcherAssert.assertThat; @Category({IntegrationTest.class}) public class GlobalKTableIntegrationTest { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 2873593d36d..264cd35e457 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -65,7 +65,7 @@ import java.util.regex.Pattern; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; +import static org.hamcrest.MatcherAssert.assertThat; /** * End-to-end integration test based on using regex and named topics for creating sources, using diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java index ec9df193657..6fa1bff0cf1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java @@ -61,8 +61,8 @@ import java.util.regex.Pattern; import static java.time.Duration.ofMillis; import static java.time.Duration.ofSeconds; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; @Category({IntegrationTest.class}) public class RepartitionOptimizingIntegrationTest { diff --git a/streams/src/test/java/org/apache/kafka/streams/internals/ApiUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/internals/ApiUtilsTest.java index 5fe30dd1e5b..6e4cdef46ac 100644 --- a/streams/src/test/java/org/apache/kafka/streams/internals/ApiUtilsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/internals/ApiUtilsTest.java @@ -25,8 +25,8 @@ import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFail import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration; import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondInstant; import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -107,4 +107,4 @@ public class ApiUtilsTest { assertThat(failMsgPrefix, containsString("variableName")); assertThat(failMsgPrefix, containsString("someValue")); } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java index 25261381a44..a113681ac41 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java @@ -36,7 +36,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertThat; +import static org.hamcrest.MatcherAssert.assertThat; public class PrintedTest { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java index fe75191b2a2..0b8627feec0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java @@ -35,7 +35,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java index 8c14d177c2c..d06b0287855 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java @@ -47,10 +47,10 @@ import java.util.regex.Pattern; import static java.util.Arrays.asList; import static org.apache.kafka.streams.Topology.AutoOffsetReset; import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @SuppressWarnings("unchecked") diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java index bedf3d88e6c..d18a7a87578 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java @@ -25,9 +25,7 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import java.util.Arrays; import java.util.Collections; @@ -44,16 +42,13 @@ public class KStreamFlatTransformTest extends EasyMockSupport { private KStreamFlatTransformProcessor processor; - @Rule - public final ExpectedException exception = ExpectedException.none(); - @Before public void setUp() { inputKey = 1; inputValue = 10; transformer = mock(Transformer.class); context = strictMock(ProcessorContext.class); - processor = new KStreamFlatTransformProcessor(transformer); + processor = new KStreamFlatTransformProcessor<>(transformer); } @Test @@ -139,4 +134,4 @@ public class KStreamFlatTransformTest extends EasyMockSupport { verifyAll(); assertTrue(processor instanceof KStreamFlatTransformProcessor); } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 14d92d1bb47..76a01cde53c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -53,9 +53,7 @@ import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; import org.apache.kafka.test.StreamsTestUtils; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import java.time.Duration; import java.util.Arrays; @@ -71,9 +69,10 @@ import static java.util.Arrays.asList; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -91,9 +90,6 @@ public class KStreamImplTest { private Serde mySerde = new Serdes.StringSerde(); - @Rule - public final ExpectedException exception = ExpectedException.none(); - @Before public void before() { builder = new StreamsBuilder(); @@ -547,16 +543,14 @@ public class KStreamImplTest { @Test public void shouldNotAllowNullTransformSupplierOnTransform() { - exception.expect(NullPointerException.class); - exception.expectMessage("transformerSupplier can't be null"); - testStream.transform(null); + final Exception e = assertThrows(NullPointerException.class, () -> testStream.transform(null)); + assertEquals("transformerSupplier can't be null", e.getMessage()); } @Test public void shouldNotAllowNullTransformSupplierOnFlatTransform() { - exception.expect(NullPointerException.class); - exception.expectMessage("transformerSupplier can't be null"); - testStream.flatTransform(null); + final Exception e = assertThrows(NullPointerException.class, () -> testStream.flatTransform(null)); + assertEquals("transformerSupplier can't be null", e.getMessage()); } @Test(expected = NullPointerException.class) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index 133bd556fc4..71367b24f3d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -43,8 +43,8 @@ import java.util.Set; import static java.time.Duration.ofMillis; import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; public class KStreamKStreamJoinTest { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java index 0d42e44790a..ec739e07c3a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java @@ -40,9 +40,9 @@ import java.util.Properties; import static java.util.Arrays.asList; import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; public class KTableSourceTest { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java index def6ca1d0ff..9d4ba3d019a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java @@ -30,7 +30,7 @@ import org.junit.Test; import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsSame.sameInstance; import static org.hamcrest.core.IsNot.not; -import static org.junit.Assert.assertThat; +import static org.hamcrest.MatcherAssert.assertThat; @SuppressWarnings("unchecked") public class TransformerSupplierAdapterTest extends EasyMockSupport { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java index 5e426d95931..1612cb9c12b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java @@ -30,7 +30,7 @@ import org.junit.Test; import static java.time.Duration.ofMillis; import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.fail; public class GraphGraceSearchUtilTest { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java index 5d7078c5a78..c981107e56a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java @@ -34,7 +34,7 @@ import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH; import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END; import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START; import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertTrue; @@ -219,4 +219,4 @@ public class CompositeRestoreListenerTest { } } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java index 21b6623bd92..6803978137b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java @@ -32,8 +32,8 @@ import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.mock; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; public class GlobalProcessorContextImplTest { private static final String GLOBAL_STORE_NAME = "global-store"; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 1be5231f59a..1a46af86314 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -51,11 +51,11 @@ import static java.time.Duration.ofSeconds; import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java index e652ee5c33a..45e01652a07 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java @@ -31,7 +31,7 @@ import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.mock; import static org.easymock.EasyMock.replay; import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.fail; public class ProcessorContextTest { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index f3d151c9462..11485e4e0c9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -91,12 +91,12 @@ import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.apache.kafka.streams.processor.internals.AbstractStateManager.CHECKPOINT_FILE_NAME; import static org.apache.kafka.streams.processor.internals.StreamThread.getSharedAdminClientId; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 140c219b114..284e396f978 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -61,9 +61,9 @@ import static java.time.Duration.ofMillis; import static java.util.Arrays.asList; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @SuppressWarnings("unchecked") diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index cadfdb0cef1..e1d6f0c086c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -34,10 +34,10 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; public class StreamsMetricsImplTest { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index 44ff3a2edd7..9d1284e573f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -39,9 +39,9 @@ import java.util.List; import java.util.Map; import static org.hamcrest.core.IsEqual.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; public abstract class AbstractKeyValueStoreTest { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java index cf15f9d060a..3ac05a01204 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java @@ -47,11 +47,11 @@ import java.util.Map; import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -346,4 +346,4 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { forwarded.put(key, new Change<>(newValue, oldValue)); } } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java index 915d2a3bd08..2307b71aa33 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java @@ -26,9 +26,7 @@ import org.apache.kafka.test.StateStoreProviderStub; import org.apache.kafka.test.StreamsTestUtils; import org.junit.Assert; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import java.util.Arrays; import java.util.Collections; @@ -40,10 +38,12 @@ import static java.util.Arrays.asList; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; public class CompositeReadOnlyWindowStoreTest { private static final long WINDOW_SIZE = 30_000; + private final String storeName = "window-store"; private StateStoreProviderStub stubProviderOne; private StateStoreProviderStub stubProviderTwo; @@ -51,9 +51,6 @@ public class CompositeReadOnlyWindowStoreTest { private ReadOnlyWindowStoreStub underlyingWindowStore; private ReadOnlyWindowStoreStub otherUnderlyingStore; - @Rule - public final ExpectedException windowStoreIteratorException = ExpectedException.none(); - @Before public void before() { stubProviderOne = new StateStoreProviderStub(false); @@ -151,9 +148,7 @@ public class CompositeReadOnlyWindowStoreTest { final CompositeReadOnlyWindowStore store = new CompositeReadOnlyWindowStore<>(new StateStoreProviderStub(false), QueryableStoreTypes.windowStore(), "foo"); final WindowStoreIterator windowStoreIterator = store.fetch("key", ofEpochMilli(1), ofEpochMilli(10)); - - windowStoreIteratorException.expect(NoSuchElementException.class); - windowStoreIterator.peekNextKey(); + assertThrows(NoSuchElementException.class, windowStoreIterator::peekNextKey); } @Test @@ -161,9 +156,7 @@ public class CompositeReadOnlyWindowStoreTest { final CompositeReadOnlyWindowStore store = new CompositeReadOnlyWindowStore<>(new StateStoreProviderStub(false), QueryableStoreTypes.windowStore(), "foo"); final WindowStoreIterator windowStoreIterator = store.fetch("key", ofEpochMilli(1), ofEpochMilli(10)); - - windowStoreIteratorException.expect(NoSuchElementException.class); - windowStoreIterator.next(); + assertThrows(NoSuchElementException.class, windowStoreIterator::next); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java index 91eda9f7965..1bd40459940 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java @@ -30,8 +30,8 @@ import java.util.Arrays; import java.util.List; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java index 0dca6346bbb..23effc93e85 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java @@ -47,8 +47,8 @@ import java.util.Map; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @RunWith(EasyMockRunner.class) @@ -229,4 +229,4 @@ public class MeteredKeyValueStoreTest { } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java index 92056fa827f..fd0496804ea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java @@ -48,8 +48,8 @@ import java.util.Map; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @RunWith(EasyMockRunner.class) @@ -248,4 +248,4 @@ public class MeteredSessionStoreTest { return this.metrics.metric(new MetricName(name, "stream-scope-metrics", "", this.tags)); } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java index 897f94f5e4e..74e5cd538c5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java @@ -51,9 +51,9 @@ import java.util.List; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; import static org.easymock.EasyMock.verify; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.matchesPattern; -import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; /** diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java index 8097d74670d..3be20de33e4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -36,7 +36,6 @@ import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.NoOpRecordCollector; import org.apache.kafka.test.TestUtils; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -377,7 +376,7 @@ public class RocksDBSegmentedBytesStoreTest { // Bulk loading is enabled during recovery. for (final KeyValueSegment segment : bytesStore.getSegments()) { - Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30)); + assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30)); } final List, Long>> expected = new ArrayList<>(); @@ -401,12 +400,12 @@ public class RocksDBSegmentedBytesStoreTest { restoreListener.onRestoreStart(null, bytesStore.name(), 0L, 0L); for (final KeyValueSegment segment : bytesStore.getSegments()) { - Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30)); + assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30)); } restoreListener.onRestoreEnd(null, bytesStore.name(), 0L); for (final KeyValueSegment segment : bytesStore.getSegments()) { - Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(4)); + assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(4)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 4785673a150..db458eb7734 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -51,9 +51,9 @@ import java.util.Set; import static java.nio.charset.StandardCharsets.UTF_8; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java index 3347d02795f..f49527b0d7a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java @@ -36,9 +36,9 @@ import java.util.List; import static java.util.Arrays.asList; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; public class RocksDBTimestampedStoreTest extends RocksDBStoreTest { diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index 84aaa8a8e97..9de77980077 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -74,10 +74,10 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -808,7 +808,7 @@ public class TopologyTestDriverTest { public void shouldNotUpdateStoreForSmallerValue() { setup(); testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L)); - Assert.assertThat(store.get("a"), equalTo(21L)); + assertThat(store.get("a"), equalTo(21L)); OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L); Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); } @@ -817,7 +817,7 @@ public class TopologyTestDriverTest { public void shouldNotUpdateStoreForLargerValue() { setup(); testDriver.pipeInput(recordFactory.create("input-topic", "a", 42L, 9999L)); - Assert.assertThat(store.get("a"), equalTo(42L)); + assertThat(store.get("a"), equalTo(42L)); OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 42L); Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); } @@ -826,7 +826,7 @@ public class TopologyTestDriverTest { public void shouldUpdateStoreForNewKey() { setup(); testDriver.pipeInput(recordFactory.create("input-topic", "b", 21L, 9999L)); - Assert.assertThat(store.get("b"), equalTo(21L)); + assertThat(store.get("b"), equalTo(21L)); OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L); OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "b", 21L); Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));