KAFKA-17994 Checked exceptions are not handled (#17817)

Reviewers: Bill Bejeck <bill@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Matthias J. Sax 2024-11-15 04:36:03 -08:00 committed by Chia-Ping Tsai
parent 726d0d604d
commit 2127ae6329
5 changed files with 48 additions and 28 deletions

View File

@ -319,7 +319,10 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
record.headers())); record.headers()));
restoreCount++; restoreCount++;
} }
} catch (final RuntimeException deserializationException) { } catch (final Exception deserializationException) {
// while Java distinguishes checked vs unchecked exceptions, other languages
// like Scala or Kotlin do not, and thus we need to catch `Exception`
// (instead of `RuntimeException`) to work well with those languages
handleDeserializationFailure( handleDeserializationFailure(
deserializationExceptionHandler, deserializationExceptionHandler,
globalProcessorContext, globalProcessorContext,

View File

@ -203,7 +203,10 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
} catch (final FailedProcessingException | TaskCorruptedException | TaskMigratedException e) { } catch (final FailedProcessingException | TaskCorruptedException | TaskMigratedException e) {
// Rethrow exceptions that should not be handled here // Rethrow exceptions that should not be handled here
throw e; throw e;
} catch (final RuntimeException processingException) { } catch (final Exception processingException) {
// while Java distinguishes checked vs unchecked exceptions, other languages
// like Scala or Kotlin do not, and thus we need to catch `Exception`
// (instead of `RuntimeException`) to work well with those languages
final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
null, // only required to pass for DeserializationExceptionHandler null, // only required to pass for DeserializationExceptionHandler
internalProcessorContext.topic(), internalProcessorContext.topic(),
@ -220,7 +223,10 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
processingExceptionHandler.handle(errorHandlerContext, record, processingException), processingExceptionHandler.handle(errorHandlerContext, record, processingException),
"Invalid ProductionExceptionHandler response." "Invalid ProductionExceptionHandler response."
); );
} catch (final RuntimeException fatalUserException) { } catch (final Exception fatalUserException) {
// while Java distinguishes checked vs unchecked exceptions, other languages
// like Scala or Kotlin do not, and thus we need to catch `Exception`
// (instead of `RuntimeException`) to work well with those languages
log.error( log.error(
"Processing error callback failed after processing error for record: {}", "Processing error callback failed after processing error for record: {}",
errorHandlerContext, errorHandlerContext,

View File

@ -211,7 +211,10 @@ public class RecordCollectorImpl implements RecordCollector {
key, key,
keySerializer, keySerializer,
exception); exception);
} catch (final RuntimeException serializationException) { } catch (final Exception serializationException) {
// while Java distinguishes checked vs unchecked exceptions, other languages
// like Scala or Kotlin do not, and thus we need to catch `Exception`
// (instead of `RuntimeException`) to work well with those languages
handleException( handleException(
ProductionExceptionHandler.SerializationExceptionOrigin.KEY, ProductionExceptionHandler.SerializationExceptionOrigin.KEY,
topic, topic,
@ -222,7 +225,8 @@ public class RecordCollectorImpl implements RecordCollector {
timestamp, timestamp,
processorNodeId, processorNodeId,
context, context,
serializationException); serializationException
);
return; return;
} }
@ -235,7 +239,10 @@ public class RecordCollectorImpl implements RecordCollector {
value, value,
valueSerializer, valueSerializer,
exception); exception);
} catch (final RuntimeException serializationException) { } catch (final Exception serializationException) {
// while Java distinguishes checked vs unchecked exceptions, other languages
// like Scala or Kotlin do not, and thus we need to catch `Exception`
// (instead of `RuntimeException`) to work well with those languages
handleException( handleException(
ProductionExceptionHandler.SerializationExceptionOrigin.VALUE, ProductionExceptionHandler.SerializationExceptionOrigin.VALUE,
topic, topic,
@ -313,7 +320,7 @@ public class RecordCollectorImpl implements RecordCollector {
final Long timestamp, final Long timestamp,
final String processorNodeId, final String processorNodeId,
final InternalProcessorContext<Void, Void> context, final InternalProcessorContext<Void, Void> context,
final RuntimeException serializationException) { final Exception serializationException) {
log.debug(String.format("Error serializing record for topic %s", topic), serializationException); log.debug(String.format("Error serializing record for topic %s", topic), serializationException);
final ProducerRecord<K, V> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers); final ProducerRecord<K, V> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
@ -329,7 +336,10 @@ public class RecordCollectorImpl implements RecordCollector {
), ),
"Invalid ProductionExceptionHandler response." "Invalid ProductionExceptionHandler response."
); );
} catch (final RuntimeException fatalUserException) { } catch (final Exception fatalUserException) {
// while Java distinguishes checked vs unchecked exceptions, other languages
// like Scala or Kotlin do not, and thus we need to catch `Exception`
// (instead of `RuntimeException`) to work well with those languages
log.error( log.error(
String.format( String.format(
"Production error callback failed after serialization error for record %s: %s", "Production error callback failed after serialization error for record %s: %s",
@ -446,7 +456,10 @@ public class RecordCollectorImpl implements RecordCollector {
), ),
"Invalid ProductionExceptionHandler response." "Invalid ProductionExceptionHandler response."
); );
} catch (final RuntimeException fatalUserException) { } catch (final Exception fatalUserException) {
// while Java distinguishes checked vs unchecked exceptions, other languages
// like Scala or Kotlin do not, and thus we need to catch `Exception`
// (instead of `RuntimeException`) to work well with those languages
log.error( log.error(
"Production error callback failed after production error for record {}", "Production error callback failed after production error for record {}",
serializedRecord, serializedRecord,

View File

@ -71,7 +71,10 @@ public class RecordDeserializer {
rawRecord.headers(), rawRecord.headers(),
Optional.empty() Optional.empty()
); );
} catch (final RuntimeException deserializationException) { } catch (final Exception deserializationException) {
// while Java distinguishes checked vs unchecked exceptions, other languages
// like Scala or Kotlin do not, and thus we need to catch `Exception`
// (instead of `RuntimeException`) to work well with those languages
handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, sourceNode().name()); handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, sourceNode().name());
return null; // 'handleDeserializationFailure' would either throw or swallow -- if we swallow we need to skip the record by returning 'null' return null; // 'handleDeserializationFailure' would either throw or swallow -- if we swallow we need to skip the record by returning 'null'
} }
@ -79,7 +82,7 @@ public class RecordDeserializer {
public static void handleDeserializationFailure(final DeserializationExceptionHandler deserializationExceptionHandler, public static void handleDeserializationFailure(final DeserializationExceptionHandler deserializationExceptionHandler,
final ProcessorContext<?, ?> processorContext, final ProcessorContext<?, ?> processorContext,
final RuntimeException deserializationException, final Exception deserializationException,
final ConsumerRecord<byte[], byte[]> rawRecord, final ConsumerRecord<byte[], byte[]> rawRecord,
final Logger log, final Logger log,
final Sensor droppedRecordsSensor, final Sensor droppedRecordsSensor,
@ -101,7 +104,10 @@ public class RecordDeserializer {
deserializationExceptionHandler.handle(errorHandlerContext, rawRecord, deserializationException), deserializationExceptionHandler.handle(errorHandlerContext, rawRecord, deserializationException),
"Invalid DeserializationExceptionHandler response." "Invalid DeserializationExceptionHandler response."
); );
} catch (final RuntimeException fatalUserException) { } catch (final Exception fatalUserException) {
// while Java distinguishes checked vs unchecked exceptions, other languages
// like Scala or Kotlin do not, and thus we need to catch `Exception`
// (instead of `RuntimeException`) to work well with those languages
log.error( log.error(
"Deserialization error callback failed after deserialization error for record {}", "Deserialization error callback failed after deserialization error for record {}",
rawRecord, rawRecord,

View File

@ -52,8 +52,6 @@ import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.streams.state.internals.ThreadCache;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -884,18 +882,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
processTimeMs = 0L; processTimeMs = 0L;
} }
private String getStacktraceString(final Throwable e) {
String stacktrace = null;
try (final StringWriter stringWriter = new StringWriter();
final PrintWriter printWriter = new PrintWriter(stringWriter)) {
e.printStackTrace(printWriter);
stacktrace = stringWriter.toString();
} catch (final IOException ioe) {
log.error("Encountered error extracting stacktrace from this exception", ioe);
}
return stacktrace;
}
/** /**
* @throws IllegalStateException if the current node is not null * @throws IllegalStateException if the current node is not null
* @throws TaskMigratedException if the task producer got fenced (EOS only) * @throws TaskMigratedException if the task producer got fenced (EOS only)
@ -938,7 +924,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
throw createStreamsException(node.name(), e.getCause()); throw createStreamsException(node.name(), e.getCause());
} catch (final TaskCorruptedException | TaskMigratedException e) { } catch (final TaskCorruptedException | TaskMigratedException e) {
throw e; throw e;
} catch (final RuntimeException processingException) { } catch (final Exception processingException) {
// while Java distinguishes checked vs unchecked exceptions, other languages
// like Scala or Kotlin do not, and thus we need to catch `Exception`
// (instead of `RuntimeException`) to work well with those languages
final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
null, null,
recordContext.topic(), recordContext.topic(),
@ -956,7 +945,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
processingExceptionHandler.handle(errorHandlerContext, null, processingException), processingExceptionHandler.handle(errorHandlerContext, null, processingException),
"Invalid ProcessingExceptionHandler response." "Invalid ProcessingExceptionHandler response."
); );
} catch (final RuntimeException fatalUserException) { } catch (final Exception fatalUserException) {
// while Java distinguishes checked vs unchecked exceptions, other languages
// like Scala or Kotlin do not, and thus we need to catch `Exception`
// (instead of `RuntimeException`) to work well with those languages
log.error( log.error(
"Processing error callback failed after processing error for record: {}", "Processing error callback failed after processing error for record: {}",
errorHandlerContext, errorHandlerContext,