KAFKA-3290: fix transient test failures in WorkerSourceTaskTest

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Gwen Shapira

Closes #998 from hachikuji/KAFKA-3290
This commit is contained in:
Jason Gustafson 2016-03-02 17:22:14 -08:00 committed by Gwen Shapira
parent f676cfeb83
commit cfc324333f
2 changed files with 13 additions and 9 deletions

View File

@ -145,6 +145,9 @@ abstract class WorkerTask implements Runnable {
} catch (Throwable t) { } catch (Throwable t) {
if (!cancelled.get()) if (!cancelled.get())
lifecycleListener.onFailure(id, t); lifecycleListener.onFailure(id, t);
if (t instanceof Error)
throw t;
} }
} }

View File

@ -17,11 +17,11 @@
package org.apache.kafka.connect.runtime; package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@ -60,9 +60,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
public class WorkerSourceTaskTest extends ThreadedTest { public class WorkerSourceTaskTest extends ThreadedTest {
@ -200,8 +199,6 @@ public class WorkerSourceTaskTest extends ThreadedTest {
final CountDownLatch pollLatch = expectPolls(1); final CountDownLatch pollLatch = expectPolls(1);
expectOffsetFlush(true); expectOffsetFlush(true);
sourceTask.commit();
EasyMock.expectLastCall();
sourceTask.stop(); sourceTask.stop();
EasyMock.expectLastCall(); EasyMock.expectLastCall();
expectOffsetFlush(true); expectOffsetFlush(true);
@ -235,11 +232,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
// We'll wait for some data, then trigger a flush // We'll wait for some data, then trigger a flush
final CountDownLatch pollLatch = expectPolls(1); final CountDownLatch pollLatch = expectPolls(1);
expectOffsetFlush(false); expectOffsetFlush(true);
sourceTask.stop(); sourceTask.stop();
EasyMock.expectLastCall(); EasyMock.expectLastCall();
expectOffsetFlush(true); expectOffsetFlush(false);
statusListener.onShutdown(taskId); statusListener.onShutdown(taskId);
EasyMock.expectLastCall(); EasyMock.expectLastCall();
@ -249,7 +246,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
workerTask.initialize(EMPTY_TASK_PROPS); workerTask.initialize(EMPTY_TASK_PROPS);
executor.submit(workerTask); executor.submit(workerTask);
awaitPolls(pollLatch); awaitPolls(pollLatch);
assertFalse(workerTask.commitOffsets()); assertTrue(workerTask.commitOffsets());
workerTask.stop(); workerTask.stop();
assertEquals(true, workerTask.awaitStop(1000)); assertEquals(true, workerTask.awaitStop(1000));
@ -319,9 +316,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)); sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
EasyMock.expectLastCall(); EasyMock.expectLastCall();
sourceTask.start(EMPTY_TASK_PROPS); sourceTask.start(EMPTY_TASK_PROPS);
statusListener.onStartup(taskId);
EasyMock.expectLastCall(); EasyMock.expectLastCall();
statusListener.onStartup(taskId);
EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() { EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override @Override
public Object answer() throws Throwable { public Object answer() throws Throwable {
@ -330,8 +327,10 @@ public class WorkerSourceTaskTest extends ThreadedTest {
return null; return null;
} }
}); });
sourceTask.stop(); sourceTask.stop();
EasyMock.expectLastCall(); EasyMock.expectLastCall();
expectOffsetFlush(true);
PowerMock.replayAll(); PowerMock.replayAll();
@ -450,6 +449,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
IExpectationSetters<Void> futureGetExpect = EasyMock.expect( IExpectationSetters<Void> futureGetExpect = EasyMock.expect(
flushFuture.get(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class))); flushFuture.get(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)));
if (succeed) { if (succeed) {
sourceTask.commit();
EasyMock.expectLastCall();
futureGetExpect.andReturn(null); futureGetExpect.andReturn(null);
} else { } else {
futureGetExpect.andThrow(new TimeoutException()); futureGetExpect.andThrow(new TimeoutException());