KAFKA-14572: Migrate EmbeddedKafkaCluster used by Streams integration tests from EmbeddedZookeeper to KRaft (#17016)

Migrate the EmbeddedKafkaCluster from the EmbeddedZookeeper to KRaft

Reviewers Bill Bejeck <bbejeck@apache.org>
This commit is contained in:
Omnia Ibrahim 2024-09-27 20:49:12 +01:00 committed by GitHub
parent 10c789416c
commit 1854d4b8a1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 406 additions and 602 deletions

View File

@ -2528,6 +2528,7 @@ project(':streams') {
// testCompileOnly prevents streams from exporting a dependency on test-utils, which would cause a dependency cycle
testCompileOnly project(':streams:test-utils')
testImplementation project(':metadata')
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':server')
testImplementation project(':core')

View File

@ -393,6 +393,8 @@
</subpackage>
<subpackage name="integration">
<allow pkg="kafka.testkit"/>
<allow pkg="org.apache.kafka.metadata"/>
<allow pkg="kafka.admin" />
<allow pkg="kafka.api" />
<allow pkg="kafka.cluster" />

View File

@ -150,7 +150,7 @@ public abstract class AbstractResetIntegrationTest {
protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
protected static final int CLEANUP_CONSUMER_TIMEOUT = 2000;
protected static final int TIMEOUT_MULTIPLIER = 15;
protected static final int TIMEOUT_MULTIPLIER = 30;
void prepareTest(final TestInfo testInfo) throws Exception {
final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
@ -159,7 +159,7 @@ public abstract class AbstractResetIntegrationTest {
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT);
cluster.deleteAllTopicsAndWait(120000);
cluster.deleteAllTopics();
cluster.createTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
add10InputElements();
@ -199,7 +199,7 @@ public abstract class AbstractResetIntegrationTest {
// RUN
streams = new KafkaStreams(setupTopologyWithIntermediateTopic(true, OUTPUT_TOPIC_2), streamsConfig);
streams.start();
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
@ -272,7 +272,7 @@ public abstract class AbstractResetIntegrationTest {
// RUN
streams = new KafkaStreams(setupTopologyWithIntermediateTopic(useRepartitioned, OUTPUT_TOPIC_2), streamsConfig);
streams.start();
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
// receive only first values to make sure intermediate user topic is not consumed completely
// => required to test "seekToEnd" for intermediate topics
@ -301,7 +301,7 @@ public abstract class AbstractResetIntegrationTest {
assertInternalTopicsGotDeleted(useRepartitioned ? null : INTERMEDIATE_USER_TOPIC);
// RE-RUN
streams.start();
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2_RERUN, 40);
streams.close();
@ -323,7 +323,7 @@ public abstract class AbstractResetIntegrationTest {
cleanGlobal(!useRepartitioned, null, null, appID);
if (!useRepartitioned) {
cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
cluster.deleteTopic(INTERMEDIATE_USER_TOPIC);
}
}
@ -420,7 +420,6 @@ public abstract class AbstractResetIntegrationTest {
}
protected void assertInternalTopicsGotDeleted(final String additionalExistingTopic) throws Exception {
// do not use list topics request, but read from the embedded cluster's zookeeper path directly to confirm
if (additionalExistingTopic != null) {
cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN,
Topic.GROUP_METADATA_TOPIC_NAME, additionalExistingTopic);

View File

@ -107,7 +107,7 @@ public class AdjustStreamThreadCountTest {
builder = new StreamsBuilder();
builder.stream(inputTopic);
properties = mkObjectProperties(
properties = mkObjectProperties(
mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),

View File

@ -162,8 +162,7 @@ public class EosIntegrationTest {
@BeforeEach
public void createTopics() throws Exception {
applicationId = "appId-" + TEST_NUMBER.getAndIncrement();
CLUSTER.deleteTopicsAndWait(
60_000L,
CLUSTER.deleteTopics(
SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_INPUT_TOPIC,
SINGLE_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_THROUGH_TOPIC,
SINGLE_PARTITION_OUTPUT_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC);

View File

@ -67,7 +67,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
@ -81,19 +80,11 @@ import static org.hamcrest.Matchers.is;
@Tag("integration")
public class HighAvailabilityTaskAssignorIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3,
new Properties(),
asList(
new Properties() {{
setProperty(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_0);
}},
new Properties() {{
setProperty(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_1);
}},
new Properties() {{
setProperty(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_2);
}}
)
);
new Properties(), mkMap(
mkEntry(0, mkMap(mkEntry(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_0))),
mkEntry(1, mkMap(mkEntry(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_1))),
mkEntry(2, mkMap(mkEntry(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_2)))
));
@BeforeAll
public static void startCluster() throws IOException {
@ -258,7 +249,7 @@ public class HighAvailabilityTaskAssignorIntegrationTest {
restoreCompleteLatch.await();
// We should finalize the restoration without having restored any records (because they're already in
// the store. Otherwise, we failed to properly re-use the state from the standby.
// the store). Otherwise, we failed to properly re-use the state from the standby.
assertThat(instance1TotalRestored.get(), is(0L));
// Belt-and-suspenders check that we never even attempt to restore any records.
assertThat(instance1NumRestored.get(), is(-1L));

View File

@ -379,7 +379,7 @@ public class IQv2StoreIntegrationTest {
throws InterruptedException, IOException, ExecutionException, TimeoutException {
CLUSTER.start();
CLUSTER.deleteAllTopicsAndWait(60 * 1000L);
CLUSTER.deleteAllTopics();
final int partitions = 2;
CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);

View File

@ -49,6 +49,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
@ -70,12 +71,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
@Timeout(600)
public class JoinGracePeriodDurabilityIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
3,
mkProperties(mkMap()),
0L
);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
private static final long NOW = Instant.now().toEpochMilli();
@BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
@ -218,7 +215,7 @@ public class JoinGracePeriodDurabilityIntegrationTest {
* just to exercise that everything works properly in the presence of commits.
*/
private long scaledTime(final long unscaledTime) {
return COMMIT_INTERVAL * 2 * unscaledTime;
return NOW + COMMIT_INTERVAL * 2 * unscaledTime;
}
private static void produceSynchronouslyToPartitionZero(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {

View File

@ -97,7 +97,7 @@ public class JoinStoreIntegrationTest {
@AfterEach
public void cleanup() throws InterruptedException, IOException {
CLUSTER.deleteAllTopicsAndWait(120000);
CLUSTER.deleteAllTopics();
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
}

View File

@ -84,7 +84,7 @@ public class JoinWithIncompleteMetadataIntegrationTest {
@AfterEach
public void cleanup() throws InterruptedException, IOException {
CLUSTER.deleteAllTopicsAndWait(120000);
CLUSTER.deleteAllTopics();
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
}

View File

@ -120,8 +120,7 @@ public class KTableSourceTopicRestartIntegrationTest {
@Test
public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled() throws Exception {
try {
streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
streams.start();
streams = IntegrationTestUtils.getRunningStreams(STREAMS_CONFIG, streamsBuilder, false);
produceKeyValues("a", "b", "c");
@ -131,7 +130,7 @@ public class KTableSourceTopicRestartIntegrationTest {
streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
// the state restore listener will append one record to the log
streams.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
streams.start();
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
produceKeyValues("f", "g", "h");
@ -149,8 +148,7 @@ public class KTableSourceTopicRestartIntegrationTest {
STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
try {
streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
streams.start();
streams = IntegrationTestUtils.getRunningStreams(STREAMS_CONFIG, streamsBuilder, false);
produceKeyValues("a", "b", "c");
@ -160,7 +158,7 @@ public class KTableSourceTopicRestartIntegrationTest {
streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
// the state restore listener will append one record to the log
streams.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
streams.start();
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
produceKeyValues("f", "g", "h");
@ -176,16 +174,14 @@ public class KTableSourceTopicRestartIntegrationTest {
@Test
public void shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() throws Exception {
try {
streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
streams.start();
streams = IntegrationTestUtils.getStartedStreams(STREAMS_CONFIG, streamsBuilder, false);
produceKeyValues("a", "b", "c");
assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
streams.close();
streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
streams.start();
streams = IntegrationTestUtils.getRunningStreams(STREAMS_CONFIG, streamsBuilder, false);
produceKeyValues("f", "g", "h");

View File

@ -135,7 +135,7 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
adminClient = Admin.create(commonClientConfig);
}
CLUSTER.deleteAllTopicsAndWait(120_000L);
CLUSTER.deleteAllTopics();
CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
CLUSTER.createTopic(OUTPUT_TOPIC, 2, 1);

View File

@ -97,7 +97,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
@Timeout(600)
@Tag("integration")
public class NamedTopologyIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
private static final String TOPOLOGY_1 = "topology-1";
private static final String TOPOLOGY_2 = "topology-2";
@ -243,14 +243,14 @@ public class NamedTopologyIntegrationTest {
CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("-changelog") || t.contains("-repartition")).forEach(t -> {
try {
assertThat("topic was not decorated", t.contains(TOPIC_PREFIX));
CLUSTER.deleteTopicsAndWait(t);
} catch (final InterruptedException e) {
CLUSTER.deleteTopics(t);
} catch (final RuntimeException e) {
e.printStackTrace();
}
});
CLUSTER.deleteTopicsAndWait(OUTPUT_STREAM_1, OUTPUT_STREAM_2, OUTPUT_STREAM_3);
CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
CLUSTER.deleteTopics(OUTPUT_STREAM_1, OUTPUT_STREAM_2, OUTPUT_STREAM_3);
CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT);
}
@Test
@ -518,8 +518,8 @@ public class NamedTopologyIntegrationTest {
CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("-changelog")).forEach(t -> {
try {
CLUSTER.deleteTopicAndWait(t);
} catch (final InterruptedException e) {
CLUSTER.deleteTopic(t);
} catch (final RuntimeException e) {
e.printStackTrace();
}
});
@ -570,7 +570,7 @@ public class NamedTopologyIntegrationTest {
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
} finally {
CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT);
CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1);
}
}
@ -624,8 +624,8 @@ public class NamedTopologyIntegrationTest {
CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("changelog")).forEach(t -> {
try {
CLUSTER.deleteTopicAndWait(t);
} catch (final InterruptedException e) {
CLUSTER.deleteTopic(t);
} catch (final RuntimeException e) {
e.printStackTrace();
}
});
@ -640,7 +640,7 @@ public class NamedTopologyIntegrationTest {
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
} finally {
CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT);
}
}
@ -662,8 +662,8 @@ public class NamedTopologyIntegrationTest {
CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("-changelog") || t.contains("-repartition")).forEach(t -> {
try {
CLUSTER.deleteTopicsAndWait(t);
} catch (final InterruptedException e) {
CLUSTER.deleteTopics(t);
} catch (final RuntimeException e) {
e.printStackTrace();
}
});
@ -678,7 +678,7 @@ public class NamedTopologyIntegrationTest {
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT);
}
/**

View File

@ -102,7 +102,7 @@ import static org.hamcrest.Matchers.is;
public class PositionRestartIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(PositionRestartIntegrationTest.class);
private static final long SEED = new Random().nextLong();
private static final int NUM_BROKERS = 1;
private static final int NUM_BROKERS = 3;
public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
private static int port = 0;
private static final String INPUT_TOPIC_NAME = "input-topic";
@ -274,7 +274,7 @@ public class PositionRestartIntegrationTest {
throws InterruptedException, IOException, ExecutionException, TimeoutException {
CLUSTER.start();
CLUSTER.deleteAllTopicsAndWait(60 * 1000L);
CLUSTER.deleteAllTopics();
final int partitions = 2;
CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);

View File

@ -242,7 +242,7 @@ public class QueryableStateIntegrationTest {
kafkaStreams.close(ofSeconds(30));
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
CLUSTER.deleteAllTopicsAndWait(0L);
CLUSTER.deleteAllTopics();
}
/**

View File

@ -153,7 +153,7 @@ public class RangeQueryIntegrationTest {
@AfterEach
public void cleanup() throws InterruptedException {
CLUSTER.deleteAllTopicsAndWait(120000);
CLUSTER.deleteAllTopics();
}
@ParameterizedTest

View File

@ -196,7 +196,7 @@ public class RegexSourceIntegrationTest {
streams.close();
} finally {
CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
CLUSTER.deleteTopics("TEST-TOPIC-1", "TEST-TOPIC-2");
}
}
@ -248,7 +248,7 @@ public class RegexSourceIntegrationTest {
streams.close();
} finally {
CLUSTER.deleteTopicsAndWait(topic1, topic2);
CLUSTER.deleteTopics(topic1, topic2);
}
}
@ -290,7 +290,7 @@ public class RegexSourceIntegrationTest {
streams.start();
TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
} finally {
CLUSTER.deleteTopicAndWait("TEST-TOPIC-A");
CLUSTER.deleteTopic("TEST-TOPIC-A");
}
TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);

View File

@ -68,7 +68,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
// expiration of connections by the brokers to avoid errors when `AdminClient` sends requests after potentially
// very long sleep times
brokerProps.put(SocketServerConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, -1L);
CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
CLUSTER = new EmbeddedKafkaCluster(3, brokerProps);
}
@BeforeAll
@ -98,7 +98,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
}
@Test
public void shouldNotAllowToResetWhileStreamsIsRunning(final TestInfo testInfo) {
public void shouldNotAllowToResetWhileStreamsIsRunning(final TestInfo testInfo) throws Exception {
final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
final String[] parameters = new String[] {
"--application-id", appID,
@ -113,7 +113,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
// RUN
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.start();
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
assertEquals(1, exitCode);
@ -193,7 +193,8 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
// Run
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.start();
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
@ -213,7 +214,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
assertInternalTopicsGotDeleted(null);
// RE-RUN
streams.start();
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
@ -228,7 +229,8 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
// RUN
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.start();
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
@ -251,7 +253,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
resetFile.deleteOnExit();
// RE-RUN
streams.start();
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 5);
streams.close();
@ -269,7 +271,8 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
// RUN
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.start();
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
@ -297,7 +300,8 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
resetFile.deleteOnExit();
// RE-RUN
streams.start();
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
@ -314,7 +318,8 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
// RUN
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.start();
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
@ -337,7 +342,8 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
resetFile.deleteOnExit();
// RE-RUN
streams.start();
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();

View File

@ -33,8 +33,6 @@ import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
/**
* Tests command line SSL setup for reset tool.
*/
@ -54,9 +52,7 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
try {
SSL_CONFIG = TestSslUtils.createSslConfig(false, true, ConnectionMode.SERVER, TestUtils.tempFile(), "testCert");
brokerProps.put(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:0");
brokerProps.put(INTER_BROKER_LISTENER_NAME_CONFIG, "SSL");
brokerProps.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "EXTERNAL:SSL,CONTROLLER:SSL,INTERNAL:SSL");
brokerProps.putAll(SSL_CONFIG);
} catch (final Exception e) {
throw new RuntimeException(e);

View File

@ -44,6 +44,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@ -64,13 +65,15 @@ import static org.hamcrest.MatcherAssert.assertThat;
public class ResetPartitionTimeIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final Properties BROKER_CONFIG;
private static final long NOW = Instant.now().toEpochMilli();
static {
BROKER_CONFIG = new Properties();
BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
}
public static final EmbeddedKafkaCluster CLUSTER =
new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG, 0L);
new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
@BeforeAll
public static void startCluster() throws IOException {
@ -117,13 +120,13 @@ public class ResetPartitionTimeIntegrationTest {
produceSynchronouslyToPartitionZero(
input,
Collections.singletonList(
new KeyValueTimestamp<>("k3", "v3", 5000)
new KeyValueTimestamp<>("k3", "v3", NOW + 5000)
)
);
verifyOutput(
outputRaw,
Collections.singletonList(
new KeyValueTimestamp<>("k3", "v3", 5000)
new KeyValueTimestamp<>("k3", "v3", NOW + 5000)
)
);
assertThat(lastRecordedTimestamp, is(-1L));
@ -138,16 +141,16 @@ public class ResetPartitionTimeIntegrationTest {
produceSynchronouslyToPartitionZero(
input,
Collections.singletonList(
new KeyValueTimestamp<>("k5", "v5", 4999)
new KeyValueTimestamp<>("k5", "v5", NOW + 4999)
)
);
verifyOutput(
outputRaw,
Collections.singletonList(
new KeyValueTimestamp<>("k5", "v5", 4999)
new KeyValueTimestamp<>("k5", "v5", NOW + 4999)
)
);
assertThat(lastRecordedTimestamp, is(5000L));
assertThat(lastRecordedTimestamp, is(NOW + 5000L));
} finally {
kafkaStreams.close();
quietlyCleanStateAfterTest(CLUSTER, kafkaStreams);

View File

@ -117,7 +117,7 @@ public class RestoreIntegrationTest {
private static final Duration RESTORATION_DELAY = Duration.ofMillis(2000);
private static final int NUM_BROKERS = 1;
private static final int NUM_BROKERS = 2;
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

View File

@ -135,7 +135,7 @@ public class RocksDBMetricsIntegrationTest {
@AfterEach
public void after() throws Exception {
CLUSTER.deleteTopicsAndWait(STREAM_INPUT_ONE, STREAM_INPUT_TWO, STREAM_OUTPUT_ONE, STREAM_OUTPUT_TWO);
CLUSTER.deleteTopics(STREAM_INPUT_ONE, STREAM_INPUT_TWO, STREAM_OUTPUT_ONE, STREAM_OUTPUT_TWO);
}
@FunctionalInterface

View File

@ -104,7 +104,7 @@ public class StandbyTaskEOSIntegrationTest {
inputTopic = "input-" + safeTestName;
outputTopic = "output-" + safeTestName;
storeName = "store-" + safeTestName;
CLUSTER.deleteTopicsAndWait(inputTopic, outputTopic, appId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog");
CLUSTER.deleteTopics(inputTopic, outputTopic, appId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog");
CLUSTER.createTopic(inputTopic, 1, 3);
CLUSTER.createTopic(outputTopic, 1, 3);
}

View File

@ -18,9 +18,11 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
@ -99,7 +101,7 @@ public class StandbyTaskEOSMultiRebalanceIntegrationTest {
storeName = "store-" + safeTestName;
counterName = "counter-" + safeTestName;
CLUSTER.deleteTopicsAndWait(inputTopic, outputTopic);
CLUSTER.deleteTopics(inputTopic, outputTopic);
CLUSTER.createTopic(inputTopic, partitionCount, 3);
CLUSTER.createTopic(outputTopic, partitionCount, 3);
}
@ -141,7 +143,7 @@ public class StandbyTaskEOSMultiRebalanceIntegrationTest {
CLUSTER.bootstrapServers(),
IntegerSerializer.class,
IntegerSerializer.class,
new Properties()
Utils.mkProperties(Collections.singletonMap(ProducerConfig.ACKS_CONFIG, "all"))
),
10L + time
);

View File

@ -171,7 +171,7 @@ public class StateDirectoryIntegrationTest {
).findFirst().isPresent()
);
} finally {
CLUSTER.deleteAllTopicsAndWait(0L);
CLUSTER.deleteAllTopics();
}
}
@ -271,7 +271,7 @@ public class StateDirectoryIntegrationTest {
assertTrue((new File(stateDir)).exists()); // Root state store exists
assertTrue(appDir.exists()); // Application state store exists
} finally {
CLUSTER.deleteAllTopicsAndWait(0L);
CLUSTER.deleteAllTopics();
}
}
}

View File

@ -54,6 +54,7 @@ import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -80,8 +81,9 @@ import static org.junit.jupiter.api.Assertions.fail;
@Tag("integration")
@Timeout(600)
public class StreamsUncaughtExceptionHandlerIntegrationTest {
private static final long NOW = Instant.now().toEpochMilli();
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyList(), 0L, 0L);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@BeforeAll
public static void startCluster() throws IOException {
@ -146,7 +148,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> counter.incrementAndGet());
startApplicationAndWaitUntilRunning(kafkaStreams);
produceMessages(0L, inputTopic, "A");
produceMessages(NOW, inputTopic, "A");
// should call the UncaughtExceptionHandler in current thread
TestUtils.waitForCondition(() -> counter.get() == 1, "Handler was called 1st time");
@ -168,7 +170,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
startApplicationAndWaitUntilRunning(kafkaStreams);
produceMessages(0L, inputTopic, "A");
produceMessages(NOW, inputTopic, "A");
waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
assertThat(processorValueCollector.size(), equalTo(1));
@ -252,7 +254,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
startApplicationAndWaitUntilRunning(kafkaStreams);
produceMessages(0L, inputTopic2, "A");
produceMessages(NOW, inputTopic2, "A");
waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
assertThat(processorValueCollector.size(), equalTo(1));
@ -297,7 +299,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
IntegerSerializer.class,
StringSerializer.class,
new Properties()),
0L);
NOW);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
inputTopic2,
@ -310,7 +312,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
IntegerSerializer.class,
StringSerializer.class,
new Properties()),
0L);
NOW);
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
TestUtils.consumerConfig(
@ -365,7 +367,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
startApplicationAndWaitUntilRunning(asList(kafkaStreams1, kafkaStreams2));
produceMessages(0L, inputTopic, "A");
produceMessages(NOW, inputTopic, "A");
waitForApplicationState(asList(kafkaStreams1, kafkaStreams2), KafkaStreams.State.ERROR, DEFAULT_DURATION);
assertThat(processorValueCollector.size(), equalTo(1));
@ -386,7 +388,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
});
startApplicationAndWaitUntilRunning(kafkaStreams);
produceMessages(0L, inputTopic, "A");
produceMessages(NOW, inputTopic, "A");
TestUtils.waitForCondition(() -> count.get() == numThreads, "finished replacing threads");
TestUtils.waitForCondition(() -> throwError.get(), "finished replacing threads");
kafkaStreams.close();

View File

@ -55,11 +55,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -82,12 +81,8 @@ import static org.hamcrest.Matchers.equalTo;
@Tag("integration")
@Timeout(600)
public class SuppressionDurabilityIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
3,
mkProperties(mkMap()),
0L
);
private static final long NOW = Instant.now().toEpochMilli();
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
@BeforeAll
public static void startCluster() throws IOException {
@ -173,11 +168,11 @@ public class SuppressionDurabilityIntegrationTest {
);
verifyOutput(
outputRaw,
new HashSet<>(asList(
asList(
new KeyValueTimestamp<>("k1", 1L, scaledTime(1L)),
new KeyValueTimestamp<>("k2", 1L, scaledTime(2L)),
new KeyValueTimestamp<>("k3", 1L, scaledTime(3L))
))
)
);
assertThat(eventCount.get(), is(0));
@ -191,10 +186,10 @@ public class SuppressionDurabilityIntegrationTest {
);
verifyOutput(
outputRaw,
new HashSet<>(asList(
asList(
new KeyValueTimestamp<>("k4", 1L, scaledTime(4L)),
new KeyValueTimestamp<>("k5", 1L, scaledTime(5L))
))
)
);
assertThat(eventCount.get(), is(2));
verifyOutput(
@ -225,11 +220,11 @@ public class SuppressionDurabilityIntegrationTest {
);
verifyOutput(
outputRaw,
new HashSet<>(asList(
asList(
new KeyValueTimestamp<>("k6", 1L, scaledTime(6L)),
new KeyValueTimestamp<>("k7", 1L, scaledTime(7L)),
new KeyValueTimestamp<>("k8", 1L, scaledTime(8L))
))
)
);
assertThat("suppress has apparently produced some duplicates. There should only be 5 output events.",
eventCount.get(), is(5));
@ -303,24 +298,12 @@ public class SuppressionDurabilityIntegrationTest {
IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps);
}
private void verifyOutput(final String topic, final Set<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
final Properties properties = mkProperties(
mkMap(
mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"),
mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()),
mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName())
)
);
IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps);
}
/**
* scaling to ensure that there are commits in between the various test events,
* just to exercise that everything works properly in the presence of commits.
*/
private long scaledTime(final long unscaledTime) {
return COMMIT_INTERVAL * 2 * unscaledTime;
return NOW + COMMIT_INTERVAL * 2 * unscaledTime;
}
private static void produceSynchronouslyToPartitionZero(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {

View File

@ -52,6 +52,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
@ -83,12 +84,8 @@ import static org.hamcrest.Matchers.empty;
@Tag("integration")
@Timeout(600)
public class SuppressionIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
1,
mkProperties(mkMap()),
0L
);
private static final long NOW = Instant.now().toEpochMilli();
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@BeforeAll
public static void startCluster() throws IOException {
@ -525,7 +522,7 @@ public class SuppressionIntegrationTest {
* just to exercise that everything works properly in the presence of commits.
*/
private static long scaledTime(final long unscaledTime) {
return COMMIT_INTERVAL * 2 * unscaledTime;
return NOW + COMMIT_INTERVAL * 2 * unscaledTime;
}
private static void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {

View File

@ -61,7 +61,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
@Timeout(600)
public class TaskMetadataIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyList(), 0L, 0L);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyMap(), 0L, 0L);
@BeforeAll
public static void startCluster() throws IOException {

View File

@ -16,19 +16,36 @@
*/
package org.apache.kafka.streams.integration.utils;
import kafka.server.KafkaServer;
import kafka.zk.EmbeddedZookeeper;
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ConfigType;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.config.ZkConfigs;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.test.TestCondition;
@ -37,115 +54,143 @@ import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.common.utils.Utils.mkProperties;
/**
* Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and supplied number of Kafka brokers.
* Setup an embedded Kafka KRaft cluster for integration tests (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the
* specified number of brokers and the specified broker properties.
* Additional Kafka client properties can also be supplied if required.
* This class also provides various utility methods to easily create Kafka topics, produce data, consume data etc.
*/
public class EmbeddedKafkaCluster {
private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected
private static final int TOPIC_CREATION_TIMEOUT = 30000;
private static final int TOPIC_DELETION_TIMEOUT = 30000;
private EmbeddedZookeeper zookeeper = null;
private final KafkaEmbedded[] brokers;
private final KafkaClusterTestKit cluster;
private final Properties brokerConfig;
private final List<Properties> brokerConfigOverrides;
public final MockTime time;
public EmbeddedKafkaCluster(final int numBrokers) {
this(numBrokers, new Properties());
}
public EmbeddedKafkaCluster(final int numBrokers,
final Properties brokerConfig) {
this(numBrokers, brokerConfig, System.currentTimeMillis());
public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) {
this(numBrokers, brokerConfig, Collections.emptyMap());
}
public EmbeddedKafkaCluster(final int numBrokers,
final Properties brokerConfig,
final long mockTimeMillisStart) {
this(numBrokers, brokerConfig, Collections.emptyList(), mockTimeMillisStart);
this(numBrokers, brokerConfig, Collections.emptyMap(), mockTimeMillisStart, System.nanoTime());
}
public EmbeddedKafkaCluster(final int numBrokers,
final Properties brokerConfig,
final List<Properties> brokerConfigOverrides) {
this(numBrokers, brokerConfig, brokerConfigOverrides, System.currentTimeMillis());
final Map<Integer, Map<String, String>> brokerConfigOverrides) {
this(numBrokers, brokerConfig, brokerConfigOverrides, System.currentTimeMillis(), System.nanoTime());
}
public EmbeddedKafkaCluster(final int numBrokers,
final Properties brokerConfig,
final List<Properties> brokerConfigOverrides,
final long mockTimeMillisStart) {
this(numBrokers, brokerConfig, brokerConfigOverrides, mockTimeMillisStart, System.nanoTime());
}
public EmbeddedKafkaCluster(final int numBrokers,
final Properties brokerConfig,
final List<Properties> brokerConfigOverrides,
final Map<Integer, Map<String, String>> brokerConfigOverrides,
final long mockTimeMillisStart,
final long mockTimeNanoStart) {
addDefaultBrokerPropsIfAbsent(brokerConfig);
if (!brokerConfigOverrides.isEmpty() && brokerConfigOverrides.size() != numBrokers) {
throw new IllegalArgumentException("Size of brokerConfigOverrides " + brokerConfigOverrides.size()
+ " must match broker number " + numBrokers);
+ " must match broker number " + numBrokers);
}
try {
final KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder()
.setCombined(true)
.setNumBrokerNodes(numBrokers)
.setPerServerProperties(brokerConfigOverrides)
// Reduce number of controllers for faster startup
// We may make this configurable in the future if there's a use case for it
.setNumControllerNodes(1)
.build()
);
brokerConfig.forEach((k, v) -> clusterBuilder.setConfigProp((String) k, v));
cluster = clusterBuilder.build();
cluster.nonFatalFaultHandler().setIgnore(true);
} catch (final Exception e) {
throw new KafkaException("Failed to create test Kafka cluster", e);
}
brokers = new KafkaEmbedded[numBrokers];
this.brokerConfig = brokerConfig;
time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
this.brokerConfigOverrides = brokerConfigOverrides;
this.time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
}
public void start() {
try {
cluster.format();
cluster.startup();
cluster.waitForReadyBrokers();
} catch (final Exception e) {
throw new KafkaException("Failed to start test Kafka cluster", e);
}
verifyClusterReadiness();
}
/**
* Creates and starts a Kafka cluster.
* Perform an extended check to ensure that the primary APIs of the cluster are available, including:
* <ul>
* <li>Ability to create a topic</li>
* <li>Ability to produce to a topic</li>
* <li>Ability to form a consumer group</li>
* <li>Ability to consume from a topic</li>
* </ul>
* If this method completes successfully, all resources created to verify the cluster health
* (such as topics and consumer groups) will be cleaned up before it returns.
* <p>
* This provides extra guarantees compared to other cluster readiness checks such as
* {@link KafkaClusterTestKit#waitForReadyBrokers()}, which verify that brokers have
* completed startup and joined the cluster, but do not verify that the internal consumer
* offsets topic has been created or that it's actually possible for users to create and
* interact with topics.
*/
public void start() throws IOException {
log.debug("Initiating embedded Kafka cluster startup");
log.debug("Starting a ZooKeeper instance");
zookeeper = new EmbeddedZookeeper();
log.debug("ZooKeeper instance is running at {}", zKConnectString());
public void verifyClusterReadiness() {
final UUID uuid = UUID.randomUUID();
final String consumerGroupId = "group-warmup-" + uuid;
final Map<String, Object> consumerConfig = Collections.singletonMap(GROUP_ID_CONFIG, consumerGroupId);
final String topic = "topic-warmup-" + uuid;
brokerConfig.put(ZkConfigs.ZK_CONNECT_CONFIG, zKConnectString());
putIfAbsent(brokerConfig, SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:" + DEFAULT_BROKER_PORT);
putIfAbsent(brokerConfig, ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, true);
putIfAbsent(brokerConfig, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
putIfAbsent(brokerConfig, GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 0);
putIfAbsent(brokerConfig, GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0);
putIfAbsent(brokerConfig, GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) 1);
putIfAbsent(brokerConfig, GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 5);
putIfAbsent(brokerConfig, TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 5);
putIfAbsent(brokerConfig, ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, true);
createTopic(topic);
final Map<String, Object> producerProps = new HashMap<>(clientDefaultConfig());
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "warmup-producer");
produce(producerProps, topic, null, "warmup message key", "warmup message value");
for (int i = 0; i < brokers.length; i++) {
brokerConfig.put(ServerConfigs.BROKER_ID_CONFIG, i);
log.debug("Starting a Kafka instance on {} ...", brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG));
final Properties effectiveConfig = new Properties();
effectiveConfig.putAll(brokerConfig);
if (brokerConfigOverrides != null && brokerConfigOverrides.size() > i) {
effectiveConfig.putAll(brokerConfigOverrides.get(i));
try (Consumer<?, ?> consumer = createConsumerAndSubscribeTo(consumerConfig, topic)) {
final ConsumerRecords<?, ?> records = consumer.poll(Duration.ofMillis(TimeUnit.MINUTES.toMillis(2)));
if (records.isEmpty()) {
throw new AssertionError("Failed to verify availability of group coordinator and produce/consume APIs on Kafka cluster in time");
}
brokers[i] = new KafkaEmbedded(effectiveConfig, time);
log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}",
brokers[i].brokerList(), brokers[i].zookeeperConnect());
}
}
private void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) {
if (!props.containsKey(propertyKey)) {
brokerConfig.put(propertyKey, propertyValue);
try (Admin admin = createAdminClient()) {
admin.deleteConsumerGroups(Collections.singleton(consumerGroupId)).all().get(30, TimeUnit.SECONDS);
admin.deleteTopics(Collections.singleton(topic)).all().get(30, TimeUnit.SECONDS);
} catch (final InterruptedException | ExecutionException | TimeoutException e) {
throw new AssertionError("Failed to clean up cluster health check resource(s)", e);
}
}
@ -153,46 +198,22 @@ public class EmbeddedKafkaCluster {
* Stop the Kafka cluster.
*/
public void stop() {
if (brokers.length > 1) {
// delete the topics first to avoid cascading leader elections while shutting down the brokers
final Set<String> topics = getAllTopicsInCluster();
if (!topics.isEmpty()) {
try (final Admin adminClient = brokers[0].createAdminClient()) {
adminClient.deleteTopics(topics).all().get();
} catch (final InterruptedException e) {
log.warn("Got interrupted while deleting topics in preparation for stopping embedded brokers", e);
throw new RuntimeException(e);
} catch (final ExecutionException | RuntimeException e) {
log.warn("Couldn't delete all topics before stopping brokers", e);
}
}
final AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
Utils.closeQuietly(cluster, "embedded Kafka cluster", shutdownFailure);
if (shutdownFailure.get() != null) {
throw new KafkaException("Failed to shut down producer / embedded Kafka cluster", shutdownFailure.get());
}
for (final KafkaEmbedded broker : brokers) {
broker.stopAsync();
}
for (final KafkaEmbedded broker : brokers) {
broker.awaitStoppedAndPurge();
}
zookeeper.shutdown();
}
/**
* The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format.
* Example: `127.0.0.1:2181`.
* <p>
* You can use this to e.g. tell Kafka brokers how to connect to this instance.
*/
public String zKConnectString() {
return "127.0.0.1:" + zookeeper.port();
}
/**
* This cluster's `bootstrap.servers` value. Example: `127.0.0.1:9092`.
* <p>
* You can use this to tell Kafka producers how to connect to this cluster.
*/
public String bootstrapServers() {
return brokers[0].brokerList();
return cluster.bootstrapServers();
}
public boolean sslEnabled() {
final String listenerSecurityProtocolMap = brokerConfig.getProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG);
if (listenerSecurityProtocolMap == null)
return false;
return listenerSecurityProtocolMap.contains(":SSL") || listenerSecurityProtocolMap.contains(":SASL_SSL");
}
/**
@ -211,8 +232,18 @@ public class EmbeddedKafkaCluster {
*
* @param topic The name of the topic.
*/
public void createTopic(final String topic) throws InterruptedException {
createTopic(topic, 1, 1, Collections.emptyMap());
public void createTopic(final String topic) {
createTopic(topic, 1);
}
/**
* Create a Kafka topic with given partition and a replication factor of 1.
*
* @param topic The name of the topic.
* @param partitions The number of partitions for this topic.
*/
public void createTopic(final String topic, final int partitions) {
createTopic(topic, partitions, 1, Collections.emptyMap());
}
/**
@ -227,116 +258,177 @@ public class EmbeddedKafkaCluster {
}
/**
* Create a Kafka topic with the given parameters.
* Create a Kafka topic with given partition, replication factor, and topic config.
*
* @param topic The name of the topic.
* @param topic The name of the topic.
* @param partitions The number of partitions for this topic.
* @param replication The replication factor for (partitions of) this topic.
* @param topicConfig Additional topic-level configuration settings.
*/
public void createTopic(final String topic,
final int partitions,
final int replication,
final Map<String, String> topicConfig) throws InterruptedException {
brokers[0].createTopic(topic, partitions, replication, topicConfig);
final List<TopicPartition> topicPartitions = new ArrayList<>();
for (int partition = 0; partition < partitions; partition++) {
topicPartitions.add(new TopicPartition(topic, partition));
public void createTopic(final String topic, final int partitions, final int replication, final Map<String, String> topicConfig) {
if (replication > cluster.brokers().size()) {
throw new InvalidReplicationFactorException("Insufficient brokers ("
+ cluster.brokers().size() + ") for desired replication (" + replication + ")");
}
log.info("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
topic, partitions, replication, topicConfig);
final NewTopic newTopic = new NewTopic(topic, partitions, (short) replication);
newTopic.configs(topicConfig);
try (final Admin adminClient = createAdminClient()) {
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
TestUtils.waitForCondition(() -> adminClient.listTopics().names().get().contains(topic),
"Wait for topic " + topic + " to get created.");
} catch (final TopicExistsException ignored) {
} catch (final InterruptedException | ExecutionException e) {
if (!(e.getCause() instanceof TopicExistsException)) {
throw new RuntimeException(e);
}
}
IntegrationTestUtils.waitForTopicPartitions(brokers(), topicPartitions, TOPIC_CREATION_TIMEOUT);
}
/**
* Deletes a topic returns immediately.
*
* @param topic the name of the topic
*/
public void deleteTopic(final String topic) throws InterruptedException {
deleteTopicsAndWait(-1L, topic);
}
/**
* Deletes a topic and blocks for max 30 sec until the topic got deleted.
*
* @param topic the name of the topic
*/
public void deleteTopicAndWait(final String topic) throws InterruptedException {
deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topic);
}
/**
* Deletes multiple topics returns immediately.
*
* @param topics the name of the topics
*/
public void deleteTopics(final String... topics) throws InterruptedException {
deleteTopicsAndWait(-1, topics);
}
/**
* Deletes multiple topics and blocks for max 30 sec until all topics got deleted.
*
* @param topics the name of the topics
*/
public void deleteTopicsAndWait(final String... topics) throws InterruptedException {
deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics);
}
/**
* Deletes multiple topics and blocks until all topics got deleted.
*
* @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
* @param topics the name of the topics
*/
public void deleteTopicsAndWait(final long timeoutMs, final String... topics) throws InterruptedException {
public void deleteTopics(final String... topics) {
for (final String topic : topics) {
try {
brokers[0].deleteTopic(topic);
} catch (final UnknownTopicOrPartitionException ignored) { }
deleteTopic(topic);
}
}
if (timeoutMs > 0) {
TestUtils.waitForCondition(new TopicsDeletedCondition(topics), timeoutMs, "Topics not deleted after " + timeoutMs + " milli seconds.");
/**
* Delete a Kafka topic.
*
* @param topic the topic to delete; may not be null
*/
public void deleteTopic(final String topic) {
try (final Admin adminClient = createAdminClient()) {
adminClient.deleteTopics(Collections.singleton(topic)).all().get();
} catch (final InterruptedException | ExecutionException e) {
if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
throw new RuntimeException(e);
}
}
}
/**
* Deletes all topics and blocks until all topics got deleted.
*
* @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
* Delete all topics except internal topics.
*/
public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedException {
final Set<String> topics = getAllTopicsInCluster();
for (final String topic : topics) {
try {
brokers[0].deleteTopic(topic);
} catch (final UnknownTopicOrPartitionException ignored) { }
public void deleteAllTopics() {
try (final Admin adminClient = createAdminClient()) {
final Set<String> topics = adminClient.listTopics().names().get();
adminClient.deleteTopics(topics).all().get();
} catch (final UnknownTopicOrPartitionException ignored) {
} catch (final ExecutionException | InterruptedException e) {
if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
throw new RuntimeException(e);
}
}
}
if (timeoutMs > 0) {
TestUtils.waitForCondition(new TopicsDeletedCondition(topics), timeoutMs, "Topics not deleted after " + timeoutMs + " milli seconds.");
/**
* Produce given key and value to topic partition.
* @param topic the topic to produce to; may not be null.
* @param partition the topic partition to produce to.
* @param key the record key.
* @param value the record value.
*/
public void produce(final Map<String, Object> producerProps, final String topic, final Integer partition, final String key, final String value) {
try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
final ProducerRecord<byte[], byte[]> msg = new ProducerRecord<>(topic, partition, key == null ? null : key.getBytes(), value == null ? null : value.getBytes());
try {
producer.send(msg).get(TimeUnit.SECONDS.toMillis(120), TimeUnit.MILLISECONDS);
producer.flush();
} catch (final Exception e) {
throw new KafkaException("Could not produce message: " + msg, e);
}
}
}
public Admin createAdminClient() {
return Admin.create(mkProperties(clientDefaultConfig()));
}
public Map<String, String> clientDefaultConfig() {
final Map<String, String> props = new HashMap<>();
props.putIfAbsent(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
if (sslEnabled()) {
props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG).toString());
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
}
return props;
}
public KafkaConsumer<byte[], byte[]> createConsumer(final Map<String, Object> consumerProps) {
final Map<String, Object> props = new HashMap<>(clientDefaultConfig());
props.putAll(consumerProps);
props.putIfAbsent(GROUP_ID_CONFIG, UUID.randomUUID().toString());
props.putIfAbsent(ENABLE_AUTO_COMMIT_CONFIG, "false");
props.putIfAbsent(AUTO_OFFSET_RESET_CONFIG, "earliest");
props.putIfAbsent(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.putIfAbsent(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
final KafkaConsumer<byte[], byte[]> consumer;
try {
consumer = new KafkaConsumer<>(props);
} catch (final Throwable t) {
throw new KafkaException("Failed to create consumer", t);
}
return consumer;
}
public KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo(final Map<String, Object> consumerProps, final String... topics) {
return createConsumerAndSubscribeTo(consumerProps, null, topics);
}
public KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo(final Map<String, Object> consumerProps, final ConsumerRebalanceListener rebalanceListener, final String... topics) {
final KafkaConsumer<byte[], byte[]> consumer = createConsumer(consumerProps);
if (rebalanceListener != null) {
consumer.subscribe(Arrays.asList(topics), rebalanceListener);
} else {
consumer.subscribe(Arrays.asList(topics));
}
return consumer;
}
private void addDefaultBrokerPropsIfAbsent(final Properties brokerConfig) {
brokerConfig.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
brokerConfig.putIfAbsent(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "0");
brokerConfig.putIfAbsent(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0");
brokerConfig.putIfAbsent(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "5");
brokerConfig.putIfAbsent(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
brokerConfig.putIfAbsent(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, "5");
brokerConfig.putIfAbsent(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
brokerConfig.putIfAbsent(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, true);
brokerConfig.putIfAbsent(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, true);
}
public void waitForRemainingTopics(final long timeoutMs, final String... topics) throws InterruptedException {
TestUtils.waitForCondition(new TopicsRemainingCondition(topics), timeoutMs, "Topics are not expected after " + timeoutMs + " milli seconds.");
}
private final class TopicsDeletedCondition implements TestCondition {
final Set<String> deletedTopics = new HashSet<>();
private TopicsDeletedCondition(final String... topics) {
Collections.addAll(deletedTopics, topics);
public Set<String> getAllTopicsInCluster() {
try (final Admin adminClient = createAdminClient()) {
return adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get();
} catch (final InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
private TopicsDeletedCondition(final Collection<String> topics) {
deletedTopics.addAll(topics);
}
@Override
public boolean conditionMet() {
final Set<String> allTopics = getAllTopicsInCluster();
return !allTopics.removeAll(deletedTopics);
public Properties getLogConfig(final String topic) {
try (final Admin adminClient = createAdminClient()) {
final ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
final Config config = adminClient.describeConfigs(Collections.singleton(configResource)).values().get(configResource).get();
final Properties properties = new Properties();
for (final ConfigEntry configEntry : config.entries()) {
if (configEntry.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) {
properties.put(configEntry.name(), configEntry.value());
}
}
return properties;
} catch (final InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
@ -353,25 +445,4 @@ public class EmbeddedKafkaCluster {
return allTopics.equals(remainingTopics);
}
}
private List<KafkaServer> brokers() {
final List<KafkaServer> servers = new ArrayList<>();
for (final KafkaEmbedded broker : brokers) {
servers.add(broker.kafkaServer());
}
return servers;
}
public Properties getLogConfig(final String topic) {
return brokers[0].kafkaServer().zkClient().getEntityConfigs(ConfigType.TOPIC, topic);
}
public Set<String> getAllTopicsInCluster() {
final scala.collection.Iterator<String> topicsIterator = brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false).iterator();
final Set<String> topics = new HashSet<>();
while (topicsIterator.hasNext()) {
topics.add(topicsIterator.next());
}
return topics;
}
}

View File

@ -75,6 +75,7 @@ import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@ -104,7 +105,6 @@ import static org.apache.kafka.common.utils.Utils.sleep;
import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.fail;
@ -294,7 +294,7 @@ public class IntegrationTestUtils {
final int replicationCount,
final String... topics) {
try {
cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
cluster.deleteAllTopics();
for (final String topic : topics) {
cluster.createTopic(topic, partitionCount, replicationCount);
}
@ -306,9 +306,9 @@ public class IntegrationTestUtils {
public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
try {
driver.cleanUp();
cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
} catch (final RuntimeException | InterruptedException e) {
LOG.warn("Ignoring failure to clean test state", e);
cluster.deleteAllTopics();
} catch (final RuntimeException e) {
LOG.warn("Ignoring failure to clean test state");
}
}
@ -1167,6 +1167,10 @@ public class IntegrationTestUtils {
if (results.size() != expected.size()) {
throw new AssertionError(printRecords(results) + " != " + expected);
}
// sort expected and results by key before comparing them
expected.sort(Comparator.comparing(e -> e.key().toString()));
results.sort(Comparator.comparing(e -> e.key().toString()));
final Iterator<KeyValueTimestamp<K, V>> expectedIterator = expected.iterator();
for (final ConsumerRecord<K, V> result : results) {
final KeyValueTimestamp<K, V> expected1 = expectedIterator.next();
@ -1178,28 +1182,6 @@ public class IntegrationTestUtils {
}
}
public static void verifyKeyValueTimestamps(final Properties consumerConfig,
final String topic,
final Set<KeyValueTimestamp<String, Long>> expected) {
final List<ConsumerRecord<String, Long>> results;
try {
results = waitUntilMinRecordsReceived(consumerConfig, topic, expected.size());
} catch (final Exception e) {
throw new RuntimeException(e);
}
if (results.size() != expected.size()) {
throw new AssertionError(printRecords(results) + " != " + expected);
}
final Set<KeyValueTimestamp<String, Long>> actual =
results.stream()
.map(result -> new KeyValueTimestamp<>(result.key(), result.value(), result.timestamp()))
.collect(Collectors.toSet());
assertThat(actual, equalTo(expected));
}
private static <K, V> void compareKeyValueTimestamp(final ConsumerRecord<K, V> record,
final K expectedKey,
final V expectedValue,

View File

@ -1,223 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration.utils;
import kafka.cluster.EndPoint;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ZkConfigs;
import org.apache.kafka.server.util.MockTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
import static org.apache.kafka.server.config.ServerLogConfigs.NUM_PARTITIONS_CONFIG;
/**
* Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092` by
* default.
* <p>
* Requires a running ZooKeeper instance to connect to.
*/
public class KafkaEmbedded {
private static final Logger log = LoggerFactory.getLogger(KafkaEmbedded.class);
private static final String DEFAULT_ZK_CONNECT = "127.0.0.1:2181";
private final Properties effectiveConfig;
private final File logDir;
private final File tmpFolder;
private final KafkaServer kafka;
/**
* Creates and starts an embedded Kafka broker.
*
* @param config Broker configuration settings. Used to modify, for example, on which port the
* broker should listen to. Note that you cannot change the `log.dirs` setting
* currently.
*/
@SuppressWarnings({"WeakerAccess", "this-escape"})
public KafkaEmbedded(final Properties config, final MockTime time) throws IOException {
tmpFolder = org.apache.kafka.test.TestUtils.tempDirectory();
logDir = org.apache.kafka.test.TestUtils.tempDirectory(tmpFolder.toPath(), "log");
effectiveConfig = effectiveConfigFrom(config);
final boolean loggingEnabled = true;
final KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled);
log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...",
logDir, zookeeperConnect());
kafka = TestUtils.createServer(kafkaConfig, time);
log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
brokerList(), zookeeperConnect());
}
/**
* Creates the configuration for starting the Kafka broker by merging default values with
* overwrites.
*
* @param initialConfig Broker configuration settings that override the default config.
*/
private Properties effectiveConfigFrom(final Properties initialConfig) {
final Properties effectiveConfig = new Properties();
effectiveConfig.put(ServerConfigs.BROKER_ID_CONFIG, 0);
effectiveConfig.put(NUM_PARTITIONS_CONFIG, 1);
effectiveConfig.put(AUTO_CREATE_TOPICS_ENABLE_CONFIG, true);
effectiveConfig.put(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, 1000000);
effectiveConfig.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, true);
effectiveConfig.put(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG, 10000);
effectiveConfig.putAll(initialConfig);
effectiveConfig.setProperty(LOG_DIR_CONFIG, logDir.getAbsolutePath());
return effectiveConfig;
}
/**
* This broker's `metadata.broker.list` value. Example: `localhost:9092`.
* <p>
* You can use this to tell Kafka producers and consumers how to connect to this instance.
*/
@SuppressWarnings("WeakerAccess")
public String brokerList() {
final EndPoint endPoint = kafka.advertisedListeners().head();
return endPoint.host() + ":" + endPoint.port();
}
/**
* The ZooKeeper connection string aka `zookeeper.connect`.
*/
@SuppressWarnings("WeakerAccess")
public String zookeeperConnect() {
return effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT);
}
@SuppressWarnings("WeakerAccess")
public void stopAsync() {
log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...",
brokerList(), zookeeperConnect());
kafka.shutdown();
}
@SuppressWarnings("WeakerAccess")
public void awaitStoppedAndPurge() {
kafka.awaitShutdown();
log.debug("Removing log dir at {} ...", logDir);
try {
Utils.delete(tmpFolder);
} catch (final IOException e) {
throw new RuntimeException(e);
}
log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
brokerList(), zookeeperConnect());
}
/**
* Create a Kafka topic with 1 partition and a replication factor of 1.
*
* @param topic The name of the topic.
*/
public void createTopic(final String topic) {
createTopic(topic, 1, 1, Collections.emptyMap());
}
/**
* Create a Kafka topic with the given parameters.
*
* @param topic The name of the topic.
* @param partitions The number of partitions for this topic.
* @param replication The replication factor for (the partitions of) this topic.
*/
public void createTopic(final String topic, final int partitions, final int replication) {
createTopic(topic, partitions, replication, Collections.emptyMap());
}
/**
* Create a Kafka topic with the given parameters.
*
* @param topic The name of the topic.
* @param partitions The number of partitions for this topic.
* @param replication The replication factor for (partitions of) this topic.
* @param topicConfig Additional topic-level configuration settings.
*/
public void createTopic(final String topic,
final int partitions,
final int replication,
final Map<String, String> topicConfig) {
log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
topic, partitions, replication, topicConfig);
final NewTopic newTopic = new NewTopic(topic, partitions, (short) replication);
newTopic.configs(topicConfig);
try (final Admin adminClient = createAdminClient()) {
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
} catch (final InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
@SuppressWarnings("WeakerAccess")
public Admin createAdminClient() {
final Properties adminClientConfig = new Properties();
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());
final Object listeners = effectiveConfig.get(SocketServerConfigs.LISTENERS_CONFIG);
if (listeners != null && listeners.toString().contains("SSL")) {
adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, effectiveConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) effectiveConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
adminClientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
}
return Admin.create(adminClientConfig);
}
@SuppressWarnings("WeakerAccess")
public void deleteTopic(final String topic) {
log.debug("Deleting topic { name: {} }", topic);
try (final Admin adminClient = createAdminClient()) {
adminClient.deleteTopics(Collections.singletonList(topic)).all().get();
} catch (final InterruptedException | ExecutionException e) {
if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
throw new RuntimeException(e);
}
}
}
@SuppressWarnings("WeakerAccess")
public KafkaServer kafkaServer() {
return kafka;
}
}

View File

@ -114,7 +114,7 @@ public class HandlingSourceTopicDeletionIntegrationTest {
() -> "Kafka Streams clients did not reach state RUNNING"
);
CLUSTER.deleteTopicAndWait(INPUT_TOPIC);
CLUSTER.deleteTopic(INPUT_TOPIC);
TestUtils.waitForCondition(
() -> kafkaStreams1.state() == State.ERROR && kafkaStreams2.state() == State.ERROR,