KAFKA-12173 Migrate streams:streams-scala module to JUnit 5 (#9858)

1. replace org.junit.Assert by org.junit.jupiter.api.Assertions
2. replace org.junit by org.junit.jupiter.api
3. replace Before by BeforeEach
4. replace After by AfterEach
5. remove ExternalResource from all scala modules
6. add explicit AfterClass/BeforeClass to stop/start EmbeddedKafkaCluster

Noted that this PR does not migrate stream module to junit 5 so it does not introduce callback of junit 5 to deal with beforeAll/afterAll. The next PR of migrating stream module can replace explicit beforeAll/afterAll by junit 5 extension. Or we can keep the beforeAll/afterAll if it make code more readable.

Reviewers: John Roesler <vvcephei@apache.org>
This commit is contained in:
Chia-Ping Tsai 2021-03-25 01:04:39 +08:00 committed by GitHub
parent 7071ded2a6
commit 9af81955c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
63 changed files with 1037 additions and 605 deletions

View File

@ -271,7 +271,7 @@ subprojects {
}
// Remove the relevant project name once it's converted to JUnit 5
def shouldUseJUnit5 = !(["runtime", "streams-scala", "streams"].contains(it.project.name))
def shouldUseJUnit5 = !(["runtime", "streams"].contains(it.project.name))
def testLoggingEvents = ["passed", "skipped", "failed"]
def testShowStandardStreams = false
@ -1575,12 +1575,9 @@ project(':streams:streams-scala') {
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':streams:test-utils')
testImplementation libs.junitJupiterApi
testImplementation libs.junitVintageEngine
testImplementation libs.scalatest
testImplementation libs.junitJupiter
testImplementation libs.easymock
testImplementation libs.hamcrest
testRuntimeOnly libs.slf4jlog4j
}

View File

@ -107,7 +107,6 @@ versions += [
scalaCollectionCompat: "2.3.0",
scalafmt: "1.5.1",
scalaJava8Compat : "0.9.1",
scalatest: "3.0.8",
scoverage: "1.4.1",
slf4j: "1.7.30",
snappy: "1.1.8.1",

View File

@ -37,8 +37,9 @@ import org.apache.kafka.test.TestUtils;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -75,9 +76,19 @@ import static org.junit.Assert.fail;
@Category(IntegrationTest.class)
public class AdjustStreamThreadCountTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Rule
public TestName testName = new TestName();

View File

@ -33,6 +33,7 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@ -42,6 +43,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
@ -76,9 +78,24 @@ public class EOSUncleanShutdownIntegrationTest {
@Parameterized.Parameter
public String eosConfig;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TEST_FOLDER.getRoot().getPath());
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@ClassRule
public static final TemporaryFolder TEST_FOLDER = new TemporaryFolder(TestUtils.tempDirectory());
@ -88,16 +105,6 @@ public class EOSUncleanShutdownIntegrationTest {
private static final int RECORD_TOTAL = 3;
@BeforeClass
public static void setupConfigsAndUtils() {
STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TEST_FOLDER.getRoot().getPath());
}
@Test
public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedException {
final String appId = "shouldWorkWithUncleanShutdownWipeOutStateStore";

View File

@ -32,13 +32,15 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
@ -51,8 +53,17 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
@Category(IntegrationTest.class)
public class EmitOnChangeIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Rule
public TestName testName = new TestName();

View File

@ -53,14 +53,16 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@ -118,12 +120,22 @@ public class EosBetaUpgradeIntegrationTest {
)
);
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
NUM_BROKERS,
Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "false"))
);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private static String applicationId;
private final static int NUM_TOPIC_PARTITIONS = 4;
private final static String CONSUMER_GROUP_ID = "readCommitted";

View File

@ -49,8 +49,9 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@ -61,6 +62,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -95,12 +97,22 @@ public class EosIntegrationTest {
private static final int MAX_POLL_INTERVAL_MS = 5 * 1000;
private static final int MAX_WAIT_TIME_MS = 60 * 1000;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
NUM_BROKERS,
Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "true"))
);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private String applicationId;
private final static int NUM_TOPIC_PARTITIONS = 2;
private final static String CONSUMER_GROUP_ID = "readCommitted";

View File

