KAFKA-4501; Fix EasyMock and disable PowerMock tests under Java 9

- EasyMock 3.5 supports Java 9.

- Fixed issues in `testFailedSendRetryLogic` and
`testCreateConnectorAlreadyExists` exposed by new EasyMock
version. The former was passing `anyObject` to
`andReturn`, which doesn't make sense. This was leaving
behind a global `any` matcher, which caused a few issues in
the new version. Fixing this meant that the correlation ids had
to be updated to actually match. The latter was missing a
couple of expectations that the previous version of EasyMock
didn't catch.

- Removed unnecessary PowerMock dependency from 3 tests.

- Disabled remaining PowerMock tests when running with Java 9
until https://github.com/powermock/powermock/issues/783 is
in a release.

- Once we merge this PR, we can enable tests in the Java 9 builds
in Jenkins.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #3845 from ijuma/kafka-4501-easymock-powermock-java-9
This commit is contained in:
Ismael Juma 2017-09-13 18:18:54 +01:00
parent c42bfc0d51
commit ffd8f18a12
7 changed files with 69 additions and 50 deletions

View File

@ -193,6 +193,20 @@ subprojects {
def testShowStandardStreams = false
def testExceptionFormat = 'full'
// Exclude PowerMock tests when running with Java 9 until a version of PowerMock that supports Java 9 is released
// The relevant issue is https://github.com/powermock/powermock/issues/783
String[] testsToExclude = []
if (JavaVersion.current().isJava9Compatible()) {
testsToExclude = [
"**/KafkaProducerTest.*", "**/BufferPoolTest.*",
"**/SourceTaskOffsetCommitterTest.*", "**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*",
"**/WorkerSourceTaskTest.*", "**/WorkerTest.*", "**/DistributedHerderTest.*", "**/WorkerCoordinatorTest.*",
"**/RestServerTest.*", "**/ConnectorPluginsResourceTest.*", "**/ConnectorsResourceTest.*",
"**/StandaloneHerderTest.*", "**/FileOffsetBakingStoreTest.*", "**/KafkaConfigBackingStoreTest.*",
"**/KafkaOffsetBackingStoreTest.*", "**/OffsetStorageWriterTest.*", "**/KafkaBasedLogTest.*"
]
}
test {
maxParallelForks = userMaxForks ?: Runtime.runtime.availableProcessors()
@ -206,6 +220,7 @@ subprojects {
exceptionFormat = testExceptionFormat
}
exclude(testsToExclude)
}
task integrationTest(type: Test, dependsOn: compileJava) {
@ -220,9 +235,13 @@ subprojects {
showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
exceptionFormat = testExceptionFormat
}
useJUnit {
includeCategories 'org.apache.kafka.test.IntegrationTest'
}
exclude(testsToExclude)
}
task unitTest(type: Test, dependsOn: compileJava) {
@ -237,9 +256,12 @@ subprojects {
showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
exceptionFormat = testExceptionFormat
}
useJUnit {
excludeCategories 'org.apache.kafka.test.IntegrationTest'
}
exclude(testsToExclude)
}
jar {

View File

@ -16,14 +16,12 @@
*/
package org.apache.kafka.connect.file;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.sink.SinkConnector;
import org.easymock.EasyMockSupport;
import org.junit.Before;
import org.junit.Test;
import org.powermock.api.easymock.PowerMock;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -31,14 +29,9 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class FileStreamSinkConnectorTest {
public class FileStreamSinkConnectorTest extends EasyMockSupport {
private static final String MULTIPLE_TOPICS = "test1,test2";
private static final String[] MULTIPLE_TOPICS_LIST
= MULTIPLE_TOPICS.split(",");
private static final List<TopicPartition> MULTIPLE_TOPICS_PARTITIONS = Arrays.asList(
new TopicPartition("test1", 1), new TopicPartition("test2", 2)
);
private static final String FILENAME = "/afilename";
private FileStreamSinkConnector connector;
@ -48,7 +41,7 @@ public class FileStreamSinkConnectorTest {
@Before
public void setup() {
connector = new FileStreamSinkConnector();
ctx = PowerMock.createMock(ConnectorContext.class);
ctx = createMock(ConnectorContext.class);
connector.initialize(ctx);
sinkProperties = new HashMap<>();
@ -58,7 +51,7 @@ public class FileStreamSinkConnectorTest {
@Test
public void testSinkTasks() {
PowerMock.replayAll();
replayAll();
connector.start(sinkProperties);
List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
@ -71,12 +64,12 @@ public class FileStreamSinkConnectorTest {
assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));
}
PowerMock.verifyAll();
verifyAll();
}
@Test
public void testSinkTasksStdout() {
PowerMock.replayAll();
replayAll();
sinkProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
connector.start(sinkProperties);
@ -84,16 +77,16 @@ public class FileStreamSinkConnectorTest {
assertEquals(1, taskConfigs.size());
assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
PowerMock.verifyAll();
verifyAll();
}
@Test
public void testTaskClass() {
PowerMock.replayAll();
replayAll();
connector.start(sinkProperties);
assertEquals(FileStreamSinkTask.class, connector.taskClass());
PowerMock.verifyAll();
verifyAll();
}
}

View File

@ -18,9 +18,10 @@ package org.apache.kafka.connect.file;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.ConnectException;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Before;
import org.junit.Test;
import org.powermock.api.easymock.PowerMock;
import java.util.HashMap;
import java.util.List;
@ -29,7 +30,7 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class FileStreamSourceConnectorTest {
public class FileStreamSourceConnectorTest extends EasyMockSupport {
private static final String SINGLE_TOPIC = "test";
private static final String MULTIPLE_TOPICS = "test1,test2";
@ -42,7 +43,7 @@ public class FileStreamSourceConnectorTest {
@Before
public void setup() {
connector = new FileStreamSourceConnector();
ctx = PowerMock.createMock(ConnectorContext.class);
ctx = createMock(ConnectorContext.class);
connector.initialize(ctx);
sourceProperties = new HashMap<>();
@ -52,7 +53,7 @@ public class FileStreamSourceConnectorTest {
@Test
public void testSourceTasks() {
PowerMock.replayAll();
replayAll();
connector.start(sourceProperties);
List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
@ -70,12 +71,12 @@ public class FileStreamSourceConnectorTest {
assertEquals(SINGLE_TOPIC,
taskConfigs.get(0).get(FileStreamSourceConnector.TOPIC_CONFIG));
PowerMock.verifyAll();
verifyAll();
}
@Test
public void testSourceTasksStdin() {
PowerMock.replayAll();
EasyMock.replay(ctx);
sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
connector.start(sourceProperties);
@ -83,7 +84,7 @@ public class FileStreamSourceConnectorTest {
assertEquals(1, taskConfigs.size());
assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
PowerMock.verifyAll();
EasyMock.verify(ctx);
}
@Test(expected = ConnectException.class)
@ -94,11 +95,11 @@ public class FileStreamSourceConnectorTest {
@Test
public void testTaskClass() {
PowerMock.replayAll();
EasyMock.replay(ctx);
connector.start(sourceProperties);
assertEquals(FileStreamSourceTask.class, connector.taskClass());
PowerMock.verifyAll();
EasyMock.verify(ctx);
}
}

View File

@ -21,10 +21,10 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.powermock.api.easymock.PowerMock;
import java.io.ByteArrayInputStream;
import java.io.File;
@ -37,7 +37,7 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
public class FileStreamSourceTaskTest {
public class FileStreamSourceTaskTest extends EasyMockSupport {
private static final String TOPIC = "test";
@ -56,8 +56,8 @@ public class FileStreamSourceTaskTest {
config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath());
config.put(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
task = new FileStreamSourceTask();
offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
context = PowerMock.createMock(SourceTaskContext.class);
offsetStorageReader = createMock(OffsetStorageReader.class);
context = createMock(SourceTaskContext.class);
task.initialize(context);
}
@ -66,11 +66,11 @@ public class FileStreamSourceTaskTest {
tempFile.delete();
if (verifyMocks)
PowerMock.verifyAll();
verifyAll();
}
private void replay() {
PowerMock.replayAll();
replayAll();
verifyMocks = true;
}
@ -164,6 +164,6 @@ public class FileStreamSourceTaskTest {
private void expectOffsetLookupReturnNone() {
EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader);
EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(Map.class))).andReturn(null);
EasyMock.expect(offsetStorageReader.offset(EasyMock.<Map<String, String>>anyObject())).andReturn(null);
}
}

View File

@ -481,6 +481,8 @@ public class DistributedHerderTest {
@Test
public void testCreateConnectorAlreadyExists() throws Exception {
EasyMock.expect(member.memberId()).andStubReturn("leader");
EasyMock.expect(worker.getPlugins()).andReturn(plugins);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(null);
expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
expectPostRebalanceCatchup(SNAPSHOT);

View File

@ -372,9 +372,9 @@ class AsyncProducerTest {
val props = new Properties()
props.put("metadata.broker.list", brokerList)
props.put("request.required.acks", "1")
props.put("serializer.class", classOf[StringEncoder].getName.toString)
props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString)
props.put("producer.num.retries", 3.toString)
props.put("serializer.class", classOf[StringEncoder].getName)
props.put("key.serializer.class", classOf[NullEncoder[Int]].getName)
props.put("producer.num.retries", "3")
val config = new ProducerConfig(props)
@ -391,26 +391,27 @@ class AsyncProducerTest {
// entirely. The second request will succeed for partition 1 but fail for partition 0.
// On the third try for partition 0, let it succeed.
val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1,
correlationId = 11, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
correlationId = 5, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1,
correlationId = 17, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
correlationId = 11, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
val response1 = ProducerResponse(0,
Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NOT_LEADER_FOR_PARTITION, 0L)),
(TopicAndPartition("topic1", 1), ProducerResponseStatus(Errors.NONE, 0L))))
val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 21,
val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 15,
timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
val response2 = ProducerResponse(0,
Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NONE, 0L))))
val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
// don't care about config mock
EasyMock.expect(mockSyncProducer.config).andReturn(EasyMock.anyObject()).anyTimes()
val mockConfig = EasyMock.createNiceMock(classOf[SyncProducerConfig])
EasyMock.expect(mockSyncProducer.config).andReturn(mockConfig).anyTimes()
EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response1)
EasyMock.expect(mockSyncProducer.send(request3)).andReturn(response2)
EasyMock.replay(mockSyncProducer)
val producerPool = EasyMock.createMock(classOf[ProducerPool])
EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer).times(4)
EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer).times(3)
EasyMock.expect(producerPool.close())
EasyMock.replay(producerPool)
val time = new Time {
@ -419,14 +420,14 @@ class AsyncProducerTest {
override def sleep(ms: Long): Unit = {}
override def hiResClockMs: Long = 0L
}
val handler = new DefaultEventHandler[Int,String](config,
val handler = new DefaultEventHandler(config,
partitioner = new FixedValuePartitioner(),
encoder = new StringEncoder(),
keyEncoder = new NullEncoder[Int](),
producerPool = producerPool,
topicPartitionInfos = topicPartitionInfos,
time = time)
val data = msgs.map(m => new KeyedMessage[Int,String](topic1, 0, m)) ++ msgs.map(m => new KeyedMessage[Int,String](topic1, 1, m))
val data = msgs.map(m => new KeyedMessage(topic1, 0, m)) ++ msgs.map(m => new KeyedMessage(topic1, 1, m))
handler.handle(data)
handler.close()

View File

@ -51,7 +51,7 @@ versions += [
apacheds: "2.0.0-M24",
argparse4j: "0.7.0",
bcpkix: "1.58",
easymock: "3.4",
easymock: "3.5",
jackson: "2.9.1",
jetty: "9.2.22.v20170606",
jersey: "2.25.1",