KAFKA-9831: increase max.poll.interval.ms to avoid unexpected rebalance (#10301)

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Luke Chen 2021-04-10 03:19:14 +08:00 committed by GitHub
parent c9edb8ad84
commit f76b8e4938
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 172 additions and 52 deletions

View File

@ -193,7 +193,7 @@
files="(StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TopologyTestDriverTest).java"/>
<suppress checks="MethodLength"
files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>
files="(EosIntegrationTest|EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files=".*[/\\]streams[/\\].*test[/\\].*.java"/>

View File

@ -84,10 +84,10 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.wa
import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
@ -133,6 +133,8 @@ public class EosIntegrationTest {
private static final AtomicInteger TEST_NUMBER = new AtomicInteger(0);
private volatile boolean hasUnexpectedError = false;
@Parameters(name = "{0}")
public static Collection<String[]> data() {
return Arrays.asList(new String[][] {
@ -279,18 +281,20 @@ public class EosIntegrationTest {
inputData.size()
);
checkResultPerKey(committedRecords, inputData);
checkResultPerKey(committedRecords, inputData, "The committed records do not match what expected");
}
}
}
private void checkResultPerKey(final List<KeyValue<Long, Long>> result, final List<KeyValue<Long, Long>> expectedResult) {
private void checkResultPerKey(final List<KeyValue<Long, Long>> result,
final List<KeyValue<Long, Long>> expectedResult,
final String reason) {
final Set<Long> allKeys = new HashSet<>();
addAllKeys(allKeys, result);
addAllKeys(allKeys, expectedResult);
for (final Long key : allKeys) {
assertThat(getAllRecordPerKey(key, result), equalTo(getAllRecordPerKey(key, expectedResult)));
assertThat(reason, getAllRecordPerKey(key, result), equalTo(getAllRecordPerKey(key, expectedResult)));
}
}
@ -391,19 +395,21 @@ public class EosIntegrationTest {
public void shouldNotViolateEosIfOneTaskFails() throws Exception {
// this test writes 10 + 5 + 5 records per partition (running with 2 partitions)
// the app is supposed to copy all 40 records into the output topic
// the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
//
// the app first commits after each 10 records per partition(total 20 records), and thus will have 2 * 5 uncommitted writes
//
// the failure gets inject after 20 committed and 30 uncommitted records got received
// -> the failure only kills one thread
// after fail over, we should read 40 committed records (even if 50 record got written)
try (final KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, eosConfig)) {
try (final KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, eosConfig, MAX_POLL_INTERVAL_MS)) {
startKafkaStreamsAndWaitForRunningState(streams, MAX_WAIT_TIME_MS);
final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L);
final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>(
committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size());
dataBeforeFailure.addAll(committedDataBeforeFailure);
dataBeforeFailure.addAll(uncommittedDataBeforeFailure);
@ -415,13 +421,29 @@ public class EosIntegrationTest {
() -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
"StreamsTasks did not request commit.");
// expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
//
// p-0: ---> 10 rec + C
// p-1: ---> 10 rec + C
final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID);
checkResultPerKey(
committedRecords,
committedDataBeforeFailure,
"The committed records before failure do not match what expected");
writeInputData(uncommittedDataBeforeFailure);
final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID);
// expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
//
// p-0: ---> 10 rec + C + 5 rec (pending)
// p-1: ---> 10 rec + C + 5 rec (pending)
checkResultPerKey(committedRecords, committedDataBeforeFailure);
checkResultPerKey(uncommittedRecords, dataBeforeFailure);
final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
checkResultPerKey(
uncommittedRecords,
dataBeforeFailure,
"The uncommitted records before failure do not match what expected");
errorInjected.set(true);
writeInputData(dataAfterFailure);
@ -430,6 +452,11 @@ public class EosIntegrationTest {
() -> uncaughtException != null, MAX_WAIT_TIME_MS,
"Should receive uncaught exception from one StreamThread.");
// expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
//
// p-0: ---> 10 rec + C + 5 rec + C + 5 rec + C
// p-1: ---> 10 rec + C + 5 rec + C + 5 rec + C
final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
CONSUMER_GROUP_ID + "_ALL");
@ -438,17 +465,28 @@ public class EosIntegrationTest {
uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
CONSUMER_GROUP_ID);
final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
final int allCommittedRecordsAfterRecoverySize = committedDataBeforeFailure.size() +
uncommittedDataBeforeFailure.size() + dataAfterFailure.size();
final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>(allCommittedRecordsAfterRecoverySize);
allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeFailure);
allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRecovery = new ArrayList<>();
final int committedRecordsAfterRecoverySize = uncommittedDataBeforeFailure.size() + dataAfterFailure.size();
final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRecovery = new ArrayList<>(committedRecordsAfterRecoverySize);
expectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
expectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery);
checkResultPerKey(committedRecordsAfterFailure, expectedCommittedRecordsAfterRecovery);
checkResultPerKey(
allCommittedRecords,
allExpectedCommittedRecordsAfterRecovery,
"The all committed records after recovery do not match what expected");
checkResultPerKey(
committedRecordsAfterFailure,
expectedCommittedRecordsAfterRecovery,
"The committed records after recovery do not match what expected");
assertThat("Should only get one uncaught exception from Streams.", hasUnexpectedError, is(false));
}
}
@ -456,22 +494,26 @@ public class EosIntegrationTest {
public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
// this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
// the app is supposed to emit all 40 update records into the output topic
// the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
// and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
// in the uncommitted batch sending some data for the new key to validate that upon resuming they will not be shown up in the store
//
// the failure gets inject after 20 committed and 10 uncommitted records got received
// the app first commits after each 10 records per partition (total 20 records), and thus will have 2 * 5 uncommitted writes
// and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
// in the uncommitted batch, sending some data for the new key to validate that upon resuming they will not be shown up in the store
//
// the failure gets inject after 20 committed and 30 uncommitted records got received
// -> the failure only kills one thread
// after fail over, we should read 40 committed records and the state stores should contain the correct sums
// per key (even if some records got processed twice)
try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, eosConfig)) {
// We need more processing time under "with state" situation, so increasing the max.poll.interval.ms
// to avoid unexpected rebalance during test, which will cause unexpected fail over triggered
try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, eosConfig, 3 * MAX_POLL_INTERVAL_MS)) {
startKafkaStreamsAndWaitForRunningState(streams, MAX_WAIT_TIME_MS);
final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L, 2L, 3L);
final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>(
committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size());
dataBeforeFailure.addAll(committedDataBeforeFailure);
dataBeforeFailure.addAll(uncommittedDataBeforeFailure);
@ -483,15 +525,36 @@ public class EosIntegrationTest {
() -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
"SteamsTasks did not request commit.");
// expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
//
// p-0: ---> 10 rec + C
// p-1: ---> 10 rec + C
final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID);
checkResultPerKey(
committedRecords,
computeExpectedResult(committedDataBeforeFailure),
"The committed records before failure do not match what expected");
writeInputData(uncommittedDataBeforeFailure);
final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID);
// expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
//
// p-0: ---> 10 rec + C + 5 rec (pending)
// p-1: ---> 10 rec + C + 5 rec (pending)
final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
final List<KeyValue<Long, Long>> expectedResultBeforeFailure = computeExpectedResult(dataBeforeFailure);
checkResultPerKey(committedRecords, computeExpectedResult(committedDataBeforeFailure));
checkResultPerKey(uncommittedRecords, expectedResultBeforeFailure);
verifyStateStore(streams, getMaxPerKey(expectedResultBeforeFailure));
checkResultPerKey(
uncommittedRecords,
expectedResultBeforeFailure,
"The uncommitted records before failure do not match what expected");
verifyStateStore(
streams,
getMaxPerKey(expectedResultBeforeFailure),
"The state store content before failure do not match what expected");
errorInjected.set(true);
writeInputData(dataAfterFailure);
@ -500,6 +563,11 @@ public class EosIntegrationTest {
() -> uncaughtException != null, MAX_WAIT_TIME_MS,
"Should receive uncaught exception from one StreamThread.");
// expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
//
// p-0: ---> 10 rec + C + 5 rec + C + 5 rec + C
// p-1: ---> 10 rec + C + 5 rec + C + 5 rec + C
final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
CONSUMER_GROUP_ID + "_ALL");
@ -508,19 +576,31 @@ public class EosIntegrationTest {
uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
CONSUMER_GROUP_ID);
final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
final int allCommittedRecordsAfterRecoverySize = committedDataBeforeFailure.size() +
uncommittedDataBeforeFailure.size() + dataAfterFailure.size();
final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>(allCommittedRecordsAfterRecoverySize);
allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeFailure);
allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(allExpectedCommittedRecordsAfterRecovery);
checkResultPerKey(allCommittedRecords, expectedResult);
checkResultPerKey(
allCommittedRecords,
expectedResult,
"The all committed records after recovery do not match what expected");
checkResultPerKey(
committedRecordsAfterFailure,
expectedResult.subList(committedDataBeforeFailure.size(), expectedResult.size()));
expectedResult.subList(committedDataBeforeFailure.size(), expectedResult.size()),
"The committed records after recovery do not match what expected");
verifyStateStore(streams, getMaxPerKey(expectedResult));
verifyStateStore(
streams,
getMaxPerKey(expectedResult),
"The state store content after recovery do not match what expected");
assertThat("Should only get one uncaught exception from Streams.", hasUnexpectedError, is(false));
}
}
@ -528,9 +608,10 @@ public class EosIntegrationTest {
public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() throws Exception {
// this test writes 10 + 5 + 5 + 10 records per partition (running with 2 partitions)
// the app is supposed to copy all 60 records into the output topic
// the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
//
// a stall gets injected after 20 committed and 30 uncommitted records got received
// the app first commits after each 10 records per partition, and thus will have 2 * 5 uncommitted writes
//
// Then, a stall gets injected after 20 committed and 30 uncommitted records got received
// -> the stall only affects one thread and should trigger a rebalance
// after rebalancing, we should read 40 committed records (even if 50 record got written)
//
@ -538,8 +619,8 @@ public class EosIntegrationTest {
// we write the remaining 20 records and verify to read 60 result records
try (
final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, eosConfig);
final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, eosConfig)
final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, eosConfig, MAX_POLL_INTERVAL_MS);
final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, eosConfig, MAX_POLL_INTERVAL_MS)
) {
startKafkaStreamsAndWaitForRunningState(streams1, MAX_WAIT_TIME_MS);
startKafkaStreamsAndWaitForRunningState(streams2, MAX_WAIT_TIME_MS);
@ -547,7 +628,8 @@ public class EosIntegrationTest {
final List<KeyValue<Long, Long>> committedDataBeforeStall = prepareData(0L, 10L, 0L, 1L);
final List<KeyValue<Long, Long>> uncommittedDataBeforeStall = prepareData(10L, 15L, 0L, 1L);
final List<KeyValue<Long, Long>> dataBeforeStall = new ArrayList<>();
final List<KeyValue<Long, Long>> dataBeforeStall = new ArrayList<>(
committedDataBeforeStall.size() + uncommittedDataBeforeStall.size());
dataBeforeStall.addAll(committedDataBeforeStall);
dataBeforeStall.addAll(uncommittedDataBeforeStall);
@ -561,13 +643,29 @@ public class EosIntegrationTest {
() -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
"SteamsTasks did not request commit.");
// expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
//
// p-0: ---> 10 rec + C
// p-1: ---> 10 rec + C
final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeStall.size(), CONSUMER_GROUP_ID);
checkResultPerKey(
committedRecords,
committedDataBeforeStall,
"The committed records before stall do not match what expected");
writeInputData(uncommittedDataBeforeStall);
final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeStall.size(), null);
final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeStall.size(), CONSUMER_GROUP_ID);
// expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
//
// p-0: ---> 10 rec + C + 5 rec (pending)
// p-1: ---> 10 rec + C + 5 rec (pending)
checkResultPerKey(committedRecords, committedDataBeforeStall);
checkResultPerKey(uncommittedRecords, dataBeforeStall);
final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeStall.size(), null);
checkResultPerKey(
uncommittedRecords,
dataBeforeStall,
"The uncommitted records before stall do not match what expected");
LOG.info("Injecting Stall");
stallInjected.set(true);
@ -603,15 +701,24 @@ public class EosIntegrationTest {
"Streams1[" + streams1.allMetadata() + "]\n" +
"Streams2[" + streams2.allMetadata() + "]");
// expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
//
// p-0: ---> 10 rec + C + 5 rec + C + 5 rec + C
// p-1: ---> 10 rec + C + 5 rec + C + 5 rec + C
final List<KeyValue<Long, Long>> committedRecordsAfterRebalance = readResult(
uncommittedDataBeforeStall.size() + dataToTriggerFirstRebalance.size(),
CONSUMER_GROUP_ID);
final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRebalance = new ArrayList<>();
final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRebalance = new ArrayList<>(
uncommittedDataBeforeStall.size() + dataToTriggerFirstRebalance.size());
expectedCommittedRecordsAfterRebalance.addAll(uncommittedDataBeforeStall);
expectedCommittedRecordsAfterRebalance.addAll(dataToTriggerFirstRebalance);
checkResultPerKey(committedRecordsAfterRebalance, expectedCommittedRecordsAfterRebalance);
checkResultPerKey(
committedRecordsAfterRebalance,
expectedCommittedRecordsAfterRebalance,
"The all committed records after rebalance do not match what expected");
LOG.info("Releasing Stall");
doStall = false;
@ -630,25 +737,36 @@ public class EosIntegrationTest {
writeInputData(dataAfterSecondRebalance);
// expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
//
// p-0: ---> 10 rec + C + 5 rec + C + 5 rec + C + 10 rec + C
// p-1: ---> 10 rec + C + 5 rec + C + 5 rec + C + 10 rec + C
final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
committedDataBeforeStall.size() + uncommittedDataBeforeStall.size()
+ dataToTriggerFirstRebalance.size() + dataAfterSecondRebalance.size(),
CONSUMER_GROUP_ID + "_ALL");
final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
final int allCommittedRecordsAfterRecoverySize = committedDataBeforeStall.size() +
uncommittedDataBeforeStall.size() + dataToTriggerFirstRebalance.size() + dataAfterSecondRebalance.size();
final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>(allCommittedRecordsAfterRecoverySize);
allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeStall);
allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeStall);
allExpectedCommittedRecordsAfterRecovery.addAll(dataToTriggerFirstRebalance);
allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterSecondRebalance);
checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery);
checkResultPerKey(
allCommittedRecords,
allExpectedCommittedRecordsAfterRecovery,
"The all committed records after recovery do not match what expected");
}
}
private List<KeyValue<Long, Long>> prepareData(final long fromInclusive,
final long toExclusive,
final Long... keys) {
final List<KeyValue<Long, Long>> data = new ArrayList<>();
final Long dataSize = keys.length * (toExclusive - fromInclusive);
final List<KeyValue<Long, Long>> data = new ArrayList<>(dataSize.intValue());
for (final Long k : keys) {
for (long v = fromInclusive; v < toExclusive; ++v) {
@ -664,7 +782,8 @@ public class EosIntegrationTest {
final boolean withState,
final String appDir,
final int numberOfStreamsThreads,
final String eosConfig) {
final String eosConfig,
final int maxPollIntervalMs) {
commitRequested = new AtomicInteger(0);
errorInjected = new AtomicBoolean(false);
stallInjected = new AtomicBoolean(false);
@ -761,7 +880,7 @@ public class EosIntegrationTest {
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), maxPollIntervalMs);
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
@ -776,9 +895,9 @@ public class EosIntegrationTest {
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.setUncaughtExceptionHandler((t, e) -> {
if (uncaughtException != null) {
if (uncaughtException != null || !e.getMessage().contains("Injected test exception")) {
e.printStackTrace(System.err);
fail("Should only get one uncaught exception from Streams.");
hasUnexpectedError = true;
}
uncaughtException = e;
});
@ -860,16 +979,17 @@ public class EosIntegrationTest {
}
private void verifyStateStore(final KafkaStreams streams,
final Set<KeyValue<Long, Long>> expectedStoreContent) throws Exception {
final Set<KeyValue<Long, Long>> expectedStoreContent,
final String reason) throws Exception {
final ReadOnlyKeyValueStore<Long, Long> store = IntegrationTestUtils
.getStore(300_000L, storeName, streams, QueryableStoreTypes.keyValueStore());
assertNotNull(store);
final KeyValueIterator<Long, Long> it = store.all();
while (it.hasNext()) {
assertTrue(expectedStoreContent.remove(it.next()));
assertTrue(reason, expectedStoreContent.remove(it.next()));
}
assertTrue(expectedStoreContent.isEmpty());
assertTrue(reason, expectedStoreContent.isEmpty());
}
}