@ -41,9 +41,9 @@ import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -71,8 +71,42 @@ public class FineGrainedAutoResetIntegrationTest {
private static final String OUTPUT_TOPIC_1 = "outputTopic_1";
private static final String OUTPUT_TOPIC_2 = "outputTopic_2";
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
CLUSTER.createTopics(
TOPIC_1_0,
TOPIC_2_0,
TOPIC_A_0,
TOPIC_C_0,
TOPIC_Y_0,
TOPIC_Z_0,
TOPIC_1_1,
TOPIC_2_1,
TOPIC_A_1,
TOPIC_C_1,
TOPIC_Y_1,
TOPIC_Z_1,
TOPIC_1_2,
TOPIC_2_2,
TOPIC_A_2,
TOPIC_C_2,
TOPIC_Y_2,
TOPIC_Z_2,
NOOP,
DEFAULT_OUTPUT_TOPIC,
OUTPUT_TOPIC_0,
OUTPUT_TOPIC_1,
OUTPUT_TOPIC_2);
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private final MockTime mockTime = CLUSTER.time;
private static final String TOPIC_1_0 = "topic-1_0";
@ -106,35 +140,6 @@ public class FineGrainedAutoResetIntegrationTest {
private final String topicYTestMessage = "topic-Y test";
private final String topicZTestMessage = "topic-Z test";
@BeforeClass
public static void startKafkaCluster() throws InterruptedException {
CLUSTER.createTopics(
TOPIC_1_0,
TOPIC_2_0,
TOPIC_A_0,
TOPIC_C_0,
TOPIC_Y_0,
TOPIC_Z_0,
TOPIC_1_1,
TOPIC_2_1,
TOPIC_A_1,
TOPIC_C_1,
TOPIC_Y_1,
TOPIC_Z_1,
TOPIC_1_2,
TOPIC_2_2,
TOPIC_A_2,
TOPIC_C_2,
TOPIC_Y_2,
TOPIC_Z_2,
NOOP,
DEFAULT_OUTPUT_TOPIC,
OUTPUT_TOPIC_0,
OUTPUT_TOPIC_1,
OUTPUT_TOPIC_2);
}
@Before
public void setUp() throws IOException {

View File

@ -42,8 +42,9 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -51,6 +52,7 @@ import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@ -72,10 +74,20 @@ public class GlobalKTableEOSIntegrationTest {
BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
}
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER =
new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Parameterized.Parameters(name = "{0}")
public static Collection<String[]> data() {
return Arrays.asList(new String[][] {

View File

@ -44,13 +44,15 @@ import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
@ -68,9 +70,19 @@ import static org.junit.Assert.assertNotNull;
public class GlobalKTableIntegrationTest {
private static final int NUM_BROKERS = 1;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private final MockTime mockTime = CLUSTER.time;
private final KeyValueMapper<String, Long, Long> keyMapper = (key, value) -> value;
private final ValueJoiner<Long, String, String> joiner = (value1, value2) -> value1 + "+" + value2;

View File

@ -39,13 +39,15 @@ import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@ -80,9 +82,19 @@ public class GlobalThreadShutDownOrderTest {
private final AtomicInteger closeCounter = new AtomicInteger(0);
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private final MockTime mockTime = CLUSTER.time;
private final String globalStore = "globalStore";
private StreamsBuilder builder;

View File

@ -42,12 +42,14 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
import org.junit.ClassRule;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
@ -71,9 +73,19 @@ import static org.hamcrest.Matchers.is;
@Category(IntegrationTest.class)
public class HighAvailabilityTaskAssignorIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Rule
public TestName testName = new TestName();

View File

@ -45,9 +45,9 @@ import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -73,9 +73,20 @@ import static org.junit.Assert.assertTrue;
*/
@Category({IntegrationTest.class})
public class InternalTopicIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@BeforeClass
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_INPUT_TABLE_TOPIC);
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private static final String APP_ID = "internal-topics-integration-test";
private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
private static final String DEFAULT_INPUT_TABLE_TOPIC = "inputTable";
@ -84,11 +95,6 @@ public class InternalTopicIntegrationTest {
private Properties streamsProp;
@BeforeClass
public static void startKafkaCluster() throws InterruptedException {
CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_INPUT_TABLE_TOPIC);
}
@Before
public void before() {
streamsProp = new Properties();

View File

@ -30,14 +30,15 @@ import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@ -50,9 +51,25 @@ import static org.junit.Assert.assertThrows;
@Category({IntegrationTest.class})
public class JoinStoreIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Rule
public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
@ -65,15 +82,6 @@ public class JoinStoreIntegrationTest {
StreamsBuilder builder;
@BeforeClass
public static void setupConfigsAndUtils() {
STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);
}
@Before
public void prepareTopology() throws InterruptedException {
CLUSTER.createTopics(INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.integration;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
@ -30,9 +31,9 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -42,9 +43,24 @@ import static org.junit.Assert.assertTrue;
@Category({IntegrationTest.class})
public class JoinWithIncompleteMetadataIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Rule
public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
@ -59,15 +75,6 @@ public class JoinWithIncompleteMetadataIntegrationTest {
final ValueJoiner<String, String, String> valueJoiner = (value1, value2) -> value1 + "-" + value2;
private KTable<Long, String> rightTable;
@BeforeClass
public static void setupConfigsAndUtils() {
STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);
}
@Before
public void prepareTopology() throws InterruptedException {
CLUSTER.createTopics(INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);

View File

@ -44,8 +44,9 @@ import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -68,9 +69,19 @@ public class KStreamAggregationDedupIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final long COMMIT_INTERVAL_MS = 300L;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private final MockTime mockTime = CLUSTER.time;
private StreamsBuilder builder;
private Properties streamsConfiguration;

View File

@ -64,8 +64,9 @@ import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -102,9 +103,19 @@ import static org.junit.Assert.assertTrue;
public class KStreamAggregationIntegrationTest {
private static final int NUM_BROKERS = 1;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private final MockTime mockTime = CLUSTER.time;
private StreamsBuilder builder;
private Properties streamsConfiguration;

View File

@ -42,8 +42,9 @@ import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -83,9 +84,19 @@ import static org.junit.Assert.assertTrue;
public class KStreamRepartitionIntegrationTest {
private static final int NUM_BROKERS = 1;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private String topicB;
private String inputTopic;
private String outputTopic;

View File

@ -43,9 +43,9 @@ import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -64,7 +64,6 @@ import static org.junit.Assert.assertEquals;
public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
private final static int NUM_BROKERS = 1;
@ClassRule
public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private final static MockTime MOCK_TIME = CLUSTER.time;
private final static String TABLE_1 = "table1";
@ -84,7 +83,8 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
private final static Properties PRODUCER_CONFIG_3 = new Properties();
@BeforeClass
public static void beforeTest() throws Exception {
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
//Use multiple partitions to ensure distribution of keys.
CLUSTER.createTopic(TABLE_1, 3, 1);
@ -107,7 +107,6 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
PRODUCER_CONFIG_3.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
PRODUCER_CONFIG_3.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
final List<KeyValue<Integer, Float>> table1 = asList(
new KeyValue<>(1, 1.33f),
new KeyValue<>(2, 2.22f),
@ -144,6 +143,11 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Before
public void before() throws IOException {
final String stateDirBasePath = TestUtils.tempDirectory().getPath();

View File

@ -35,14 +35,15 @@ import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@ -61,8 +62,19 @@ public class KTableKTableForeignKeyJoinDistributedTest {
private static final String LEFT_TABLE = "left_table";
private static final String RIGHT_TABLE = "right_table";
private static final String OUTPUT = "output-topic";
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private static final Properties CONSUMER_CONFIG = new Properties();
@Rule
@ -77,11 +89,6 @@ public class KTableKTableForeignKeyJoinDistributedTest {
private volatile boolean client1IsOk = false;
private volatile boolean client2IsOk = false;
@BeforeClass
public static void createTopics() throws InterruptedException {
CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
}
@Before
public void setupTopics() throws InterruptedException {
CLUSTER.createTopic(LEFT_TABLE, 1, 1);

View File

@ -36,14 +36,15 @@ import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
@ -59,20 +60,11 @@ public class KTableSourceTopicRestartIntegrationTest {
private static final Properties PRODUCER_CONFIG = new Properties();
private static final Properties STREAMS_CONFIG = new Properties();
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private final Time time = CLUSTER.time;
private final StreamsBuilder streamsBuilder = new StreamsBuilder();
private final Map<String, String> readKeyValues = new ConcurrentHashMap<>();
private String sourceTopic;
private KafkaStreams streams;
private Map<String, String> expectedInitialResultsMap;
private Map<String, String> expectedResultsWithDataWrittenDuringRestoreMap;
@BeforeClass
public static void setUpBeforeAllTests() {
public static void startCluster() throws IOException {
CLUSTER.start();
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
@ -89,6 +81,21 @@ public class KTableSourceTopicRestartIntegrationTest {
PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private final Time time = CLUSTER.time;
private final StreamsBuilder streamsBuilder = new StreamsBuilder();
private final Map<String, String> readKeyValues = new ConcurrentHashMap<>();
private String sourceTopic;
private KafkaStreams streams;
private Map<String, String> expectedInitialResultsMap;
private Map<String, String> expectedResultsWithDataWrittenDuringRestoreMap;
@Rule
public TestName testName = new TestName();

View File

@ -40,8 +40,9 @@ import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTask
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -50,6 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
@ -75,9 +77,19 @@ import static org.junit.Assert.assertTrue;
@Category({IntegrationTest.class})
public class LagFetchIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private static final long WAIT_TIMEOUT_MS = 120000;
private static final Logger LOG = LoggerFactory.getLogger(LagFetchIntegrationTest.class);

View File

@ -44,14 +44,16 @@ import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
@ -69,8 +71,18 @@ public class MetricsIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final int NUM_THREADS = 2;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private final long timeout = 60000;
// Metric group

View File

@ -22,6 +22,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@ -52,7 +53,9 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -65,18 +68,28 @@ public class OptimizedKTableIntegrationTest {
private static final String INPUT_TOPIC_NAME = "input-topic";
private static final String TABLE_NAME = "source-table";
@Rule
public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Rule
public final TestName testName = new TestName();
private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
private final MockTime mockTime = cluster.time;
private final MockTime mockTime = CLUSTER.time;
@Before
public void before() throws InterruptedException {
cluster.createTopic(INPUT_TOPIC_NAME, 2, 1);
CLUSTER.createTopic(INPUT_TOPIC_NAME, 2, 1);
}
@After
@ -154,7 +167,7 @@ public class OptimizedKTableIntegrationTest {
private void produceValueRange(final int key, final int start, final int endExclusive) {
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
@ -179,7 +192,7 @@ public class OptimizedKTableIntegrationTest {
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());

View File

@ -38,12 +38,13 @@ import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
@ -66,7 +67,6 @@ public class PurgeRepartitionTopicIntegrationTest {
private static final Integer PURGE_INTERVAL_MS = 10;
private static final Integer PURGE_SEGMENT_BYTES = 2000;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, new Properties() {
{
put("log.retention.check.interval.ms", PURGE_INTERVAL_MS);
@ -74,6 +74,18 @@ public class PurgeRepartitionTopicIntegrationTest {
}
});
@BeforeClass
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
CLUSTER.createTopic(INPUT_TOPIC, 1, 1);
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private final Time time = CLUSTER.time;
private class RepartitionTopicCreatedWithExpectedConfigs implements TestCondition {
@ -139,11 +151,6 @@ public class PurgeRepartitionTopicIntegrationTest {
}
}
@BeforeClass
public static void createTopics() throws Exception {
CLUSTER.createTopic(INPUT_TOPIC, 1, 1);
}
@Before
public void setup() {
// create admin client for verification

View File

@ -62,8 +62,9 @@ import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -126,8 +127,18 @@ public class QueryableStateIntegrationTest {
private static final int NUM_BROKERS = 1;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private static final int STREAM_THREE_PARTITIONS = 4;
private final MockTime mockTime = CLUSTER.time;
private String streamOne = "stream-one";

View File

@ -48,9 +48,9 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
@ -82,8 +82,29 @@ import static org.hamcrest.Matchers.greaterThan;
@Category({IntegrationTest.class})
public class RegexSourceIntegrationTest {
private static final int NUM_BROKERS = 1;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
CLUSTER.createTopics(
TOPIC_1,
TOPIC_2,
TOPIC_A,
TOPIC_C,
TOPIC_Y,
TOPIC_Z,
FA_TOPIC,
FOO_TOPIC);
CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, 1);
CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1);
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private final MockTime mockTime = CLUSTER.time;
private static final String TOPIC_1 = "topic-1";
@ -104,22 +125,6 @@ public class RegexSourceIntegrationTest {
private static volatile AtomicInteger topicSuffixGenerator = new AtomicInteger(0);
private String outputTopic;
@BeforeClass
public static void startKafkaCluster() throws InterruptedException {
CLUSTER.createTopics(
TOPIC_1,
TOPIC_2,
TOPIC_A,
TOPIC_C,
TOPIC_Y,
TOPIC_Z,
FA_TOPIC,
FOO_TOPIC);
CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, 1);
CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1);
}
@Before
public void setUp() throws InterruptedException {
outputTopic = createTopic(topicSuffixGenerator.incrementAndGet());

View File

@ -28,15 +28,17 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.List;
@ -56,7 +58,6 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
private static final String NON_EXISTING_TOPIC = "nonExistingTopic";
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER;
static {
@ -68,6 +69,16 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
}
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Override
Map<String, Object> getClientSslConfig() {
return null;

View File

@ -23,10 +23,12 @@ import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
@ -36,7 +38,6 @@ import java.util.Properties;
@Category({IntegrationTest.class})
public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER;
private static final Map<String, Object> SSL_CONFIG;
@ -61,6 +62,16 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
}
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Override
Map<String, Object> getClientSslConfig() {
return SSL_CONFIG;

View File

@ -35,7 +35,8 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.ClassRule;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -43,6 +44,7 @@ import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -70,10 +72,19 @@ public class ResetPartitionTimeIntegrationTest {
BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
}
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER =
new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG, 0L);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
private static final Serde<String> STRING_SERDE = Serdes.String();

View File

@ -56,13 +56,15 @@ import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
@ -89,9 +91,18 @@ import static org.junit.Assert.assertTrue;
public class RestoreIntegrationTest {
private static final int NUM_BROKERS = 1;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Rule
public final TestName testName = new TestName();
private String appId;

View File

@ -40,8 +40,9 @@ import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -51,6 +52,7 @@ import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@ -71,9 +73,18 @@ public class RocksDBMetricsIntegrationTest {
private static final int NUM_BROKERS = 3;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private static final String STREAM_INPUT_ONE = "STREAM_INPUT_ONE";
private static final String STREAM_OUTPUT_ONE = "STREAM_OUTPUT_ONE";
private static final String STREAM_INPUT_TWO = "STREAM_INPUT_TWO";

View File

@ -23,11 +23,13 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.tests.SmokeTestClient;
import org.apache.kafka.streams.tests.SmokeTestDriver;
import org.apache.kafka.test.IntegrationTest;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
@ -39,9 +41,18 @@ import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
@Category(IntegrationTest.class)
public class SmokeTestDriverIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private static class Driver extends Thread {
private String bootstrapServers;

View File

@ -35,13 +35,14 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import java.io.IOException;
import java.util.Properties;
import java.util.function.Predicate;
@ -52,9 +53,19 @@ public class StandbyTaskCreationIntegrationTest {
private static final int NUM_BROKERS = 1;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Rule
public TestName testName = new TestName();
@ -65,11 +76,6 @@ public class StandbyTaskCreationIntegrationTest {
private volatile boolean client1IsOk = false;
private volatile boolean client2IsOk = false;
@BeforeClass
public static void createTopics() throws InterruptedException {
CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
}
@After
public void after() {
client1.close();

View File

@ -41,8 +41,9 @@ import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -51,6 +52,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
@ -99,9 +101,17 @@ public class StandbyTaskEOSIntegrationTest {
private KafkaStreams streamInstanceTwo;
private KafkaStreams streamInstanceOneRecovery;
private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Rule
public TestName testName = new TestName();

View File

@ -33,11 +33,13 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -54,8 +56,18 @@ public class StateRestorationIntegrationTest {
private Properties streamsConfiguration;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private final MockTime mockTime = CLUSTER.time;
@Before

View File

@ -48,6 +48,7 @@ import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@ -82,7 +83,6 @@ public class StoreQueryIntegrationTest {
private static final String INPUT_TOPIC_NAME = "input-topic";
private static final String TABLE_NAME = "source-table";
@Rule
public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
@Rule
@ -92,7 +92,8 @@ public class StoreQueryIntegrationTest {
private final MockTime mockTime = cluster.time;
@Before
public void before() throws InterruptedException {
public void before() throws InterruptedException, IOException {
cluster.start();
cluster.createTopic(INPUT_TOPIC_NAME, 2, 1);
}
@ -101,6 +102,7 @@ public class StoreQueryIntegrationTest {
for (final KafkaStreams kafkaStreams : streamsToCleanup) {
kafkaStreams.close();
}
cluster.stop();
}
@Test

View File

@ -42,13 +42,15 @@ import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import java.io.IOException;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
@ -65,9 +67,18 @@ public class StoreUpgradeIntegrationTest {
private KafkaStreams kafkaStreams;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Rule
public TestName testName = new TestName();

View File

@ -39,8 +39,9 @@ import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -68,9 +69,18 @@ import static org.junit.Assert.assertTrue;
public class StreamTableJoinTopologyOptimizationIntegrationTest {
private static final int NUM_BROKERS = 1;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private String tableTopic;
private String inputTopic;
private String outputTopic;

View File

@ -37,8 +37,9 @@ import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -70,8 +71,18 @@ import static org.junit.Assert.fail;
@Category(IntegrationTest.class)
@SuppressWarnings("deprecation") //Need to call the old handler, will remove those calls when the old handler is removed
public class StreamsUncaughtExceptionHandlerIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);
@Rule

View File

@ -22,11 +22,12 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.tests.StreamsUpgradeTest;
import org.apache.kafka.test.IntegrationTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@ -43,14 +44,20 @@ import static org.hamcrest.MatcherAssert.assertThat;
@Category(IntegrationTest.class)
public class StreamsUpgradeTestIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
@BeforeClass
public static void setup() {
public static void startCluster() throws IOException {
CLUSTER.start();
IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 1, "data");
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Test
public void testVersionProbingUpgrade() throws InterruptedException {
final KafkaStreams kafkaStreams1 = StreamsUpgradeTest.buildStreams(mkProperties(

View File

@ -44,7 +44,8 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.ClassRule;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -54,6 +55,7 @@ import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
@ -85,13 +87,22 @@ import static org.hamcrest.Matchers.equalTo;
public class SuppressionDurabilityIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(SuppressionDurabilityIntegrationTest.class);
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
3,
mkProperties(mkMap()),
0L
);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Rule
public TestName testName = new TestName();

View File

@ -45,10 +45,12 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matchers;
import org.junit.ClassRule;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
@ -79,12 +81,22 @@ import static org.hamcrest.Matchers.empty;
@Category(IntegrationTest.class)
public class SuppressionIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
1,
mkProperties(mkMap()),
0L
);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
private static final Serde<String> STRING_SERDE = Serdes.String();
private static final int COMMIT_INTERVAL = 100;

View File

@ -31,12 +31,14 @@ import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityT
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.ClassRule;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Properties;
@ -54,9 +56,19 @@ import static org.hamcrest.Matchers.sameInstance;
@Category(IntegrationTest.class)
public class TaskAssignorIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Rule
public TestName testName = new TestName();

View File

@ -26,7 +26,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -44,7 +43,7 @@ import java.util.concurrent.ExecutionException;
/**
* Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and supplied number of Kafka brokers.
*/
public class EmbeddedKafkaCluster extends ExternalResource {
public class EmbeddedKafkaCluster {
private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected
@ -119,7 +118,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
/**
* Stop the Kafka cluster.
*/
private void stop() {
public void stop() {
if (brokers.length > 1) {
// delete the topics first to avoid cascading leader elections while shutting down the brokers
final Set<String> topics = getAllTopicsInCluster();
@ -162,16 +161,6 @@ public class EmbeddedKafkaCluster extends ExternalResource {
return brokers[0].brokerList();
}
@Override
protected void before() throws Throwable {
start();
}
@Override
protected void after() {
stop();
}
/**
* Create multiple Kafka topics each with 1 partition and a replication factor of 1.
*

View File

@ -30,7 +30,6 @@ import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -55,7 +54,7 @@ public class KafkaEmbedded {
private final Properties effectiveConfig;
private final File logDir;
private final TemporaryFolder tmpFolder;
private final File tmpFolder;
private final KafkaServer kafka;
/**
@ -67,9 +66,8 @@ public class KafkaEmbedded {
*/
@SuppressWarnings("WeakerAccess")
public KafkaEmbedded(final Properties config, final MockTime time) throws IOException {
tmpFolder = new TemporaryFolder();
tmpFolder.create();
logDir = tmpFolder.newFolder();
tmpFolder = org.apache.kafka.test.TestUtils.tempDirectory();
logDir = org.apache.kafka.test.TestUtils.tempDirectory(tmpFolder.toPath(), "log");
effectiveConfig = effectiveConfigFrom(config);
final boolean loggingEnabled = true;
final KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled);
@ -135,11 +133,10 @@ public class KafkaEmbedded {
kafka.awaitShutdown();
log.debug("Removing log dir at {} ...", logDir);
try {
Utils.delete(logDir);
Utils.delete(tmpFolder);
} catch (final IOException e) {
throw new RuntimeException(e);
}
tmpFolder.delete();
log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
brokerList(), zookeeperConnect());
}

View File

@ -29,13 +29,15 @@ import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
@ -52,9 +54,18 @@ public class HandlingSourceTopicDeletionIntegrationTest {
private static final String INPUT_TOPIC = "inputTopic";
private static final String OUTPUT_TOPIC = "outputTopic";
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Rule
public TestName testName = new TestName();

View File

@ -23,9 +23,8 @@ import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.utils.StreamToTableJoinScalaIntegrationTestBase
import org.apache.kafka.test.IntegrationTest
import org.junit._
import org.junit.experimental.categories.Category
import org.junit.jupiter.api._
import org.junit.jupiter.api.Assertions._
/**
* Test suite that does an example to demonstrate stream-table joins in Kafka Streams
@ -33,7 +32,7 @@ import org.junit.experimental.categories.Category
* The suite contains the test case using Scala APIs `testShouldCountClicksPerRegion` and the same test case using the
* Java APIs `testShouldCountClicksPerRegionJava`. The idea is to demonstrate that both generate the same result.
*/
@Category(Array(classOf[IntegrationTest]))
@Tag("integration")
class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJoinScalaIntegrationTestBase {
@Test def testShouldCountClicksPerRegion(): Unit = {
@ -74,7 +73,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
Assert.assertTrue("Expected to process some data", !actualClicksPerRegion.isEmpty)
assertTrue(!actualClicksPerRegion.isEmpty, "Expected to process some data")
streams.close()
}
@ -118,7 +117,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
Assert.assertTrue("Expected to process some data", !actualClicksPerRegion.isEmpty)
assertTrue(!actualClicksPerRegion.isEmpty, "Expected to process some data")
streams.close()
}

View File

@ -46,8 +46,8 @@ import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes}
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KeyValue, StreamsConfig, TopologyDescription, StreamsBuilder => StreamsBuilderJ}
import org.junit.Assert._
import org.junit._
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api._
import scala.jdk.CollectionConverters._

View File

@ -21,21 +21,21 @@ package org.apache.kafka.streams.scala
import java.util.Properties
import java.util.regex.Pattern
import org.junit.Assert._
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api._
import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes}
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.test.{IntegrationTest, TestUtils}
import org.apache.kafka.common.utils.{MockTime, Utils}
import ImplicitConversions._
import org.apache.kafka.common.serialization.{LongDeserializer, StringDeserializer, StringSerializer}
import org.junit.experimental.categories.Category
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Tag
import java.io.File
/**
* Test suite that does a classic word count example.
@ -43,28 +43,32 @@ import org.junit.experimental.categories.Category
* The suite contains the test case using Scala APIs `testShouldCountWords` and the same test case using the
* Java APIs `testShouldCountWordsJava`. The idea is to demonstrate that both generate the same result.
*/
@Category(Array(classOf[IntegrationTest]))
@Tag("integration")
class WordCountTest extends WordCountTestData {
private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)
private val cluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)
@Rule def cluster: EmbeddedKafkaCluster = privateCluster
final val alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000
val mockTime: MockTime = cluster.time
final private val alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000
private val mockTime: MockTime = cluster.time
mockTime.setCurrentTimeMs(alignedTime)
val tFolder: TemporaryFolder = new TemporaryFolder(TestUtils.tempDirectory())
@Rule def testFolder: TemporaryFolder = tFolder
private val testFolder: File = TestUtils.tempDirectory()
@Before
@BeforeEach
def startKafkaCluster(): Unit = {
cluster.start()
cluster.createTopic(inputTopic)
cluster.createTopic(outputTopic)
cluster.createTopic(inputTopicJ)
cluster.createTopic(outputTopicJ)
}
@AfterEach
def stopKafkaCluster(): Unit = {
cluster.stop()
Utils.delete(testFolder)
}
@Test
def testShouldCountWords(): Unit = {
import org.apache.kafka.streams.scala.serialization.Serdes._
@ -181,7 +185,7 @@ class WordCountTest extends WordCountTestData {
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000")
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot.getPath)
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getPath)
streamsConfiguration
}

View File

@ -21,53 +21,54 @@ package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.internals.ConsumedInternal
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.serialization.Serdes
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Matchers}
import org.scalatestplus.junit.JUnitRunner
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
@RunWith(classOf[JUnitRunner])
class ConsumedTest extends FlatSpec with Matchers {
class ConsumedTest {
"Create a Consumed" should "create a Consumed with Serdes" in {
@Test
def testCreateConsumed(): Unit = {
val consumed: Consumed[String, Long] = Consumed.`with`[String, Long]
val internalConsumed = new ConsumedInternal(consumed)
internalConsumed.keySerde.getClass shouldBe Serdes.stringSerde.getClass
internalConsumed.valueSerde.getClass shouldBe Serdes.longSerde.getClass
assertEquals(Serdes.stringSerde.getClass, internalConsumed.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalConsumed.valueSerde.getClass)
}
"Create a Consumed with timestampExtractor and resetPolicy" should "create a Consumed with Serdes, timestampExtractor and resetPolicy" in {
@Test
def testCreateConsumedWithTimestampExtractorAndResetPolicy(): Unit = {
val timestampExtractor = new FailOnInvalidTimestamp()
val resetPolicy = Topology.AutoOffsetReset.LATEST
val consumed: Consumed[String, Long] =
Consumed.`with`[String, Long](timestampExtractor, resetPolicy)
val internalConsumed = new ConsumedInternal(consumed)
internalConsumed.keySerde.getClass shouldBe Serdes.stringSerde.getClass
internalConsumed.valueSerde.getClass shouldBe Serdes.longSerde.getClass
internalConsumed.timestampExtractor shouldBe timestampExtractor
internalConsumed.offsetResetPolicy shouldBe resetPolicy
assertEquals(Serdes.stringSerde.getClass, internalConsumed.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalConsumed.valueSerde.getClass)
assertEquals(timestampExtractor, internalConsumed.timestampExtractor)
assertEquals(resetPolicy, internalConsumed.offsetResetPolicy)
}
"Create a Consumed with timestampExtractor" should "create a Consumed with Serdes and timestampExtractor" in {
@Test
def testCreateConsumedWithTimestampExtractor(): Unit = {
val timestampExtractor = new FailOnInvalidTimestamp()
val consumed: Consumed[String, Long] = Consumed.`with`[String, Long](timestampExtractor)
val internalConsumed = new ConsumedInternal(consumed)
internalConsumed.keySerde.getClass shouldBe Serdes.stringSerde.getClass
internalConsumed.valueSerde.getClass shouldBe Serdes.longSerde.getClass
internalConsumed.timestampExtractor shouldBe timestampExtractor
assertEquals(Serdes.stringSerde.getClass, internalConsumed.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalConsumed.valueSerde.getClass)
assertEquals(timestampExtractor, internalConsumed.timestampExtractor)
}
"Create a Consumed with resetPolicy" should "create a Consumed with Serdes and resetPolicy" in {
@Test
def testCreateConsumedWithResetPolicy(): Unit = {
val resetPolicy = Topology.AutoOffsetReset.LATEST
val consumed: Consumed[String, Long] = Consumed.`with`[String, Long](resetPolicy)
val internalConsumed = new ConsumedInternal(consumed)
internalConsumed.keySerde.getClass shouldBe Serdes.stringSerde.getClass
internalConsumed.valueSerde.getClass shouldBe Serdes.longSerde.getClass
internalConsumed.offsetResetPolicy shouldBe resetPolicy
assertEquals(Serdes.stringSerde.getClass, internalConsumed.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalConsumed.valueSerde.getClass)
assertEquals(resetPolicy, internalConsumed.offsetResetPolicy)
}
}

View File

@ -17,31 +17,30 @@
package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.streams.kstream.internals.GroupedInternal
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.serialization.Serdes
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Matchers}
import org.scalatestplus.junit.JUnitRunner
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
@RunWith(classOf[JUnitRunner])
class GroupedTest extends FlatSpec with Matchers {
class GroupedTest {
"Create a Grouped" should "create a Grouped with Serdes" in {
@Test
def testCreateGrouped(): Unit = {
val grouped: Grouped[String, Long] = Grouped.`with`[String, Long]
val internalGrouped = new GroupedInternal[String, Long](grouped)
internalGrouped.keySerde.getClass shouldBe Serdes.stringSerde.getClass
internalGrouped.valueSerde.getClass shouldBe Serdes.longSerde.getClass
assertEquals(Serdes.stringSerde.getClass, internalGrouped.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalGrouped.valueSerde.getClass)
}
"Create a Grouped with repartition topic name" should "create a Grouped with Serdes, and repartition topic name" in {
@Test
def testCreateGroupedWithRepartitionTopicName(): Unit = {
val repartitionTopicName = "repartition-topic"
val grouped: Grouped[String, Long] = Grouped.`with`(repartitionTopicName)
val internalGrouped = new GroupedInternal[String, Long](grouped)
internalGrouped.keySerde.getClass shouldBe Serdes.stringSerde.getClass
internalGrouped.valueSerde.getClass shouldBe Serdes.longSerde.getClass
internalGrouped.name() shouldBe repartitionTopicName
assertEquals(Serdes.stringSerde.getClass, internalGrouped.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalGrouped.valueSerde.getClass)
assertEquals(repartitionTopicName, internalGrouped.name())
}
}

View File

@ -20,27 +20,27 @@ package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.streams.scala.serialization.Serdes
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Matchers}
import org.scalatestplus.junit.JUnitRunner
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
@RunWith(classOf[JUnitRunner])
class JoinedTest extends FlatSpec with Matchers {
class JoinedTest {
"Create a Joined" should "create a Joined with Serdes" in {
@Test
def testCreateJoined(): Unit = {
val joined: Joined[String, Long, Int] = Joined.`with`[String, Long, Int]
joined.keySerde.getClass shouldBe Serdes.stringSerde.getClass
joined.valueSerde.getClass shouldBe Serdes.longSerde.getClass
joined.otherValueSerde.getClass shouldBe Serdes.intSerde.getClass
assertEquals(joined.keySerde.getClass, Serdes.stringSerde.getClass)
assertEquals(joined.valueSerde.getClass, Serdes.longSerde.getClass)
assertEquals(joined.otherValueSerde.getClass, Serdes.intSerde.getClass)
}
"Create a Joined" should "create a Joined with Serdes and repartition topic name" in {
@Test
def testCreateJoinedWithSerdesAndRepartitionTopicName(): Unit = {
val repartitionTopicName = "repartition-topic"
val joined: Joined[String, Long, Int] = Joined.`with`(repartitionTopicName)
joined.keySerde.getClass shouldBe Serdes.stringSerde.getClass
joined.valueSerde.getClass shouldBe Serdes.longSerde.getClass
joined.otherValueSerde.getClass shouldBe Serdes.intSerde.getClass
assertEquals(joined.keySerde.getClass, Serdes.stringSerde.getClass)
assertEquals(joined.valueSerde.getClass, Serdes.longSerde.getClass)
assertEquals(joined.otherValueSerde.getClass, Serdes.intSerde.getClass)
}
}

View File

@ -23,15 +23,15 @@ import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.utils.TestDriver
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Matchers}
import org.scalatestplus.junit.JUnitRunner
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import scala.jdk.CollectionConverters._
@RunWith(classOf[JUnitRunner])
class KStreamSplitTest extends FlatSpec with Matchers with TestDriver {
"split" should "route messages according to predicates" in {
class KStreamSplitTest extends TestDriver {
@Test
def testRouteMessagesAccordingToPredicates(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = Array("default", "even", "three");
@ -56,15 +56,15 @@ class KStreamSplitTest extends FlatSpec with Matchers with TestDriver {
.map(Integer.valueOf)
.asJava
)
testOutput(0).readValuesToList().asScala shouldBe List(1, 5)
testOutput(1).readValuesToList().asScala shouldBe List(2, 4)
testOutput(2).readValuesToList().asScala shouldBe List(3)
assertEquals(List(1, 5), testOutput(0).readValuesToList().asScala)
assertEquals(List(2, 4), testOutput(1).readValuesToList().asScala)
assertEquals(List(3), testOutput(2).readValuesToList().asScala)
testDriver.close()
}
"split" should "route messages to consumers" in {
@Test
def testRouteMessagesToConsumers(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
@ -88,13 +88,14 @@ class KStreamSplitTest extends FlatSpec with Matchers with TestDriver {
val even = testDriver.createOutput[Integer, Integer]("even")
val mapped = testDriver.createOutput[Integer, Integer]("mapped")
even.readValuesToList().asScala shouldBe List(2, 4)
mapped.readValuesToList().asScala shouldBe List(9, 81)
assertEquals(List(2, 4), even.readValuesToList().asScala)
assertEquals(List(9, 81), mapped.readValuesToList().asScala)
testDriver.close()
}
"split" should "route messages to anonymous consumers" in {
@Test
def testRouteMessagesToAnonymousConsumers(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
@ -118,8 +119,8 @@ class KStreamSplitTest extends FlatSpec with Matchers with TestDriver {
val even = testDriver.createOutput[Integer, Integer]("even")
val mapped = testDriver.createOutput[Integer, Integer]("mapped")
even.readValuesToList().asScala shouldBe List(2, 4)
mapped.readValuesToList().asScala shouldBe List(9, 81)
assertEquals(List(2, 4), even.readValuesToList().asScala)
assertEquals(List(9, 81), mapped.readValuesToList().asScala)
testDriver.close()
}

View File

@ -36,14 +36,15 @@ import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.utils.TestDriver
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Matchers}
import org.scalatestplus.junit.JUnitRunner
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Test
@RunWith(classOf[JUnitRunner])
class KStreamTest extends FlatSpec with Matchers with TestDriver {
import scala.jdk.CollectionConverters._
"filter a KStream" should "filter records satisfying the predicate" in {
class KStreamTest extends TestDriver {
@Test
def testFilterRecordsSatisfyingPredicate(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
@ -55,20 +56,21 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
val testOutput = testDriver.createOutput[String, String](sinkTopic)
testInput.pipeInput("1", "value1")
testOutput.readValue shouldBe "value1"
assertEquals("value1", testOutput.readValue)
testInput.pipeInput("2", "value2")
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testInput.pipeInput("3", "value3")
testOutput.readValue shouldBe "value3"
assertEquals("value3", testOutput.readValue)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testDriver.close()
}
"filterNot a KStream" should "filter records not satisfying the predicate" in {
@Test
def testFilterRecordsNotSatisfyingPredicate(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
@ -80,20 +82,21 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
val testOutput = testDriver.createOutput[String, String](sinkTopic)
testInput.pipeInput("1", "value1")
testOutput.readValue shouldBe "value1"
assertEquals("value1", testOutput.readValue)
testInput.pipeInput("2", "value2")
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testInput.pipeInput("3", "value3")
testOutput.readValue shouldBe "value3"
assertEquals("value3", testOutput.readValue)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testDriver.close()
}
"foreach a KStream" should "run foreach actions on records" in {
@Test
def testForeachActionsOnRecords(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
@ -104,15 +107,16 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
val testInput = testDriver.createInput[String, String](sourceTopic)
testInput.pipeInput("1", "value1")
acc shouldBe "value1"
assertEquals("value1", acc)
testInput.pipeInput("2", "value2")
acc shouldBe "value1value2"
assertEquals("value1value2", acc)
testDriver.close()
}
"peek a KStream" should "run peek actions on records" in {
@Test
def testPeekActionsOnRecords(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
@ -125,17 +129,18 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
val testOutput = testDriver.createOutput[String, String](sinkTopic)
testInput.pipeInput("1", "value1")
acc shouldBe "value1"
testOutput.readValue shouldBe "value1"
assertEquals("value1", acc)
assertEquals("value1", testOutput.readValue)
testInput.pipeInput("2", "value2")
acc shouldBe "value1value2"
testOutput.readValue shouldBe "value2"
assertEquals("value1value2", acc)
assertEquals("value2", testOutput.readValue)
testDriver.close()
}
"selectKey a KStream" should "select a new key" in {
@Test
def testSelectNewKey(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
@ -147,17 +152,18 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
val testOutput = testDriver.createOutput[String, String](sinkTopic)
testInput.pipeInput("1", "value1")
testOutput.readKeyValue.key shouldBe "value1"
assertEquals("value1", testOutput.readKeyValue.key)
testInput.pipeInput("1", "value2")
testOutput.readKeyValue.key shouldBe "value2"
assertEquals("value2", testOutput.readKeyValue.key)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testDriver.close()
}
"repartition" should "repartition a KStream" in {
@Test
def testRepartitionKStream(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val repartitionName = "repartition"
@ -171,15 +177,15 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
testInput.pipeInput("1", "value1")
val kv1 = testOutput.readKeyValue
kv1.key shouldBe "1"
kv1.value shouldBe "value1"
assertEquals("1", kv1.key)
assertEquals("value1", kv1.value)
testInput.pipeInput("2", "value2")
val kv2 = testOutput.readKeyValue
kv2.key shouldBe "2"
kv2.value shouldBe "value2"
assertEquals("2", kv2.key)
assertEquals("value2", kv2.value)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
// appId == "test"
testDriver.producedTopicNames() contains "test-" + repartitionName + "-repartition"
@ -187,7 +193,8 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
testDriver.close()
}
"join 2 KStreams" should "join correctly records" in {
@Test
def testJoinCorrectlyRecords(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic1 = "source1"
val sourceTopic2 = "source2"
@ -207,14 +214,15 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
testInput1.pipeInput("1", "topic1value1", now)
testInput2.pipeInput("1", "topic2value1", now)
testOutput.readValue shouldBe "topic1value1-topic2value1"
assertEquals("topic1value1-topic2value1", testOutput.readValue)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testDriver.close()
}
"transform a KStream" should "transform correctly records" in {
@Test
def testTransformCorrectlyRecords(): Unit = {
class TestTransformer extends Transformer[String, String, KeyValue[String, String]] {
override def init(context: ProcessorContext): Unit = {}
@ -240,15 +248,16 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
testInput.pipeInput("1", "value", now)
val result = testOutput.readKeyValue()
result.value shouldBe "value-transformed"
result.key shouldBe "1-transformed"
assertEquals("value-transformed", result.value)
assertEquals("1-transformed", result.key)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testDriver.close()
}
"flatTransform a KStream" should "flatTransform correctly records" in {
@Test
def testFlatTransformCorrectlyRecords(): Unit = {
class TestTransformer extends Transformer[String, String, Iterable[KeyValue[String, String]]] {
override def init(context: ProcessorContext): Unit = {}
@ -274,15 +283,16 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
testInput.pipeInput("1", "value", now)
val result = testOutput.readKeyValue()
result.value shouldBe "value-transformed"
result.key shouldBe "1-transformed"
assertEquals("value-transformed", result.value)
assertEquals("1-transformed", result.key)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testDriver.close()
}
"flatTransformValues a KStream" should "correctly flatTransform values in records" in {
@Test
def testCorrectlyFlatTransformValuesInRecords(): Unit = {
class TestTransformer extends ValueTransformer[String, Iterable[String]] {
override def init(context: ProcessorContext): Unit = {}
@ -310,14 +320,15 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
testInput.pipeInput("1", "value", now)
testOutput.readValue shouldBe "value-transformed"
assertEquals("value-transformed", testOutput.readValue)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testDriver.close()
}
"flatTransformValues with key in a KStream" should "correctly flatTransformValues in records" in {
@Test
def testCorrectlyFlatTransformValuesInRecordsWithKey(): Unit = {
class TestTransformer extends ValueTransformerWithKey[String, String, Iterable[String]] {
override def init(context: ProcessorContext): Unit = {}
@ -345,14 +356,15 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
testInput.pipeInput("1", "value", now)
testOutput.readValue shouldBe "value-transformed-1"
assertEquals("value-transformed-1", testOutput.readValue)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testDriver.close()
}
"join 2 KStreamToTables" should "join correctly records" in {
@Test
def testJoinTwoKStreamToTables(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic1 = "source1"
val sourceTopic2 = "source2"
@ -370,14 +382,15 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
testInput1.pipeInput("1", "topic1value1")
testInput2.pipeInput("1", "topic2value1")
testOutput.readValue shouldBe "topic1value1topic2value1"
assertEquals("topic1value1topic2value1", testOutput.readValue)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testDriver.close()
}
"setting a name on a filter" should "pass the name to the topology" in {
@Test
def testSettingNameOnFilter(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
@ -390,10 +403,11 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
import scala.jdk.CollectionConverters._
val filterNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.toList(1)
filterNode.name() shouldBe "my-name"
assertEquals("my-name", filterNode.name())
}
"setting a name on output table" should "pass the name to the topology" in {
@Test
def testSettingNameOnOutputTable(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic1 = "source1"
val sinkTopic = "sink"
@ -407,10 +421,11 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
import scala.jdk.CollectionConverters._
val tableNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.toList(1)
tableNode.name() shouldBe "my-name"
assertEquals("my-name", tableNode.name())
}
"setting a name on a join" should "pass the name to the topology" in {
@Test
def testSettingNameOnJoin(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic1 = "source"
val sourceGTable = "table"
@ -425,10 +440,11 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
import scala.jdk.CollectionConverters._
val joinNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.toList(1)
joinNode.name() shouldBe "my-name"
assertEquals("my-name", joinNode.name())
}
"setting a name on a transform" should "pass the name to the topology" in {
@Test
def testSettingNameOnTransform(): Unit = {
class TestTransformer extends Transformer[String, String, KeyValue[String, String]] {
override def init(context: ProcessorContext): Unit = {}
@ -446,9 +462,7 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
.transform(() => new TestTransformer, Named.as("my-name"))
.to(sinkTopic)
import scala.jdk.CollectionConverters._
val transformNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.toList(1)
transformNode.name() shouldBe "my-name"
assertEquals("my-name", transformNode.name())
}
}

View File

@ -18,21 +18,22 @@
*/
package org.apache.kafka.streams.scala.kstream
import java.time.Duration
import org.apache.kafka.streams.kstream.{Named, SessionWindows, TimeWindows, Windowed, Suppressed => JSuppressed}
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig
import org.apache.kafka.streams.kstream.{Named, SessionWindows, TimeWindows, Windowed, Suppressed => JSuppressed}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.utils.TestDriver
import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder}
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Matchers}
import org.scalatestplus.junit.JUnitRunner
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue}
import org.junit.jupiter.api.Test
@RunWith(classOf[JUnitRunner])
class KTableTest extends FlatSpec with Matchers with TestDriver {
import java.time.Duration
import scala.jdk.CollectionConverters._
"filter a KTable" should "filter records satisfying the predicate" in {
class KTableTest extends TestDriver {
@Test
def testFilterRecordsSatisfyingPredicate(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
@ -47,25 +48,26 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
{
testInput.pipeInput("a", "passes filter : add new row to table")
val record = testOutput.readKeyValue
record.key shouldBe "a"
record.value shouldBe 1
assertEquals("a", record.key)
assertEquals(1, record.value)
}
{
testInput.pipeInput("a", "fails filter : remove existing row from table")
val record = testOutput.readKeyValue
record.key shouldBe "a"
record.value shouldBe (null: java.lang.Long)
assertEquals("a", record.key)
assertNull(record.value)
}
{
testInput.pipeInput("b", "fails filter : no output")
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testDriver.close()
}
"filterNot a KTable" should "filter records not satisfying the predicate" in {
@Test
def testFilterRecordsNotSatisfyingPredicate(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
@ -80,27 +82,28 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
{
testInput.pipeInput("1", "value1")
val record = testOutput.readKeyValue
record.key shouldBe "1"
record.value shouldBe 1
assertEquals("1", record.key)
assertEquals(1, record.value)
}
{
testInput.pipeInput("1", "value2")
val record = testOutput.readKeyValue
record.key shouldBe "1"
record.value shouldBe (null: java.lang.Long)
assertEquals("1", record.key)
assertNull(record.value)
}
{
testInput.pipeInput("2", "value1")
val record = testOutput.readKeyValue
record.key shouldBe "2"
record.value shouldBe 1
assertEquals("2", record.key)
assertEquals(1, record.value)
}
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testDriver.close()
}
"join 2 KTables" should "join correctly records" in {
@Test
def testJoinCorrectlyRecords(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic1 = "source1"
val sourceTopic2 = "source2"
@ -117,14 +120,15 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
testInput1.pipeInput("1", "topic1value1")
testInput2.pipeInput("1", "topic2value1")
testOutput.readValue shouldBe 2
assertEquals(2, testOutput.readValue)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testDriver.close()
}
"join 2 KTables with a Materialized" should "join correctly records and state store" in {
@Test
def testJoinCorrectlyRecordsAndStateStore(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic1 = "source1"
val sourceTopic2 = "source2"
@ -143,15 +147,16 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
testInput1.pipeInput("1", "topic1value1")
testInput2.pipeInput("1", "topic2value1")
testOutput.readValue shouldBe 2
testDriver.getKeyValueStore[String, Long](stateStore).get("1") shouldBe 2
assertEquals(2, testOutput.readValue)
assertEquals(2, testDriver.getKeyValueStore[String, Long](stateStore).get("1"))
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testDriver.close()
}
"windowed KTable#suppress" should "correctly suppress results using Suppressed.untilTimeLimit" in {
@Test
def testCorrectlySuppressResultsUsingSuppressedUntilTimeLimit(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
@ -174,41 +179,42 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
{
// publish key=1 @ time 0 => count==1
testInput.pipeInput("1", "value1", 0L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// publish key=1 @ time 1 => count==2
testInput.pipeInput("1", "value2", 1L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// move event time past the first window, but before the suppression window
testInput.pipeInput("2", "value1", 1001L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// move event time riiiight before suppression window ends
testInput.pipeInput("2", "value2", 1999L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// publish a late event before suppression window terminates => count==3
testInput.pipeInput("1", "value3", 999L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// move event time right past the suppression window of the first window.
testInput.pipeInput("2", "value3", 2001L)
val record = testOutput.readKeyValue
record.key shouldBe "0:1000:1"
record.value shouldBe 3L
assertEquals("0:1000:1", record.key)
assertEquals(3L, record.value)
}
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testDriver.close()
}
"windowed KTable#suppress" should "correctly suppress results using Suppressed.untilWindowCloses" in {
@Test
def testCorrectlySuppressResultsUsingSuppressedUntilWindowClosesByWindowed(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
@ -231,41 +237,42 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
{
// publish key=1 @ time 0 => count==1
testInput.pipeInput("1", "value1", 0L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// publish key=1 @ time 1 => count==2
testInput.pipeInput("1", "value2", 1L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// move event time past the window, but before the grace period
testInput.pipeInput("2", "value1", 1001L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// move event time riiiight before grace period ends
testInput.pipeInput("2", "value2", 1999L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// publish a late event before grace period terminates => count==3
testInput.pipeInput("1", "value3", 999L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// move event time right past the grace period of the first window.
testInput.pipeInput("2", "value3", 2001L)
val record = testOutput.readKeyValue
record.key shouldBe "0:1000:1"
record.value shouldBe 3L
assertEquals("0:1000:1", record.key)
assertEquals(3L, record.value)
}
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testDriver.close()
}
"session windowed KTable#suppress" should "correctly suppress results using Suppressed.untilWindowCloses" in {
@Test
def testCorrectlySuppressResultsUsingSuppressedUntilWindowClosesBySession(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
@ -289,32 +296,32 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
{
// first window
testInput.pipeInput("k1", "v1", 0L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// first window
testInput.pipeInput("k1", "v1", 1L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// new window, but grace period hasn't ended for first window
testInput.pipeInput("k1", "v1", 8L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// out-of-order event for first window, included since grade period hasn't passed
testInput.pipeInput("k1", "v1", 2L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// add to second window
testInput.pipeInput("k1", "v1", 13L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// add out-of-order to second window
testInput.pipeInput("k1", "v1", 10L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// push stream time forward to flush other events through
@ -323,20 +330,21 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
testInput.pipeInput("k1", "v1", 3L)
// should now have to results
val r1 = testOutput.readRecord
r1.key shouldBe "0:2:k1"
r1.value shouldBe 3L
r1.timestamp shouldBe 2L
assertEquals("0:2:k1", r1.key)
assertEquals(3L, r1.value)
assertEquals(2L, r1.timestamp)
val r2 = testOutput.readRecord
r2.key shouldBe "8:13:k1"
r2.value shouldBe 3L
r2.timestamp shouldBe 13L
assertEquals("8:13:k1", r2.key)
assertEquals(3L, r2.value)
assertEquals(13L, r2.timestamp)
}
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testDriver.close()
}
"non-windowed KTable#suppress" should "correctly suppress results using Suppressed.untilTimeLimit" in {
@Test
def testCorrectlySuppressResultsUsingSuppressedUntilTimeLimtByNonWindowed(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
@ -357,41 +365,42 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
{
// publish key=1 @ time 0 => count==1
testInput.pipeInput("1", "value1", 0L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// publish key=1 @ time 1 => count==2
testInput.pipeInput("1", "value2", 1L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// move event time past the window, but before the grace period
testInput.pipeInput("2", "value1", 1001L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// move event time right before grace period ends
testInput.pipeInput("2", "value2", 1999L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// publish a late event before grace period terminates => count==3
testInput.pipeInput("1", "value3", 999L)
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
}
{
// move event time right past the grace period of the first window.
testInput.pipeInput("2", "value3", 2001L)
val record = testOutput.readKeyValue
record.key shouldBe "1"
record.value shouldBe 3L
assertEquals("1", record.key)
assertEquals(3L, record.value)
}
testOutput.isEmpty shouldBe true
assertTrue(testOutput.isEmpty)
testDriver.close()
}
"setting a name on a filter processor" should "pass the name to the topology" in {
@Test
def testSettingNameOnFilterProcessor(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
@ -405,10 +414,11 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
import scala.jdk.CollectionConverters._
val filterNode = builder.build().describe().subtopologies().asScala.toList(1).nodes().asScala.toList(3)
filterNode.name() shouldBe "my-name"
assertEquals("my-name", filterNode.name())
}
"setting a name on a count processor" should "pass the name to the topology" in {
@Test
def testSettingNameOnCountProcessor(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
@ -419,10 +429,11 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
import scala.jdk.CollectionConverters._
val countNode = builder.build().describe().subtopologies().asScala.toList(1).nodes().asScala.toList(1)
countNode.name() shouldBe "my-name"
assertEquals("my-name", countNode.name())
}
"setting a name on a join processor" should "pass the name to the topology" in {
@Test
def testSettingNameOnJoinProcessor(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic1 = "source1"
val sourceTopic2 = "source2"
@ -435,12 +446,9 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
.toStream
.to(sinkTopic)
import scala.jdk.CollectionConverters._
val joinNodeLeft = builder.build().describe().subtopologies().asScala.toList(1).nodes().asScala.toList(6)
val joinNodeRight = builder.build().describe().subtopologies().asScala.toList(1).nodes().asScala.toList(7)
joinNodeLeft.name() should include("my-name")
joinNodeRight.name() should include("my-name")
assertTrue(joinNodeLeft.name().contains("my-name"))
assertTrue(joinNodeRight.name().contains("my-name"))
}
}

View File

@ -18,70 +18,73 @@
*/
package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.streams.kstream.internals.MaterializedInternal
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.serialization.Serdes
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.state.Stores
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import java.time.Duration
import org.apache.kafka.streams.kstream.internals.MaterializedInternal
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.serialization.Serdes
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.state.Stores
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Matchers}
import org.scalatestplus.junit.JUnitRunner
class MaterializedTest {
@RunWith(classOf[JUnitRunner])
class MaterializedTest extends FlatSpec with Matchers {
"Create a Materialized" should "create a Materialized with Serdes" in {
@Test
def testCreateMaterializedWithSerdes(): Unit = {
val materialized: Materialized[String, Long, ByteArrayKeyValueStore] =
Materialized.`with`[String, Long, ByteArrayKeyValueStore]
val internalMaterialized = new MaterializedInternal(materialized)
internalMaterialized.keySerde.getClass shouldBe Serdes.stringSerde.getClass
internalMaterialized.valueSerde.getClass shouldBe Serdes.longSerde.getClass
assertEquals(Serdes.stringSerde.getClass, internalMaterialized.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalMaterialized.valueSerde.getClass)
}
"Create a Materialize with a store name" should "create a Materialized with Serdes and a store name" in {
@Test
def testCreateMaterializedWithSerdesAndStoreName(): Unit = {
val storeName = "store"
val materialized: Materialized[String, Long, ByteArrayKeyValueStore] =
Materialized.as[String, Long, ByteArrayKeyValueStore](storeName)
val internalMaterialized = new MaterializedInternal(materialized)
internalMaterialized.keySerde.getClass shouldBe Serdes.stringSerde.getClass
internalMaterialized.valueSerde.getClass shouldBe Serdes.longSerde.getClass
internalMaterialized.storeName shouldBe storeName
assertEquals(Serdes.stringSerde.getClass, internalMaterialized.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalMaterialized.valueSerde.getClass)
assertEquals(storeName, internalMaterialized.storeName)
}
"Create a Materialize with a window store supplier" should "create a Materialized with Serdes and a store supplier" in {
@Test
def testCreateMaterializedWithSerdesAndWindowStoreSupplier(): Unit = {
val storeSupplier = Stores.persistentWindowStore("store", Duration.ofMillis(1), Duration.ofMillis(1), true)
val materialized: Materialized[String, Long, ByteArrayWindowStore] =
Materialized.as[String, Long](storeSupplier)
val internalMaterialized = new MaterializedInternal(materialized)
internalMaterialized.keySerde.getClass shouldBe Serdes.stringSerde.getClass
internalMaterialized.valueSerde.getClass shouldBe Serdes.longSerde.getClass
internalMaterialized.storeSupplier shouldBe storeSupplier
assertEquals(Serdes.stringSerde.getClass, internalMaterialized.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalMaterialized.valueSerde.getClass)
assertEquals(storeSupplier, internalMaterialized.storeSupplier)
}
"Create a Materialize with a key value store supplier" should "create a Materialized with Serdes and a store supplier" in {
@Test
def testCreateMaterializedWithSerdesAndKeyValueStoreSupplier(): Unit = {
val storeSupplier = Stores.persistentKeyValueStore("store")
val materialized: Materialized[String, Long, ByteArrayKeyValueStore] =
Materialized.as[String, Long](storeSupplier)
val internalMaterialized = new MaterializedInternal(materialized)
internalMaterialized.keySerde.getClass shouldBe Serdes.stringSerde.getClass
internalMaterialized.valueSerde.getClass shouldBe Serdes.longSerde.getClass
internalMaterialized.storeSupplier shouldBe storeSupplier
assertEquals(Serdes.stringSerde.getClass, internalMaterialized.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalMaterialized.valueSerde.getClass)
assertEquals(storeSupplier, internalMaterialized.storeSupplier)
}
"Create a Materialize with a session store supplier" should "create a Materialized with Serdes and a store supplier" in {
@Test
def testCreateMaterializedWithSerdesAndSessionStoreSupplier(): Unit = {
val storeSupplier = Stores.persistentSessionStore("store", Duration.ofMillis(1))
val materialized: Materialized[String, Long, ByteArraySessionStore] =
Materialized.as[String, Long](storeSupplier)
val internalMaterialized = new MaterializedInternal(materialized)
internalMaterialized.keySerde.getClass shouldBe Serdes.stringSerde.getClass
internalMaterialized.valueSerde.getClass shouldBe Serdes.longSerde.getClass
internalMaterialized.storeSupplier shouldBe storeSupplier
assertEquals(Serdes.stringSerde.getClass, internalMaterialized.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalMaterialized.valueSerde.getClass)
assertEquals(storeSupplier, internalMaterialized.storeSupplier)
}
}

View File

@ -20,32 +20,32 @@ package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.streams.kstream.internals.ProducedInternal
import org.apache.kafka.streams.processor.StreamPartitioner
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.serialization.Serdes
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Matchers}
import org.scalatestplus.junit.JUnitRunner
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
@RunWith(classOf[JUnitRunner])
class ProducedTest extends FlatSpec with Matchers {
class ProducedTest {
"Create a Produced" should "create a Produced with Serdes" in {
@Test
def testCreateProducedWithSerdes(): Unit = {
val produced: Produced[String, Long] = Produced.`with`[String, Long]
val internalProduced = new ProducedInternal(produced)
internalProduced.keySerde.getClass shouldBe Serdes.stringSerde.getClass
internalProduced.valueSerde.getClass shouldBe Serdes.longSerde.getClass
assertEquals(Serdes.stringSerde.getClass, internalProduced.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalProduced.valueSerde.getClass)
}
"Create a Produced with streamPartitioner" should "create a Produced with Serdes and streamPartitioner" in {
@Test
def testCreateProducedWithSerdesAndStreamPartitioner(): Unit = {
val partitioner = new StreamPartitioner[String, Long] {
override def partition(topic: String, key: String, value: Long, numPartitions: Int): Integer = 0
}
val produced: Produced[String, Long] = Produced.`with`(partitioner)
val internalProduced = new ProducedInternal(produced)
internalProduced.keySerde.getClass shouldBe Serdes.stringSerde.getClass
internalProduced.valueSerde.getClass shouldBe Serdes.longSerde.getClass
internalProduced.streamPartitioner shouldBe partitioner
assertEquals(Serdes.stringSerde.getClass, internalProduced.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalProduced.valueSerde.getClass)
assertEquals(partitioner, internalProduced.streamPartitioner)
}
}

View File

@ -20,55 +20,58 @@ package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.streams.kstream.internals.RepartitionedInternal
import org.apache.kafka.streams.processor.StreamPartitioner
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.serialization.Serdes
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Matchers}
import org.scalatestplus.junit.JUnitRunner
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
@RunWith(classOf[JUnitRunner])
class RepartitionedTest extends FlatSpec with Matchers {
class RepartitionedTest {
"Create a Repartitioned" should "create a Repartitioned with Serdes" in {
@Test
def testCreateRepartitionedWithSerdes(): Unit = {
val repartitioned: Repartitioned[String, Long] = Repartitioned.`with`[String, Long]
val internalRepartitioned = new RepartitionedInternal(repartitioned)
internalRepartitioned.keySerde.getClass shouldBe Serdes.stringSerde.getClass
internalRepartitioned.valueSerde.getClass shouldBe Serdes.longSerde.getClass
assertEquals(Serdes.stringSerde.getClass, internalRepartitioned.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalRepartitioned.valueSerde.getClass)
}
"Create a Repartitioned with numPartitions" should "create a Repartitioned with Serdes and numPartitions" in {
@Test
def testCreateRepartitionedWithSerdesAndNumPartitions(): Unit = {
val repartitioned: Repartitioned[String, Long] = Repartitioned.`with`[String, Long](5)
val internalRepartitioned = new RepartitionedInternal(repartitioned)
internalRepartitioned.keySerde.getClass shouldBe Serdes.stringSerde.getClass
internalRepartitioned.valueSerde.getClass shouldBe Serdes.longSerde.getClass
internalRepartitioned.numberOfPartitions shouldBe 5
assertEquals(Serdes.stringSerde.getClass, internalRepartitioned.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalRepartitioned.valueSerde.getClass)
assertEquals(5, internalRepartitioned.numberOfPartitions)
}
"Create a Repartitioned with topicName" should "create a Repartitioned with Serdes and topicName" in {
@Test
def testCreateRepartitionedWithSerdesAndTopicName(): Unit = {
val repartitioned: Repartitioned[String, Long] = Repartitioned.`with`[String, Long]("repartitionTopic")
val internalRepartitioned = new RepartitionedInternal(repartitioned)
internalRepartitioned.keySerde.getClass shouldBe Serdes.stringSerde.getClass
internalRepartitioned.valueSerde.getClass shouldBe Serdes.longSerde.getClass
internalRepartitioned.name shouldBe "repartitionTopic"
assertEquals(Serdes.stringSerde.getClass, internalRepartitioned.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalRepartitioned.valueSerde.getClass)
assertEquals("repartitionTopic", internalRepartitioned.name)
}
"Create a Repartitioned with streamPartitioner" should "create a Repartitioned with Serdes, numPartitions, topicName and streamPartitioner" in {
@Test
def testCreateRepartitionedWithSerdesAndTopicNameAndNumPartitionsAndStreamPartitioner(): Unit = {
val partitioner = new StreamPartitioner[String, Long] {
override def partition(topic: String, key: String, value: Long, numPartitions: Int): Integer = 0
}
val repartitioned: Repartitioned[String, Long] = Repartitioned.`with`[String, Long](partitioner)
val internalRepartitioned = new RepartitionedInternal(repartitioned)
internalRepartitioned.keySerde.getClass shouldBe Serdes.stringSerde.getClass
internalRepartitioned.valueSerde.getClass shouldBe Serdes.longSerde.getClass
internalRepartitioned.streamPartitioner shouldBe partitioner
assertEquals(Serdes.stringSerde.getClass, internalRepartitioned.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalRepartitioned.valueSerde.getClass)
assertEquals(partitioner, internalRepartitioned.streamPartitioner)
}
"Create a Repartitioned with numPartitions, topicName, and streamPartitioner" should "create a Repartitioned with Serdes, numPartitions, topicName and streamPartitioner" in {
@Test
def testCreateRepartitionedWithTopicNameAndNumPartitionsAndStreamPartitioner(): Unit = {
val partitioner = new StreamPartitioner[String, Long] {
override def partition(topic: String, key: String, value: Long, numPartitions: Int): Integer = 0
}
@ -79,11 +82,11 @@ class RepartitionedTest extends FlatSpec with Matchers {
.withStreamPartitioner(partitioner)
val internalRepartitioned = new RepartitionedInternal(repartitioned)
internalRepartitioned.keySerde.getClass shouldBe Serdes.stringSerde.getClass
internalRepartitioned.valueSerde.getClass shouldBe Serdes.longSerde.getClass
internalRepartitioned.numberOfPartitions shouldBe 5
internalRepartitioned.name shouldBe "repartitionTopic"
internalRepartitioned.streamPartitioner shouldBe partitioner
assertEquals(Serdes.stringSerde.getClass, internalRepartitioned.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalRepartitioned.valueSerde.getClass)
assertEquals(5, internalRepartitioned.numberOfPartitions)
assertEquals("repartitionTopic", internalRepartitioned.name)
assertEquals(partitioner, internalRepartitioned.streamPartitioner)
}
}

View File

@ -16,29 +16,29 @@
*/
package org.apache.kafka.streams.scala.kstream
import java.time.Duration
import org.apache.kafka.streams.kstream.internals.StreamJoinedInternal
import org.apache.kafka.streams.scala.serialization.Serdes
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.state.Stores
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Matchers}
import org.scalatestplus.junit.JUnitRunner
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
@RunWith(classOf[JUnitRunner])
class StreamJoinedTest extends FlatSpec with Matchers {
import java.time.Duration
"Create a StreamJoined" should "create a StreamJoined with Serdes" in {
class StreamJoinedTest {
@Test
def testCreateStreamJoinedWithSerdes(): Unit = {
val streamJoined: StreamJoined[String, String, Long] = StreamJoined.`with`[String, String, Long]
val streamJoinedInternal = new StreamJoinedInternal[String, String, Long](streamJoined)
streamJoinedInternal.keySerde().getClass shouldBe Serdes.stringSerde.getClass
streamJoinedInternal.valueSerde().getClass shouldBe Serdes.stringSerde.getClass
streamJoinedInternal.otherValueSerde().getClass shouldBe Serdes.longSerde.getClass
assertEquals(Serdes.stringSerde.getClass, streamJoinedInternal.keySerde().getClass)
assertEquals(Serdes.stringSerde.getClass, streamJoinedInternal.valueSerde().getClass)
assertEquals(Serdes.longSerde.getClass, streamJoinedInternal.otherValueSerde().getClass)
}
"Create a StreamJoined" should "create a StreamJoined with Serdes and Store Suppliers" in {
@Test
def testCreateStreamJoinedWithSerdesAndStoreSuppliers(): Unit = {
val storeSupplier = Stores.inMemoryWindowStore("myStore", Duration.ofMillis(500), Duration.ofMillis(250), false)
val otherStoreSupplier =
@ -48,21 +48,22 @@ class StreamJoinedTest extends FlatSpec with Matchers {
StreamJoined.`with`[String, String, Long](storeSupplier, otherStoreSupplier)
val streamJoinedInternal = new StreamJoinedInternal[String, String, Long](streamJoined)
streamJoinedInternal.keySerde().getClass shouldBe Serdes.stringSerde.getClass
streamJoinedInternal.valueSerde().getClass shouldBe Serdes.stringSerde.getClass
streamJoinedInternal.otherValueSerde().getClass shouldBe Serdes.longSerde.getClass
streamJoinedInternal.otherStoreSupplier().equals(otherStoreSupplier)
streamJoinedInternal.thisStoreSupplier().equals(storeSupplier)
assertEquals(Serdes.stringSerde.getClass, streamJoinedInternal.keySerde().getClass)
assertEquals(Serdes.stringSerde.getClass, streamJoinedInternal.valueSerde().getClass)
assertEquals(Serdes.longSerde.getClass, streamJoinedInternal.otherValueSerde().getClass)
assertEquals(otherStoreSupplier, streamJoinedInternal.otherStoreSupplier())
assertEquals(storeSupplier, streamJoinedInternal.thisStoreSupplier())
}
"Create a StreamJoined" should "create a StreamJoined with Serdes and a State Store name" in {
@Test
def testCreateStreamJoinedWithSerdesAndStateStoreName(): Unit = {
val streamJoined: StreamJoined[String, String, Long] = StreamJoined.as[String, String, Long]("myStoreName")
val streamJoinedInternal = new StreamJoinedInternal[String, String, Long](streamJoined)
streamJoinedInternal.keySerde().getClass shouldBe Serdes.stringSerde.getClass
streamJoinedInternal.valueSerde().getClass shouldBe Serdes.stringSerde.getClass
streamJoinedInternal.otherValueSerde().getClass shouldBe Serdes.longSerde.getClass
streamJoinedInternal.storeName().equals("myStoreName")
assertEquals(Serdes.stringSerde.getClass, streamJoinedInternal.keySerde().getClass)
assertEquals(Serdes.stringSerde.getClass, streamJoinedInternal.valueSerde().getClass)
assertEquals(Serdes.longSerde.getClass, streamJoinedInternal.otherValueSerde().getClass)
assertEquals("myStoreName", streamJoinedInternal.storeName())
}
}

View File

@ -18,54 +18,51 @@
*/
package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.streams.kstream.internals.suppress._
import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import java.time.Duration
import org.apache.kafka.streams.kstream.internals.suppress.{
BufferFullStrategy,
EagerBufferConfigImpl,
FinalResultsSuppressionBuilder,
StrictBufferConfigImpl,
SuppressedInternal
}
import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Matchers}
import org.scalatestplus.junit.JUnitRunner
@deprecated(message = "org.apache.kafka.streams.scala.kstream.Suppressed has been deprecated", since = "2.5")
@RunWith(classOf[JUnitRunner])
class SuppressedTest extends FlatSpec with Matchers {
class SuppressedTest {
"Suppressed.untilWindowCloses" should "produce the correct suppression" in {
@Test
def testProduceCorrectSuppressionUntilWindowCloses(): Unit = {
val bufferConfig = BufferConfig.unbounded()
val suppression = Suppressed.untilWindowCloses[String](bufferConfig)
suppression shouldEqual new FinalResultsSuppressionBuilder(null, bufferConfig)
suppression.withName("soup") shouldEqual new FinalResultsSuppressionBuilder("soup", bufferConfig)
assertEquals(suppression, new FinalResultsSuppressionBuilder(null, bufferConfig))
assertEquals(suppression.withName("soup"), new FinalResultsSuppressionBuilder("soup", bufferConfig))
}
"Suppressed.untilTimeLimit" should "produce the correct suppression" in {
@Test
def testProduceCorrectSuppressionUntilTimeLimit(): Unit = {
val bufferConfig = BufferConfig.unbounded()
val duration = Duration.ofMillis(1)
Suppressed.untilTimeLimit[String](duration, bufferConfig) shouldEqual
new SuppressedInternal[String](null, duration, bufferConfig, null, false)
assertEquals(Suppressed.untilTimeLimit[String](duration, bufferConfig),
new SuppressedInternal[String](null, duration, bufferConfig, null, false))
}
"BufferConfig.maxRecords" should "produce the correct buffer config" in {
BufferConfig.maxRecords(4) shouldEqual new EagerBufferConfigImpl(4, Long.MaxValue)
BufferConfig.maxRecords(4).withMaxBytes(5) shouldEqual new EagerBufferConfigImpl(4, 5)
@Test
def testProduceCorrectBufferConfigWithMaxRecords(): Unit = {
assertEquals(BufferConfig.maxRecords(4), new EagerBufferConfigImpl(4, Long.MaxValue))
assertEquals(BufferConfig.maxRecords(4).withMaxBytes(5), new EagerBufferConfigImpl(4, 5))
}
"BufferConfig.maxBytes" should "produce the correct buffer config" in {
BufferConfig.maxBytes(4) shouldEqual new EagerBufferConfigImpl(Long.MaxValue, 4)
BufferConfig.maxBytes(4).withMaxRecords(5) shouldEqual new EagerBufferConfigImpl(5, 4)
@Test
def testProduceCorrectBufferConfigWithMaxBytes(): Unit = {
assertEquals(BufferConfig.maxBytes(4), new EagerBufferConfigImpl(Long.MaxValue, 4))
assertEquals(BufferConfig.maxBytes(4).withMaxRecords(5), new EagerBufferConfigImpl(5, 4))
}
"BufferConfig.unbounded" should "produce the correct buffer config" in {
BufferConfig.unbounded() shouldEqual
new StrictBufferConfigImpl(Long.MaxValue, Long.MaxValue, BufferFullStrategy.SHUT_DOWN)
}
@Test
def testProduceCorrectBufferConfigWithUnbounded(): Unit =
assertEquals(BufferConfig.unbounded(),
new StrictBufferConfigImpl(Long.MaxValue, Long.MaxValue, BufferFullStrategy.SHUT_DOWN))
"BufferConfig" should "support very long chains of factory methods" in {
@Test
def testSupportLongChainsOfFactoryMethods(): Unit = {
val bc1 = BufferConfig
.unbounded()
.emitEarlyWhenFull()
@ -73,8 +70,8 @@ class SuppressedTest extends FlatSpec with Matchers {
.withMaxBytes(4L)
.withMaxRecords(5L)
.withMaxBytes(6L)
bc1 shouldEqual new EagerBufferConfigImpl(5L, 6L)
bc1.shutDownWhenFull() shouldEqual new StrictBufferConfigImpl(5L, 6L, BufferFullStrategy.SHUT_DOWN)
assertEquals(new EagerBufferConfigImpl(5L, 6L), bc1)
assertEquals(new StrictBufferConfigImpl(5L, 6L, BufferFullStrategy.SHUT_DOWN), bc1.shutDownWhenFull())
val bc2 = BufferConfig
.maxBytes(4)
@ -84,8 +81,8 @@ class SuppressedTest extends FlatSpec with Matchers {
.withMaxBytes(7)
.withMaxRecords(8)
bc2 shouldEqual new StrictBufferConfigImpl(8L, 7L, BufferFullStrategy.SHUT_DOWN)
bc2.withNoBound() shouldEqual BufferConfig.unbounded()
assertEquals(new StrictBufferConfigImpl(8L, 7L, BufferFullStrategy.SHUT_DOWN), bc2)
assertEquals(BufferConfig.unbounded(), bc2.withNoBound())
val bc3 = BufferConfig
.maxRecords(5L)
@ -93,6 +90,6 @@ class SuppressedTest extends FlatSpec with Matchers {
.emitEarlyWhenFull()
.withMaxRecords(11L)
bc3 shouldEqual new EagerBufferConfigImpl(11L, 10L)
assertEquals(new EagerBufferConfigImpl(11L, 10L), bc3)
}
}

View File

@ -17,38 +17,35 @@
package org.apache.kafka.streams.scala.utils
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.common.utils.{MockTime, Utils}
import org.apache.kafka.streams._
import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
import org.apache.kafka.test.{IntegrationTest, TestUtils}
import org.junit._
import org.junit.experimental.categories.Category
import org.junit.rules.TemporaryFolder
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api._
import java.io.File
/**
* Test suite base that prepares Kafka cluster for stream-table joins in Kafka Streams
* <p>
*/
@Category(Array(classOf[IntegrationTest]))
@Tag("integration")
class StreamToTableJoinScalaIntegrationTestBase extends StreamToTableJoinTestData {
private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)
private val cluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)
@Rule def cluster: EmbeddedKafkaCluster = privateCluster
final val alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000
val mockTime: MockTime = cluster.time
final private val alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000
private val mockTime: MockTime = cluster.time
mockTime.setCurrentTimeMs(alignedTime)
val tFolder: TemporaryFolder = new TemporaryFolder(TestUtils.tempDirectory())
@Rule def testFolder: TemporaryFolder = tFolder
private val testFolder: File = TestUtils.tempDirectory()
@Before
@BeforeEach
def startKafkaCluster(): Unit = {
cluster.start()
cluster.createTopic(userClicksTopic)
cluster.createTopic(userRegionsTopic)
cluster.createTopic(outputTopic)
@ -57,6 +54,12 @@ class StreamToTableJoinScalaIntegrationTestBase extends StreamToTableJoinTestDat
cluster.createTopic(outputTopicJ)
}
@AfterEach
def stopKafkaCluster(): Unit = {
cluster.stop()
Utils.delete(testFolder)
}
def getStreamsConfiguration(): Properties = {
val streamsConfiguration: Properties = new Properties()
@ -64,7 +67,7 @@ class StreamToTableJoinScalaIntegrationTestBase extends StreamToTableJoinTestDat
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000")
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot.getPath)
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getPath)
streamsConfiguration
}

View File

@ -25,10 +25,8 @@ import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.{StreamsConfig, TestInputTopic, TestOutputTopic, TopologyTestDriver}
import org.apache.kafka.test.TestUtils
import org.scalatest.Suite
trait TestDriver { this: Suite =>
trait TestDriver {
def createTestDriver(builder: StreamsBuilder, initialWallClockTime: Instant = Instant.now()): TopologyTestDriver = {
val config = new Properties()
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath)