From 847780e5a5f376fa2ce8705f483bfd33b319b83d Mon Sep 17 00:00:00 2001 From: Kevin Lafferty Date: Wed, 5 Sep 2018 20:15:25 -0700 Subject: [PATCH] KAFKA-7353: Connect logs 'this' for anonymous inner classes Replace 'this' reference in anonymous inner class logs to out class's 'this' Author: Kevin Lafferty Reviewers: Randall Hauch , Arjun Satish , Ewen Cheslack-Postava Closes #5583 from kevin-laff/connect_logging --- .../apache/kafka/connect/runtime/WorkerConnector.java | 2 +- .../apache/kafka/connect/runtime/WorkerSinkTask.java | 2 +- .../apache/kafka/connect/runtime/WorkerSourceTask.java | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java index 611e196d9de..55d4860b2e6 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java @@ -89,7 +89,7 @@ public class WorkerConnector { @Override public void raiseError(Exception e) { - log.error("{} Connector raised an error", this, e); + log.error("{} Connector raised an error", WorkerConnector.this, e); onFailure(e); ctx.raiseError(e); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 692331ed13f..39e0c6d53f6 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -649,7 +649,7 @@ class WorkerSinkTask extends WorkerTask { long pos = consumer.position(tp); lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos)); currentOffsets.put(tp, new OffsetAndMetadata(pos)); - log.debug("{} Assigned topic partition {} with offset {}", this, tp, pos); + log.debug("{} Assigned topic partition {} with offset {}", WorkerSinkTask.this, tp, pos); } sinkTaskMetricsGroup.assignedOffsets(currentOffsets); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 70d0cf9d7ae..623a210e0e2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -326,11 +326,11 @@ class WorkerSourceTask extends WorkerTask { // timeouts, callbacks with exceptions should never be invoked in practice. If the // user overrode these settings, the best we can do is notify them of the failure via // logging. - log.error("{} failed to send record to {}: {}", this, topic, e); - log.debug("{} Failed record: {}", this, preTransformRecord); + log.error("{} failed to send record to {}: {}", WorkerSourceTask.this, topic, e); + log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); } else { log.trace("{} Wrote record successfully: topic {} partition {} offset {}", - this, + WorkerSourceTask.this, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); commitTaskRecord(preTransformRecord); @@ -454,9 +454,9 @@ class WorkerSourceTask extends WorkerTask { @Override public void onCompletion(Throwable error, Void result) { if (error != null) { - log.error("{} Failed to flush offsets to storage: ", this, error); + log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error); } else { - log.trace("{} Finished flushing offsets to storage", this); + log.trace("{} Finished flushing offsets to storage", WorkerSourceTask.this); } } });