MINOR: update EosV2UpgradeIntegrationTest (#16698)

Refactor test to move off deprecated `transform()` in favor of
`process()`.

Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
Matthias J. Sax 2024-07-31 11:26:25 -07:00 committed by GitHub
parent 1528264f02
commit d6a41ac3ca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 48 additions and 53 deletions

View File

@ -40,9 +40,9 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils.StableAssignmentListener;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@ -847,7 +847,6 @@ public class EosV2UpgradeIntegrationTest {
}
}
@SuppressWarnings("deprecation")
private KafkaStreams getKafkaStreams(final String appDir,
final String processingGuarantee,
final boolean injectError) {
@ -861,62 +860,58 @@ public class EosV2UpgradeIntegrationTest {
builder.addStateStore(storeBuilder);
final KStream<Long, Long> input = builder.stream(MULTI_PARTITION_INPUT_TOPIC);
input.transform(new TransformerSupplier<Long, Long, KeyValue<Long, Long>>() {
@Override
public Transformer<Long, Long, KeyValue<Long, Long>> get() {
return new Transformer<Long, Long, KeyValue<Long, Long>>() {
ProcessorContext context;
KeyValueStore<Long, Long> state = null;
AtomicBoolean crash;
AtomicInteger sharedCommit;
input.process(() -> new Processor<Long, Long, Long, Long>() {
ProcessorContext<Long, Long> context;
KeyValueStore<Long, Long> state = null;
AtomicBoolean crash;
AtomicInteger sharedCommit;
@Override
public void init(final ProcessorContext context) {
this.context = context;
state = context.getStateStore(storeName);
final String clientId = context.appConfigs().get(StreamsConfig.CLIENT_ID_CONFIG).toString();
if (APP_DIR_1.equals(clientId)) {
crash = errorInjectedClient1;
sharedCommit = commitCounterClient1;
} else {
crash = errorInjectedClient2;
sharedCommit = commitCounterClient2;
@Override
public void init(final ProcessorContext<Long, Long> context) {
this.context = context;
state = context.getStateStore(storeName);
final String clientId = context.appConfigs().get(StreamsConfig.CLIENT_ID_CONFIG).toString();
if (APP_DIR_1.equals(clientId)) {
crash = errorInjectedClient1;
sharedCommit = commitCounterClient1;
} else {
crash = errorInjectedClient2;
sharedCommit = commitCounterClient2;
}
}
@Override
public void process(final Record<Long, Long> record) {
final long key = record.key();
final long value = record.value();
if ((value + 1) % 10 == 0) {
if (sharedCommit.get() < 0 ||
sharedCommit.incrementAndGet() == 2) {
context.commit();
}
commitRequested.incrementAndGet();
}
@Override
public KeyValue<Long, Long> transform(final Long key, final Long value) {
if ((value + 1) % 10 == 0) {
if (sharedCommit.get() < 0 ||
sharedCommit.incrementAndGet() == 2) {
Long sum = state.get(key);
if (sum == null) {
sum = value;
} else {
sum += value;
}
state.put(key, sum);
state.flush();
context.commit();
}
commitRequested.incrementAndGet();
}
Long sum = state.get(key);
if (sum == null) {
sum = value;
} else {
sum += value;
}
state.put(key, sum);
state.flush();
if (value % 10 == 4 && // potentially crash when processing 5th, 15th, or 25th record (etc.)
crash != null && crash.compareAndSet(true, false)) {
// only crash a single task
throw new RuntimeException("Injected test exception.");
}
return new KeyValue<>(key, state.get(key));
if (value % 10 == 4 && // potentially crash when processing 5th, 15th, or 25th record (etc.)
crash != null && crash.compareAndSet(true, false)) {
// only crash a single task
throw new RuntimeException("Injected test exception.");
}
@Override
public void close() {}
};
} }, storeNames)
context.forward(record.withValue(state.get(key)));
}
}, storeNames)
.to(MULTI_PARTITION_OUTPUT_TOPIC);
final Properties properties = new Properties();