KAFKA-4594; Annotate integration tests and provide gradle build targets to run subsets of tests

This change uses JUnit Categories to identify integration tests, and adds two new build targets: `integrationTest` and `unitTest`.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska <eno@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #2695 from dguy/junit-categories
Authored by Damian Guy on 2017-03-21 09:55:46 +00:00; committed by Ismael Juma
Commit fef7fca2af (parent 05690f0c85)
21 changed files with 289 additions and 178 deletions
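To make the mechanism concrete: a test class is tagged with JUnit 4's `@Category` annotation referencing the new `org.apache.kafka.test.IntegrationTest` marker interface, and the Gradle test tasks filter on that category. A minimal sketch of the pattern this commit introduces (the class `MyEndToEndTest` is illustrative, not part of the change):

```java
import org.apache.kafka.test.IntegrationTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;

// Tagged classes are selected by `./gradlew integrationTest` and
// skipped by `./gradlew unitTest`; plain `./gradlew test` runs both.
@Category(IntegrationTest.class)
public class MyEndToEndTest {
    @Test
    public void shouldExerciseTheFullStack() throws Exception {
        // start an embedded cluster, produce, consume, assert...
    }
}
```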

File: README.md

@@ -14,38 +14,42 @@ Java 7 should be used for building in order to support both Java 7 and Java 8 at
Now everything else will work.
### Building a jar and running it ###
### Build a jar and run it ###
./gradlew jar
Follow instructions in http://kafka.apache.org/documentation.html#quickstart
### Building source jar ###
### Build source jar ###
./gradlew srcJar
### Building aggregated javadoc ###
### Build aggregated javadoc ###
./gradlew aggregatedJavadoc
### Building javadoc and scaladoc ###
### Build javadoc and scaladoc ###
./gradlew javadoc
./gradlew javadocJar # builds a javadoc jar for each module
./gradlew scaladoc
./gradlew scaladocJar # builds a scaladoc jar for each module
./gradlew docsJar # builds both (if applicable) javadoc and scaladoc jars for each module
### Running unit tests ###
./gradlew test
### Forcing re-running unit tests w/o code change ###
### Run unit/integration tests ###
./gradlew test # runs both unit and integration tests
./gradlew unitTest
./gradlew integrationTest
### Force re-running tests without code change ###
./gradlew cleanTest test
./gradlew cleanTest unitTest
./gradlew cleanTest integrationTest
### Running a particular unit test ###
### Running a particular unit/integration test ###
./gradlew -Dtest.single=RequestResponseSerializationTest core:test
### Running a particular test method within a unit test ###
### Running a particular test method within a unit/integration test ###
./gradlew core:test --tests kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic
./gradlew clients:test --tests org.apache.kafka.clients.MetadataTest.testMetadataUpdateWaitTime
### Running a particular unit test with log4j output ###
### Running a particular unit/integration test with log4j output ###
Change the log4j setting in either `clients/src/test/resources/log4j.properties` or `core/src/test/resources/log4j.properties`
./gradlew -i -Dtest.single=RequestResponseSerializationTest core:test
@@ -103,7 +107,7 @@ to avoid known issues with this configuration.
### Building the jar for all scala versions and for all projects ###
./gradlew jarAll
### Running unit tests for all scala versions and for all projects ###
### Running unit/integration tests for all scala versions and for all projects ###
./gradlew testAll
### Building a binary release gzipped tar ball for all scala versions ###
@@ -136,7 +140,7 @@ Please note for this to work you should create/update `${GRADLE_USER_HOME}/gradl
### Running code quality checks ###
There are two code quality analysis tools that we regularly run, findbugs and checkstyle.
#### Checkstyle
#### Checkstyle ####
Checkstyle enforces a consistent coding style in Kafka.
You can run checkstyle using:
@@ -145,7 +149,7 @@ You can run checkstyle using:
The checkstyle warnings will be found in `reports/checkstyle/reports/main.html` and `reports/checkstyle/reports/test.html` files in the
subproject build directories. They are also printed to the console. The build will fail if Checkstyle fails.
#### Findbugs
#### Findbugs ####
Findbugs uses static analysis to look for bugs in the code.
You can run findbugs using:

File: build.gradle

@@ -168,6 +168,10 @@ subprojects {
}
}
def testLoggingEvents = ["passed", "skipped", "failed"]
def testShowStandardStreams = false
def testExceptionFormat = 'full'
test {
maxParallelForks = userMaxForks ?: Runtime.runtime.availableProcessors()
@@ -176,13 +180,47 @@ subprojects {
jvmArgs = maxPermSizeArgs
testLogging {
events = userTestLoggingEvents ?: ["passed", "skipped", "failed"]
showStandardStreams = userShowStandardStreams ?: false
exceptionFormat = 'full'
events = userTestLoggingEvents ?: testLoggingEvents
showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
exceptionFormat = testExceptionFormat
}
}
task integrationTest(type: Test, dependsOn: compileJava) {
maxParallelForks = userMaxForks ?: Runtime.runtime.availableProcessors()
minHeapSize = "256m"
maxHeapSize = "2048m"
jvmArgs = maxPermSizeArgs
testLogging {
events = userTestLoggingEvents ?: testLoggingEvents
showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
exceptionFormat = testExceptionFormat
}
useJUnit {
includeCategories 'org.apache.kafka.test.IntegrationTest'
}
}
task unitTest(type: Test, dependsOn: compileJava) {
maxParallelForks = userMaxForks ?: Runtime.runtime.availableProcessors()
minHeapSize = "256m"
maxHeapSize = "2048m"
jvmArgs = maxPermSizeArgs
testLogging {
events = userTestLoggingEvents ?: testLoggingEvents
showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
exceptionFormat = testExceptionFormat
}
useJUnit {
excludeCategories 'org.apache.kafka.test.IntegrationTest'
}
}
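The `useJUnit { includeCategories ... }` and `excludeCategories` blocks above are what partition the suite: both tasks compile and see the same test classes, and Gradle filters them at runtime by the `@Category` annotation. For reference, the same include-side filtering can be expressed in plain JUnit 4 with the `Categories` runner; a hypothetical suite (class names are illustrative only):

```java
import org.apache.kafka.test.IntegrationTest;
import org.junit.experimental.categories.Categories;
import org.junit.experimental.categories.Categories.IncludeCategory;
import org.junit.runner.RunWith;
import org.junit.runners.Suite.SuiteClasses;

// Runs only those tests in the listed classes that carry
// @Category(IntegrationTest.class), mirroring `includeCategories`.
@RunWith(Categories.class)
@IncludeCategory(IntegrationTest.class)
@SuiteClasses({KafkaStreamsTest.class, ResetIntegrationTest.class})
public class IntegrationTestSuite { }
```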
jar {
from "$rootDir/LICENSE"
from "$rootDir/NOTICE"
@@ -892,6 +930,7 @@ project(':connect:api') {
testCompile libs.junit
testRuntime libs.slf4jlog4j
testCompile project(':clients').sourceSets.test.output
}
javadoc {
@@ -929,6 +968,7 @@ project(':connect:transforms') {
testCompile libs.powermockEasymock
testRuntime libs.slf4jlog4j
testCompile project(':clients').sourceSets.test.output
}
javadoc {
@@ -966,6 +1006,7 @@ project(':connect:json') {
testCompile libs.powermockEasymock
testRuntime libs.slf4jlog4j
testCompile project(':clients').sourceSets.test.output
}
javadoc {
@@ -1016,6 +1057,7 @@ project(':connect:runtime') {
testCompile libs.powermockEasymock
testCompile project(":connect:json")
testCompile project(':clients').sourceSets.test.output
testRuntime libs.slf4jlog4j
}
@@ -1068,6 +1110,7 @@ project(':connect:file') {
testCompile libs.powermockEasymock
testRuntime libs.slf4jlog4j
testCompile project(':clients').sourceSets.test.output
}
javadoc {

File: org/apache/kafka/test/IntegrationTest.java (new file)

@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.test;
public interface IntegrationTest {
}
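The interface body is intentionally empty; it serves purely as a type token for `@Category`. JUnit also accepts the annotation on individual test methods, so a mostly-fast class could tag only its slow cases; a sketch of that usage (not code from this commit, and assuming Gradle's category filtering, which also honors method-level annotations):

```java
import org.apache.kafka.test.IntegrationTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class MixedSpeedTest {
    @Test
    public void fastPureLogicCheck() { }  // kept by unitTest

    @Category(IntegrationTest.class)
    @Test
    public void slowEndToEndCheck() { }   // kept by integrationTest
}
```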

File: IntegrationTestHarness.scala

@@ -33,7 +33,7 @@ import scala.collection.mutable.Buffer
/**
* A helper class for writing integration tests that involve producers, consumers, and servers
*/
trait IntegrationTestHarness extends KafkaServerTestHarness {
abstract class IntegrationTestHarness extends KafkaServerTestHarness {
val producerCount: Int
val consumerCount: Int

File: KafkaServerTestHarness.scala

@@ -36,7 +36,7 @@ import org.apache.kafka.common.network.ListenerName
/**
* A test harness that brings up some number of broker nodes
*/
trait KafkaServerTestHarness extends ZooKeeperTestHarness {
abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
var instanceConfigs: Seq[KafkaConfig] = null
var servers: Buffer[KafkaServer] = null
var brokerList: String = null

File: ZooKeeperTestHarness.scala

@@ -18,12 +18,16 @@
package kafka.zk
import javax.security.auth.login.Configuration
import kafka.utils.{ZkUtils, Logging, CoreUtils}
import kafka.utils.{CoreUtils, Logging, ZkUtils}
import org.junit.{After, Before}
import org.scalatest.junit.JUnitSuite
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.test.IntegrationTest
import org.junit.experimental.categories.Category
trait ZooKeeperTestHarness extends JUnitSuite with Logging {
@Category(Array(classOf[IntegrationTest]))
abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
val zkConnectionTimeout = 10000
val zkSessionTimeout = 6000
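Two details make this base-class tagging work. First, the harnesses switch from Scala traits to abstract classes because an annotation placed on a trait ends up on the generated interface, where JUnit does not look for categories; on a superclass it does. Second, `@Category` is meta-annotated with `@Inherited` as of JUnit 4.12, so every test extending `ZooKeeperTestHarness`, directly or via `KafkaServerTestHarness` and `IntegrationTestHarness`, is categorized as an integration test without repeating the annotation. A Java sketch of the inheritance behavior (class names are illustrative):

```java
import org.apache.kafka.test.IntegrationTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(IntegrationTest.class)
abstract class BaseHarness { }

// Inherits the IntegrationTest category from BaseHarness, so it is
// selected by integrationTest and excluded from unitTest.
public class BrokerApiTest extends BaseHarness {
    @Test
    public void shouldTalkToTheBroker() { }
}
```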

File: jenkins.sh (new executable file)

@@ -0,0 +1,20 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This script is used for verifying changes in Jenkins. In order to provide faster feedback, the tasks are ordered so
# that faster tasks are executed in every module before slower tasks (if possible). For example, the unit tests for all
# the modules are executed before the integration tests.
./gradlew clean compileJava compileScala compileTestJava compileTestScala checkstyleMain checkstyleTest unitTest integrationTest --no-daemon -Dorg.gradle.project.testLoggingEvents=started,passed,skipped,failed "$@"

File: KafkaStreamsTest.java

@@ -28,12 +28,14 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.Collections;
import java.util.HashMap;
@@ -47,6 +49,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@Category({IntegrationTest.class})
public class KafkaStreamsTest {
private static final int NUM_BROKERS = 1;

File: FanoutIntegrationTest.java

@@ -31,13 +31,11 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.test.IntegrationTest;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.junit.experimental.categories.Category;
import java.util.ArrayList;
import java.util.Arrays;
@@ -67,7 +65,7 @@ import static org.junit.Assert.assertThat;
* }
* </pre>
*/
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class FanoutIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final long COMMIT_INTERVAL_MS = 300L;
@@ -86,15 +84,6 @@ public class FanoutIntegrationTest {
CLUSTER.createTopic(OUTPUT_TOPIC_C);
}
@Parameter
public long cacheSizeBytes;
//Single parameter, use Object[]
@Parameters
public static Object[] data() {
return new Object[] {0, 10 * 1024 * 1024L};
}
@Test
public void shouldFanoutTheInput() throws Exception {
final List<String> inputValues = Arrays.asList("Hello", "World");
@@ -116,7 +105,6 @@ public class FanoutIntegrationTest {
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
final KStream<byte[], String> stream1 = builder.stream(INPUT_TOPIC_A);
final KStream<byte[], String> stream2 = stream1.mapValues(

File: GlobalKTableIntegrationTest.java

@@ -35,12 +35,14 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
@@ -48,6 +50,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@Category({IntegrationTest.class})
public class GlobalKTableIntegrationTest {
private static final int NUM_BROKERS = 1;

File: InternalTopicIntegrationTest.java

@@ -38,12 +38,14 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.Map;
@@ -60,6 +62,7 @@ import static org.junit.Assert.assertTrue;
/**
* Tests related to internal topics in streams
*/
@Category({IntegrationTest.class})
public class InternalTopicIntegrationTest {
private static final int NUM_BROKERS = 1;
@ClassRule

File: JoinIntegrationTest.java

@@ -35,6 +35,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -43,6 +44,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.Arrays;
import java.util.Collections;
@@ -58,6 +60,7 @@ import static org.hamcrest.core.Is.is;
/**
* Tests all available joins of Kafka Streams DSL.
*/
@Category({IntegrationTest.class})
public class JoinIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

File: KStreamAggregationDedupIntegrationTest.java

@@ -35,6 +35,7 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -51,6 +52,7 @@ import java.util.Properties;
import java.util.concurrent.ExecutionException;
import kafka.utils.MockTime;
import org.junit.experimental.categories.Category;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@@ -59,6 +61,7 @@ import static org.hamcrest.core.Is.is;
* Similar to KStreamAggregationIntegrationTest but with dedupping enabled
* by virtue of having a large commit interval
*/
@Category({IntegrationTest.class})
public class KStreamAggregationDedupIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final long COMMIT_INTERVAL_MS = 300L;

File: KStreamAggregationIntegrationTest.java

@@ -45,12 +45,14 @@ import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
@@ -69,6 +71,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
@Category({IntegrationTest.class})
public class KStreamAggregationIntegrationTest {
private static final int NUM_BROKERS = 1;

File: KStreamKTableJoinIntegrationTest.java

@@ -37,15 +37,13 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
@@ -59,8 +57,7 @@ import static org.junit.Assert.assertThat;
* End-to-end integration test that demonstrates how to perform a join between a KStream and a
* KTable (think: KStream.leftJoin(KTable)), i.e. an example of a stateful computation.
*/
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class KStreamKTableJoinIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final long COMMIT_INTERVAL_MS = 300L;
@@ -95,7 +92,7 @@ public class KStreamKTableJoinIntegrationTest {
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
}
@@ -107,16 +104,6 @@ public class KStreamKTableJoinIntegrationTest {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}
@Parameter
public long cacheSizeBytes;
//Single parameter, use Object[]
@Parameters
public static Object[] data() {
return new Object[] {0, 10 * 1024 * 1024L};
}
/**
* Tuple for a region and its associated number of clicks.
*/
@@ -147,7 +134,17 @@
}
@Test
public void shouldCountClicksPerRegion() throws Exception {
public void shouldCountClicksPerRegionWithZeroByteCache() throws Exception {
countClicksPerRegion(0);
}
@Test
public void shouldCountClicksPerRegionWithNonZeroByteCache() throws Exception {
countClicksPerRegion(10 * 1024 * 1024);
}
private void countClicksPerRegion(final int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
// Input 1: Clicks per user (multiple records allowed per user).
final List<KeyValue<String, Long>> userClicks = Arrays.asList(
new KeyValue<>("alice", 13L),

File: KStreamRepartitionJoinTest.java

@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
@@ -44,10 +45,7 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
@@ -60,7 +58,7 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class KStreamRepartitionJoinTest {
private static final int NUM_BROKERS = 1;
@@ -87,15 +85,6 @@ public class KStreamRepartitionJoinTest {
private String streamFourInput;
private static volatile int testNo = 0;
@Parameter
public long cacheSizeBytes;
//Single parameter, use Object[]
@Parameters
public static Object[] data() {
return new Object[] {0, 10 * 1024 * 1024L};
}
@Before
public void before() throws InterruptedException {
testNo++;
@@ -109,7 +98,6 @@
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
@@ -127,8 +115,17 @@
}
@Test
public void shouldCorrectlyRepartitionOnJoinOperations() throws Exception {
public void shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache() throws Exception {
verifyRepartitionOnJoinOperations(0);
}
@Test
public void shouldCorrectlyRepartitionOnJoinOperationsWithNonZeroSizedCache() throws Exception {
verifyRepartitionOnJoinOperations(10 * 1024 * 1024);
}
private void verifyRepartitionOnJoinOperations(final int cacheSizeBytes) throws Exception {
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
produceMessages();
final ExpectedOutputOnTopic mapOne = mapStreamOneAndJoin();
final ExpectedOutputOnTopic mapBoth = mapBothStreamsAndJoin();

File: KStreamsFineGrainedAutoResetIntegrationTest.java

@@ -30,6 +30,7 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@@ -37,6 +38,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.ArrayList;
import java.util.Arrays;
@@ -49,6 +51,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@Category({IntegrationTest.class})
public class KStreamsFineGrainedAutoResetIntegrationTest {
private static final int NUM_BROKERS = 1;

File: KTableKTableJoinIntegrationTest.java

@@ -30,23 +30,24 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.experimental.categories.Category;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class KTableKTableJoinIntegrationTest {
private final static int NUM_BROKERS = 1;
@@ -61,71 +62,6 @@ public class KTableKTableJoinIntegrationTest {
private KafkaStreams streams;
private final static Properties CONSUMER_CONFIG = new Properties();
@Parameterized.Parameter(value = 0)
public JoinType joinType1;
@Parameterized.Parameter(value = 1)
public JoinType joinType2;
@Parameterized.Parameter(value = 2)
public List<KeyValue<String, String>> expectedResult;
//Single parameter, use Object[]
@Parameterized.Parameters
public static Object[] parameters() {
return new Object[][]{
{JoinType.INNER, JoinType.INNER, Arrays.asList(
new KeyValue<>("b", "B1-B2-B3")//,
)},
{JoinType.INNER, JoinType.LEFT, Arrays.asList(
new KeyValue<>("b", "B1-B2-B3")//,
)},
{JoinType.INNER, JoinType.OUTER, Arrays.asList(
new KeyValue<>("a", "null-A3"),
new KeyValue<>("b", "null-B3"),
new KeyValue<>("c", "null-C3"),
new KeyValue<>("b", "B1-B2-B3")//,
)},
{JoinType.LEFT, JoinType.INNER, Arrays.asList(
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3")//,
)},
{JoinType.LEFT, JoinType.LEFT, Arrays.asList(
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3")//,
)},
{JoinType.LEFT, JoinType.OUTER, Arrays.asList(
new KeyValue<>("a", "null-A3"),
new KeyValue<>("b", "null-B3"),
new KeyValue<>("c", "null-C3"),
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3")//,
)},
{JoinType.OUTER, JoinType.INNER, Arrays.asList(
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3"),
new KeyValue<>("c", "null-C2-C3")
)},
{JoinType.OUTER, JoinType.LEFT, Arrays.asList(
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3"),
new KeyValue<>("c", "null-C2-C3")
)},
{JoinType.OUTER, JoinType.OUTER, Arrays.asList(
new KeyValue<>("a", "null-A3"),
new KeyValue<>("b", "null-B3"),
new KeyValue<>("c", "null-C3"),
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3"),
new KeyValue<>("c", "null-C2-C3")
)}
};
}
@BeforeClass
public static void beforeTest() throws Exception {
CLUSTER.createTopic(TABLE_1);
@@ -194,7 +130,101 @@
INNER, LEFT, OUTER
}
private KafkaStreams prepareTopology() {
@Test
public void shouldInnerInnerJoin() throws Exception {
verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")));
}
@Test
public void shouldInnerLeftJoin() throws Exception {
verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")));
}
@Test
public void shouldInnerOuterJoin() throws Exception {
verifyKTableKTableJoin(JoinType.INNER, JoinType.OUTER, Arrays.asList(
new KeyValue<>("a", "null-A3"),
new KeyValue<>("b", "null-B3"),
new KeyValue<>("c", "null-C3"),
new KeyValue<>("b", "B1-B2-B3")));
}
@Test
public void shouldLeftInnerJoin() throws Exception {
verifyKTableKTableJoin(JoinType.LEFT, JoinType.INNER, Arrays.asList(
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3")));
}
@Test
public void shouldLeftLeftJoin() throws Exception {
verifyKTableKTableJoin(JoinType.LEFT, JoinType.LEFT, Arrays.asList(
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3")));
}
@Test
public void shouldLeftOuterJoin() throws Exception {
verifyKTableKTableJoin(JoinType.LEFT, JoinType.OUTER, Arrays.asList(
new KeyValue<>("a", "null-A3"),
new KeyValue<>("b", "null-B3"),
new KeyValue<>("c", "null-C3"),
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3")));
}
@Test
public void shouldOuterInnerJoin() throws Exception {
verifyKTableKTableJoin(JoinType.OUTER, JoinType.INNER, Arrays.asList(
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3"),
new KeyValue<>("c", "null-C2-C3")));
}
@Test
public void shouldOuterLeftJoin() throws Exception {
verifyKTableKTableJoin(JoinType.OUTER, JoinType.LEFT, Arrays.asList(
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3"),
new KeyValue<>("c", "null-C2-C3")));
}
@Test
public void shouldOuterOuterJoin() throws Exception {
verifyKTableKTableJoin(JoinType.OUTER, JoinType.OUTER, Arrays.asList(
new KeyValue<>("a", "null-A3"),
new KeyValue<>("b", "null-B3"),
new KeyValue<>("c", "null-C3"),
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3"),
new KeyValue<>("c", "null-C2-C3")));
}
private void verifyKTableKTableJoin(final JoinType joinType1,
final JoinType joinType2,
final List<KeyValue<String, String>> expectedResult) throws Exception {
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join");
streams = prepareTopology(joinType1, joinType2);
streams.start();
final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expectedResult.size());
assertThat(result, equalTo(expectedResult));
}
private KafkaStreams prepareTopology(final JoinType joinType1, final JoinType joinType2) {
final KStreamBuilder builder = new KStreamBuilder();
final KTable<String, String> table1 = builder.table(TABLE_1, TABLE_1);
@@ -226,20 +256,4 @@
throw new RuntimeException("Unknown join type.");
}
@Test
public void KTableKTableJoin() throws Exception {
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join");
streams = prepareTopology();
streams.start();
final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expectedResult.size());
assertThat(result, equalTo(expectedResult));
}
}

File: QueryableStateIntegrationTest.java

@@ -44,6 +44,7 @@ import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@@ -55,10 +56,7 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
@@ -77,7 +75,7 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class QueryableStateIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final long COMMIT_INTERVAL_MS = 300L;
@@ -85,7 +83,7 @@ public class QueryableStateIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER =
new EmbeddedKafkaCluster(NUM_BROKERS);
public static final int STREAM_THREE_PARTITIONS = 4;
private static final int STREAM_THREE_PARTITIONS = 4;
private final MockTime mockTime = CLUSTER.time;
private String streamOne = "stream-one";
private String streamTwo = "stream-two";
@@ -106,7 +104,7 @@ public class QueryableStateIntegrationTest {
private Comparator<KeyValue<String, Long>> stringLongComparator;
private static int testNo = 0;
public void createTopics() throws InterruptedException {
private void createTopics() throws InterruptedException {
streamOne = streamOne + "-" + testNo;
streamConcurrent = streamConcurrent + "-" + testNo;
streamThree = streamThree + "-" + testNo;
@@ -123,15 +121,6 @@
CLUSTER.createTopic(outputTopicThree);
}
@Parameter
public long cacheSizeBytes;
//Single parameter, use Object[]
@Parameters
public static Object[] data() {
return new Object[]{0, 10 * 1024 * 1024L};
}
@Before
public void before() throws IOException, InterruptedException {
testNo++;
@@ -147,8 +136,7 @@
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("qs-test").getPath());
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
stringComparator = new Comparator<KeyValue<String, String>>() {
@@ -426,7 +414,17 @@
}
@Test
public void shouldBeAbleToQueryState() throws Exception {
public void shouldBeAbleToQueryStateWithZeroSizedCache() throws Exception {
verifyCanQueryState(0);
}
@Test
public void shouldBeAbleToQueryStateWithNonZeroSizedCache() throws Exception {
verifyCanQueryState(10 * 1024 * 1024);
}
private void verifyCanQueryState(int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
final KStreamBuilder builder = new KStreamBuilder();
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
@@ -446,13 +444,13 @@
IntegrationTestUtils.produceKeyValuesSynchronously(
streamOne,
batch1,
TestUtils.producerConfig(
batch1,
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
StringSerializer.class,
StringSerializer.class,
new Properties()),
mockTime);
mockTime);
final KStream<String, String> s1 = builder.stream(streamOne);
@@ -477,7 +475,6 @@
myCount);
verifyRangeAndAll(expectedCount, myCount);
}
@Test

File: RegexSourceIntegrationTest.java

@@ -38,6 +38,7 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@@ -46,6 +47,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.lang.reflect.Field;
import java.util.ArrayList;
@@ -65,6 +67,7 @@ import static org.junit.Assert.fail;
* End-to-end integration test based on using regex and named topics for creating sources, using
* an embedded Kafka cluster.
*/
@Category({IntegrationTest.class})
public class RegexSourceIntegrationTest {
private static final int NUM_BROKERS = 1;
@ClassRule

File: ResetIntegrationTest.java

@@ -39,6 +39,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
@@ -47,6 +48,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.Collections;
import java.util.HashSet;
@@ -60,6 +62,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
/**
* Tests local state store and global application cleanup.
*/
@Category({IntegrationTest.class})
public class ResetIntegrationTest {
private static final int NUM_BROKERS = 1;