MINOR: improve RecordCollectorImpl (#17185)

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Matthias J. Sax 2024-10-01 12:33:42 -07:00 committed by GitHub
parent 4c90d3518b
commit 4312ce6d25
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 4 additions and 6 deletions

View File

@ -81,6 +81,7 @@ public class RecordCollectorImpl implements RecordCollector {
private final Sensor droppedRecordsSensor;
private final Map<String, Sensor> producedSensorByTopic = new HashMap<>();
// we get `sendException` from "singleton" `StreamsProducer` to share it across all instances of `RecordCollectorImpl`
private final AtomicReference<KafkaException> sendException;
/**
@ -554,7 +555,7 @@ public class RecordCollectorImpl implements RecordCollector {
final KafkaException exception = sendException.get();
if (exception != null) {
sendException.set(null);
sendException.compareAndSet(exception, null);
throw exception;
}
}

View File

@ -72,6 +72,8 @@ public class StreamsProducer {
private boolean transactionInFlight = false;
private boolean transactionInitialized = false;
private double oldProducerTotalBlockedTime = 0;
// we have a single `StreamsProducer` per thread, and thus a single `sendException` instance,
// which we share across all tasks, ie, all `RecordCollectorImpl`
private final AtomicReference<KafkaException> sendException = new AtomicReference<>(null);
public StreamsProducer(final ProcessingMode processingMode,

View File

@ -118,11 +118,6 @@ public class RecordCollectorTest {
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
));
private final StreamsConfig eosConfig = new StreamsConfig(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2)
));
private final String topic = "topic";
private final String sinkNodeName = "output-node";