MINOR: refactor error message of task migration (#4803)

In the stream thread capture of the TaskMigration exception, print the task full information in WARN. In other places only log as INFO, plus additional context information.

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Guozhang Wang 2018-04-01 17:37:00 -07:00 committed by GitHub
parent 4106cb1db1
commit 2e5d4af83f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 25 additions and 17 deletions

View File

@ -30,27 +30,35 @@ public class TaskMigratedException extends StreamsException {
private final Task task;
public TaskMigratedException(final Task task) {
this(task, null);
// this is for unit test only
public TaskMigratedException() {
super("A task has been migrated unexpectedly", null);
this.task = null;
}
public TaskMigratedException(final Task task,
final TopicPartition topicPartition,
final long endOffset,
final long pos) {
super(String.format("Log end offset of %s should not change while restoring: old end offset %d, current offset %d%n%s",
super(String.format("Log end offset of %s should not change while restoring: old end offset %d, current offset %d",
topicPartition,
endOffset,
pos,
task.toString("> ")),
pos),
null);
this.task = task;
}
public TaskMigratedException(final Task task) {
super(String.format("Task %s is unexpectedly closed during processing", task.id()), null);
this.task = task;
}
public TaskMigratedException(final Task task,
final Throwable throwable) {
super(task.toString(), throwable);
super(String.format("Client request for task %s has been fenced due to a rebalance", task.id()), throwable);
this.task = task;
}

View File

@ -899,7 +899,7 @@ public class StreamThread extends Thread {
final StreamTask task = taskManager.activeTask(partition);
if (task.isClosed()) {
log.warn("Stream task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
log.info("Stream task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
"Notifying the thread to trigger a new rebalance immediately.", task.id());
throw new TaskMigratedException(task);
}
@ -1032,7 +1032,7 @@ public class StreamThread extends Thread {
final StandbyTask task = taskManager.standbyTask(partition);
if (task.isClosed()) {
log.warn("Standby task {} is already closed, probably because it got unexpectly migrated to another thread already. " +
log.info("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
"Notifying the thread to trigger a new rebalance immediately.", task.id());
throw new TaskMigratedException(task);
}
@ -1065,7 +1065,7 @@ public class StreamThread extends Thread {
}
if (task.isClosed()) {
log.warn("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
log.info("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
"Notifying the thread to trigger a new rebalance immediately.", task.id());
throw new TaskMigratedException(task);
}
@ -1084,7 +1084,7 @@ public class StreamThread extends Thread {
final StandbyTask task = taskManager.standbyTask(partition);
if (task.isClosed()) {
log.warn("Standby task {} is already closed, probably because it got unexpectly migrated to another thread already. " +
log.info("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
"Notifying the thread to trigger a new rebalance immediately.", task.id());
throw new TaskMigratedException(task);
}

View File

@ -195,7 +195,7 @@ public class AssignedStreamsTasksTest {
public void shouldCloseTaskOnSuspendIfTaskMigratedException() {
mockTaskInitialization();
t1.suspend();
EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
EasyMock.expectLastCall().andThrow(new TaskMigratedException());
t1.close(false, true);
EasyMock.expectLastCall();
EasyMock.replay(t1);
@ -226,7 +226,7 @@ public class AssignedStreamsTasksTest {
mockRunningTaskSuspension();
t1.resume();
t1.initializeTopology();
EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
EasyMock.expectLastCall().andThrow(new TaskMigratedException());
t1.close(false, true);
EasyMock.expectLastCall();
EasyMock.replay(t1);
@ -267,7 +267,7 @@ public class AssignedStreamsTasksTest {
public void shouldCloseTaskOnCommitIfTaskMigratedException() {
mockTaskInitialization();
t1.commit();
EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
EasyMock.expectLastCall().andThrow(new TaskMigratedException());
t1.close(false, true);
EasyMock.expectLastCall();
EasyMock.replay(t1);
@ -319,7 +319,7 @@ public class AssignedStreamsTasksTest {
mockTaskInitialization();
EasyMock.expect(t1.commitNeeded()).andReturn(true);
t1.commit();
EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
EasyMock.expectLastCall().andThrow(new TaskMigratedException());
t1.close(false, true);
EasyMock.expectLastCall();
EasyMock.replay(t1);
@ -338,7 +338,7 @@ public class AssignedStreamsTasksTest {
public void shouldCloseTaskOnProcessesIfTaskMigratedException() {
mockTaskInitialization();
t1.process();
EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
EasyMock.expectLastCall().andThrow(new TaskMigratedException());
t1.close(false, true);
EasyMock.expectLastCall();
EasyMock.replay(t1);
@ -370,7 +370,7 @@ public class AssignedStreamsTasksTest {
public void shouldCloseTaskOnMaybePunctuateStreamTimeIfTaskMigratedException() {
mockTaskInitialization();
t1.maybePunctuateStreamTime();
EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
EasyMock.expectLastCall().andThrow(new TaskMigratedException());
t1.close(false, true);
EasyMock.expectLastCall();
EasyMock.replay(t1);
@ -390,7 +390,7 @@ public class AssignedStreamsTasksTest {
mockTaskInitialization();
EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
t1.maybePunctuateSystemTime();
EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
EasyMock.expectLastCall().andThrow(new TaskMigratedException());
t1.close(false, true);
EasyMock.expectLastCall();
EasyMock.replay(t1);