KAFKA-7612: Fix javac warnings and enable warnings as errors (#5900)

- Use Xlint:all with 3 exclusions (filed KAFKA-7613 to remove the exclusions)
- Use the same javac options when compiling tests (seems accidental that
we didn't do this before)
- Replaced several deprecated method calls with non-deprecated ones (see the sketch after this list):
  - `KafkaConsumer.poll(long)` and `KafkaConsumer.close(long)`
  - `Class.newInstance` and `new Integer/Long` (deprecated since Java 9)
  - `scala.Console` (deprecated in Scala 2.11)
  - `PartitionData` taking a timestamp (one of them seemingly a bug)
  - `JsonMappingException` single parameter constructor
- Fix unnecessary usage of raw types in several places.
- Add @SuppressWarnings for deprecations, unchecked and switch fallthrough in
several places.
- Scala clean-ups (var -> val, eta-expansion warnings, avoid reflective calls)
- Use lambdas to simplify code in a few places
- Add @SafeVarargs, fix varargs usage and remove unnecessary `Utils.mkList` method
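
For reference, a minimal sketch of the replacements listed above (illustrative only and not taken from this diff; the bootstrap server, group id, and topic are placeholders):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DeprecatedCallReplacements {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "example-group");           // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("example-topic")); // placeholder topic
        consumer.poll(Duration.ofMillis(100));  // replaces deprecated poll(long)
        consumer.close(Duration.ofSeconds(5));  // replaces deprecated close(long, TimeUnit)

        // Class.newInstance() is deprecated since Java 9:
        StringBuilder sb = StringBuilder.class.getDeclaredConstructor().newInstance();

        // new Integer(...) / new Long(...) are deprecated since Java 9:
        Integer i = Integer.valueOf(42);
        Long l = Long.valueOf(42L);
        System.out.println(sb.append(i).append(l));
    }
}
```

The `scala.Console.readLine` calls were similarly replaced with `scala.io.StdIn.readLine` in the Scala command-line tools, as the AclCommand and TopicCommand hunks below show.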

Reviewers: Matthias J. Sax <mjsax@apache.org>, Manikumar Reddy <manikumar.reddy@gmail.com>, Randall Hauch <rhauch@gmail.com>, Bill Bejeck <bill@confluent.io>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
Authored by Ismael Juma on 2018-11-12 22:18:59 -08:00, committed by GitHub
parent 6c2e7005ba
commit 12f310d50e
197 changed files with 800 additions and 814 deletions

View File

@ -155,9 +155,14 @@ subprojects {
sourceCompatibility = minJavaVersion
targetCompatibility = minJavaVersion
compileJava {
tasks.withType(JavaCompile) {
options.encoding = 'UTF-8'
options.compilerArgs << "-Xlint:deprecation,unchecked"
options.compilerArgs << "-Xlint:all"
// temporary exclusions until all the warnings are fixed
options.compilerArgs << "-Xlint:-rawtypes"
options.compilerArgs << "-Xlint:-serial"
options.compilerArgs << "-Xlint:-try"
options.compilerArgs << "-Werror"
// --release is the recommended way to select the target release, but it's only supported in Java 9 so we also
// set --source and --target via `sourceCompatibility` and `targetCompatibility`. If/when Gradle supports `--release`
// natively (https://github.com/gradle/gradle/issues/2510), we should switch to that.

View File

@ -296,7 +296,7 @@ public class FetchSessionHandler {
* @param response The response.
* @return True if the full fetch response partitions are valid.
*/
private String verifyFullFetchResponsePartitions(FetchResponse response) {
private String verifyFullFetchResponsePartitions(FetchResponse<?> response) {
StringBuilder bld = new StringBuilder();
Set<TopicPartition> omitted =
findMissing(response.responseData().keySet(), sessionPartitions.keySet());
@ -321,7 +321,7 @@ public class FetchSessionHandler {
* @param response The response.
* @return True if the incremental fetch response partitions are valid.
*/
private String verifyIncrementalFetchResponsePartitions(FetchResponse response) {
private String verifyIncrementalFetchResponsePartitions(FetchResponse<?> response) {
Set<TopicPartition> extra =
findMissing(response.responseData().keySet(), sessionPartitions.keySet());
if (!extra.isEmpty()) {
@ -340,7 +340,7 @@ public class FetchSessionHandler {
* @param response The FetchResponse.
* @return The string to log.
*/
private String responseDataToLogString(FetchResponse response) {
private String responseDataToLogString(FetchResponse<?> response) {
if (!log.isTraceEnabled()) {
int implied = sessionPartitions.size() - response.responseData().size();
if (implied > 0) {
@ -376,7 +376,7 @@ public class FetchSessionHandler {
* @return True if the response is well-formed; false if it can't be processed
* because of missing or unexpected partitions.
*/
public boolean handleResponse(FetchResponse response) {
public boolean handleResponse(FetchResponse<?> response) {
if (response.error() != Errors.NONE) {
log.info("Node {} was unable to process the fetch request with {}: {}.",
node, nextMetadata, response.error());

View File

@ -58,6 +58,7 @@ public class ConfigEntry {
* @param isReadOnly whether the config is read-only and cannot be updated
* @deprecated since 1.1.0. This constructor will be removed in a future release.
*/
@Deprecated
public ConfigEntry(String name, String value, boolean isDefault, boolean isSensitive, boolean isReadOnly) {
this(name,
value,

View File

@ -1024,7 +1024,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public void unsubscribe() {
acquireAndEnsureOpen();
try {
fetcher.clearBufferedDataForUnassignedPartitions(Collections.EMPTY_SET);
fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
this.subscriptions.unsubscribe();
this.coordinator.maybeLeaveGroup();
this.metadata.needMetadataForAllTopics(false);

View File

@ -408,6 +408,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
close(KafkaConsumer.DEFAULT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
@SuppressWarnings("deprecation")
@Override
public synchronized void close(long timeout, TimeUnit unit) {
ensureNotClosed();

View File

@ -222,6 +222,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
@Override
public void onSuccess(ClientResponse resp) {
synchronized (Fetcher.this) {
@SuppressWarnings("unchecked")
FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
FetchSessionHandler handler = sessionHandler(fetchTarget.id());
if (handler == null) {
@ -239,7 +240,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;
FetchResponse.PartitionData fetchData = entry.getValue();
FetchResponse.PartitionData<Records> fetchData = entry.getValue();
log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
isolationLevel, fetchOffset, partition, fetchData);
@ -1280,18 +1281,12 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
return batch.isTransactional() && abortedProducerIds.contains(batch.producerId());
}
private PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions(FetchResponse.PartitionData partition) {
private PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions(FetchResponse.PartitionData<?> partition) {
if (partition.abortedTransactions == null || partition.abortedTransactions.isEmpty())
return null;
PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions = new PriorityQueue<>(
partition.abortedTransactions.size(),
new Comparator<FetchResponse.AbortedTransaction>() {
@Override
public int compare(FetchResponse.AbortedTransaction o1, FetchResponse.AbortedTransaction o2) {
return Long.compare(o1.firstOffset, o2.firstOffset);
}
}
partition.abortedTransactions.size(), Comparator.comparingLong(o -> o.firstOffset)
);
abortedTransactions.addAll(partition.abortedTransactions);
return abortedTransactions;

View File

@ -289,6 +289,7 @@ public class SslTransportLayer implements TransportLayer {
}
}
@SuppressWarnings("fallthrough")
private void doHandshake() throws IOException {
boolean read = key.isReadable();
boolean write = key.isWritable();

View File

@ -59,7 +59,7 @@ public class LazyDownConversionRecords implements BaseRecords {
// need to make sure that we are able to accommodate one full batch of down-converted messages. The way we achieve
// this is by having sizeInBytes method factor in the size of the first down-converted batch and return at least
// its size.
java.util.Iterator<ConvertedRecords> it = iterator(0);
java.util.Iterator<ConvertedRecords<?>> it = iterator(0);
if (it.hasNext()) {
firstConvertedBatch = it.next();
sizeInBytes = Math.max(records.sizeInBytes(), firstConvertedBatch.records().sizeInBytes());
@ -106,7 +106,7 @@ public class LazyDownConversionRecords implements BaseRecords {
return result;
}
public java.util.Iterator<ConvertedRecords> iterator(long maximumReadSize) {
public java.util.Iterator<ConvertedRecords<?>> iterator(long maximumReadSize) {
// We typically expect only one iterator instance to be created, so null out the first converted batch after
// first use to make it available for GC.
ConvertedRecords firstBatch = firstConvertedBatch;
@ -119,7 +119,7 @@ public class LazyDownConversionRecords implements BaseRecords {
* it as memory-efficient as possible by not having to maintain all down-converted records in-memory. Maintains
* a view into batches of down-converted records.
*/
private class Iterator extends AbstractIterator<ConvertedRecords> {
private class Iterator extends AbstractIterator<ConvertedRecords<?>> {
private final AbstractIterator<? extends RecordBatch> batchIterator;
private final long maximumReadSize;
private ConvertedRecords firstConvertedBatch;
@ -130,7 +130,7 @@ public class LazyDownConversionRecords implements BaseRecords {
* {@link #makeNext()}. This is a soft limit as {@link #makeNext()} will always convert
* and return at least one full message batch.
*/
private Iterator(Records recordsToDownConvert, long maximumReadSize, ConvertedRecords firstConvertedBatch) {
private Iterator(Records recordsToDownConvert, long maximumReadSize, ConvertedRecords<?> firstConvertedBatch) {
this.batchIterator = recordsToDownConvert.batchIterator();
this.maximumReadSize = maximumReadSize;
this.firstConvertedBatch = firstConvertedBatch;
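
The comment above on `sizeInBytes` carries the key sizing rule; a tiny illustrative sketch of that rule (standalone, not the Kafka class):

```java
public final class LazySizeSketch {
    // Illustrative only: down-conversion can grow a batch, so a lazily converted
    // response must report a size that can hold at least the first converted
    // batch; otherwise the first batch could be truncated on the wire.
    static int sizeToReport(int originalSizeInBytes, int firstConvertedBatchSizeInBytes) {
        return Math.max(originalSizeInBytes, firstConvertedBatchSizeInBytes);
    }

    public static void main(String[] args) {
        System.out.println(sizeToReport(1000, 1200)); // prints 1200
        System.out.println(sizeToReport(1000, 300));  // prints 1000
    }
}
```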

View File

@ -37,7 +37,7 @@ public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownCon
private RecordConversionStats recordConversionStats;
private RecordsSend convertedRecordsWriter;
private Iterator<ConvertedRecords> convertedRecordsIterator;
private Iterator<ConvertedRecords<?>> convertedRecordsIterator;
public LazyDownConversionRecordsSend(String destination, LazyDownConversionRecords records) {
super(destination, records, records.sizeInBytes());
@ -74,8 +74,8 @@ public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownCon
// Check if we have more chunks left to down-convert
if (convertedRecordsIterator.hasNext()) {
// Get next chunk of down-converted messages
ConvertedRecords<MemoryRecords> recordsAndStats = convertedRecordsIterator.next();
convertedRecords = recordsAndStats.records();
ConvertedRecords<?> recordsAndStats = convertedRecordsIterator.next();
convertedRecords = (MemoryRecords) recordsAndStats.records();
recordConversionStats.add(recordsAndStats.recordConversionStats());
log.debug("Down-converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes());
} else {

View File

@ -24,6 +24,7 @@ public class CredentialCache {
public <C> Cache<C> createCache(String mechanism, Class<C> credentialClass) {
Cache<C> cache = new Cache<>(credentialClass);
@SuppressWarnings("unchecked")
Cache<C> oldCache = (Cache<C>) cacheMap.putIfAbsent(mechanism, cache);
return oldCache == null ? cache : oldCache;
}

View File

@ -180,6 +180,7 @@ public class LoginManager {
String configName,
Class<? extends T> defaultClass) {
String prefix = jaasContext.type() == JaasContext.Type.SERVER ? ListenerName.saslMechanismPrefix(saslMechanism) : "";
@SuppressWarnings("unchecked")
Class<? extends T> clazz = (Class<? extends T>) configs.get(prefix + configName);
if (clazz != null && jaasContext.configurationEntries().size() != 1) {
String errorMessage = configName + " cannot be specified with multiple login modules in the JAAS context. " +

View File

@ -191,6 +191,7 @@ public class SaslClientAuthenticator implements Authenticator {
* The messages are sent and received as size delimited bytes that consists of a 4 byte network-ordered size N
* followed by N bytes representing the opaque payload.
*/
@SuppressWarnings("fallthrough")
public void authenticate() throws IOException {
if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
return;
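
The javadoc above describes the size-delimited framing used for SASL tokens. A minimal illustrative sketch of that wire format (standalone; not the authenticator's actual I/O path):

```java
import java.nio.ByteBuffer;

public final class SizeDelimitedFraming {
    // Frame an opaque payload as a 4-byte network-ordered (big-endian) length N
    // followed by N payload bytes, as described in the comment above.
    static ByteBuffer frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length); // ByteBuffer defaults to big-endian
        buf.put(payload);
        buf.flip();
        return buf;
    }

    // Read one frame back: the leading 4 bytes give the payload size.
    static byte[] unframe(ByteBuffer buf) {
        int size = buf.getInt();
        byte[] payload = new byte[size];
        buf.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] token = "sasl-token".getBytes();
        System.out.println(new String(unframe(frame(token)))); // prints sasl-token
    }
}
```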

View File

@ -84,6 +84,7 @@ public class SaslClientCallbackHandler implements AuthenticateCallbackHandler {
ac.setAuthorizedID(authzId);
} else if (callback instanceof ScramExtensionsCallback) {
if (ScramMechanism.isScram(mechanism) && subject != null && !subject.getPublicCredentials(Map.class).isEmpty()) {
@SuppressWarnings("unchecked")
Map<String, String> extensions = (Map<String, String>) subject.getPublicCredentials(Map.class).iterator().next();
((ScramExtensionsCallback) callback).extensions(extensions);
}

View File

@ -249,6 +249,7 @@ public class SaslServerAuthenticator implements Authenticator {
* The messages are sent and received as size delimited bytes that consists of a 4 byte network-ordered size N
* followed by N bytes representing the opaque payload.
*/
@SuppressWarnings("fallthrough")
@Override
public void authenticate() throws IOException {
if (saslState != SaslState.REAUTH_PROCESS_HANDSHAKE) {

View File

@ -89,6 +89,7 @@ public class Crc32 implements Checksum {
crc = 0xffffffff;
}
@SuppressWarnings("fallthrough")
@Override
public void update(byte[] b, int off, int len) {
if (off < 0 || len < 0 || off > b.length - len)

View File

@ -53,6 +53,7 @@ public class PureJavaCrc32C implements Checksum {
crc = 0xffffffff;
}
@SuppressWarnings("fallthrough")
@Override
public void update(byte[] b, int off, int len) {
int localCrc = crc;

View File

@ -377,6 +377,7 @@ public final class Utils {
* @param data byte array to hash
* @return 32 bit hash of the given array
*/
@SuppressWarnings("fallthrough")
public static int murmur2(final byte[] data) {
int length = data.length;
int seed = 0x9747b28c;
@ -662,18 +663,10 @@ public final class Utils {
*/
@SafeVarargs
public static <T> Set<T> mkSet(T... elems) {
return new HashSet<>(Arrays.asList(elems));
}
/*
* Creates a list
* @param elems the elements
* @param <T> the type of element
* @return List
*/
@SafeVarargs
public static <T> List<T> mkList(T... elems) {
return Arrays.asList(elems);
Set<T> result = new HashSet<>((int) (elems.length / 0.75) + 1);
for (T elem : elems)
result.add(elem);
return result;
}
/**

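The `Utils.mkSet` rewrite above pre-sizes the `HashSet`, and former `Utils.mkList` callers can use `Arrays.asList` directly. A standalone sketch of the same pattern and of why `@SafeVarargs` applies (illustrative, not the Kafka class):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public final class VarargsSketch {
    // @SafeVarargs suppresses the "possible heap pollution" warning javac emits
    // for a generic varargs parameter; it is safe here because the array is only
    // read, never exposed or written with a different element type.
    @SafeVarargs
    public static <T> Set<T> mkSet(T... elems) {
        // A capacity of (n / 0.75) + 1 keeps n insertions under the default 0.75
        // load factor, so the set is never resized while being filled.
        Set<T> result = new HashSet<>((int) (elems.length / 0.75) + 1);
        for (T elem : elems)
            result.add(elem);
        return result;
    }

    public static void main(String[] args) {
        Set<String> s = mkSet("a", "b", "c");
        // Former Utils.mkList("a", "b", "c") call sites can simply use:
        List<String> l = Arrays.asList("a", "b", "c");
        System.out.println(s + " " + l);
    }
}
```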
View File

@ -130,6 +130,7 @@ public class FetchSessionHandlerTest {
}
}
@SafeVarargs
private static void assertMapsEqual(Map<TopicPartition, FetchRequest.PartitionData> expected,
Map<TopicPartition, FetchRequest.PartitionData>... actuals) {
for (Map<TopicPartition, FetchRequest.PartitionData> actual : actuals) {

View File

@ -1119,6 +1119,7 @@ public class KafkaAdminClientTest {
}
}
@SafeVarargs
private static <T> void assertCollectionIs(Collection<T> collection, T... elements) {
for (T element : elements) {
assertTrue("Did not find " + element, collection.contains(element));

View File

@ -388,7 +388,7 @@ public class KafkaConsumerTest {
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
assertTrue(heartbeatReceived.get());
consumer.close(0, TimeUnit.MILLISECONDS);
consumer.close(Duration.ofMillis(0));
}
@Test
@ -421,7 +421,7 @@ public class KafkaConsumerTest {
consumer.poll(Duration.ZERO);
assertTrue(heartbeatReceived.get());
consumer.close(0, TimeUnit.MILLISECONDS);
consumer.close(Duration.ofMillis(0));
}
@Test
@ -446,6 +446,7 @@ public class KafkaConsumerTest {
Assert.assertEquals(0, requests.size());
}
@SuppressWarnings("deprecation")
@Test
public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() {
final Time time = new MockTime();
@ -461,7 +462,6 @@ public class KafkaConsumerTest {
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp0), null);
//noinspection deprecation
consumer.poll(0L);
// The underlying client SHOULD get a fetch request
@ -492,7 +492,7 @@ public class KafkaConsumerTest {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
assertEquals(5, records.count());
assertEquals(55L, consumer.position(tp0));
consumer.close(0, TimeUnit.MILLISECONDS);
consumer.close(Duration.ofMillis(0));
}
@Test
@ -652,7 +652,7 @@ public class KafkaConsumerTest {
offsets.put(tp1, offset2);
client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
assertEquals(offset2, consumer.committed(tp1).offset());
consumer.close(0, TimeUnit.MILLISECONDS);
consumer.close(Duration.ofMillis(0));
}
@Test
@ -687,7 +687,7 @@ public class KafkaConsumerTest {
consumer.poll(Duration.ZERO);
assertTrue(commitReceived.get());
consumer.close(0, TimeUnit.MILLISECONDS);
consumer.close(Duration.ofMillis(0));
}
@Test
@ -716,7 +716,7 @@ public class KafkaConsumerTest {
assertEquals(singleton(topic), consumer.subscription());
assertEquals(singleton(tp0), consumer.assignment());
consumer.close(0, TimeUnit.MILLISECONDS);
consumer.close(Duration.ofMillis(0));
}
@Test
@ -753,7 +753,7 @@ public class KafkaConsumerTest {
consumer.poll(Duration.ZERO);
assertEquals(singleton(otherTopic), consumer.subscription());
consumer.close(0, TimeUnit.MILLISECONDS);
consumer.close(Duration.ofMillis(0));
}
@Test
@ -832,7 +832,7 @@ public class KafkaConsumerTest {
// clear interrupted state again since this thread may be reused by JUnit
Thread.interrupted();
}
consumer.close(0, TimeUnit.MILLISECONDS);
consumer.close(Duration.ofMillis(0));
}
@Test
@ -860,7 +860,7 @@ public class KafkaConsumerTest {
ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
assertEquals(0, records.count());
consumer.close(0, TimeUnit.MILLISECONDS);
consumer.close(Duration.ofMillis(0));
}
/**
@ -1348,7 +1348,7 @@ public class KafkaConsumerTest {
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
final ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
assertFalse(records.isEmpty());
consumer.close(0, TimeUnit.MILLISECONDS);
consumer.close(Duration.ofMillis(0));
}
private void consumerCloseTest(final long closeTimeoutMs,
@ -1388,7 +1388,7 @@ public class KafkaConsumerTest {
public void run() {
consumer.commitAsync();
try {
consumer.close(closeTimeoutMs, TimeUnit.MILLISECONDS);
consumer.close(Duration.ofMillis(closeTimeoutMs));
} catch (Exception e) {
closeException.set(e);
}

View File

@ -42,7 +42,6 @@ import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
@ -249,7 +248,7 @@ public class ConsumerCoordinatorTest {
final AtomicBoolean asyncCallbackInvoked = new AtomicBoolean(false);
Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets = singletonMap(
new TopicPartition("foo", 0), new OffsetCommitRequest.PartitionData(13L,
RecordBatch.NO_PARTITION_LEADER_EPOCH, ""));
Optional.empty(), ""));
consumerClient.send(coordinator.checkAndGetCoordinator(), new OffsetCommitRequest.Builder(groupId, offsets))
.compose(new RequestFutureAdapter<ClientResponse, Object>() {
@Override

View File

@ -2575,9 +2575,9 @@ public class FetcherTest {
try {
Field field = FetchSessionHandler.class.getDeclaredField("sessionPartitions");
field.setAccessible(true);
LinkedHashMap<TopicPartition, FetchRequest.PartitionData> sessionPartitions =
(LinkedHashMap<TopicPartition, FetchRequest.PartitionData>) field.get(handler);
for (Map.Entry<TopicPartition, FetchRequest.PartitionData> entry : sessionPartitions.entrySet()) {
LinkedHashMap<?, ?> sessionPartitions =
(LinkedHashMap<?, ?>) field.get(handler);
for (Map.Entry<?, ?> entry : sessionPartitions.entrySet()) {
// If `sessionPartitions` are modified on another thread, Thread.yield will increase the
// possibility of ConcurrentModificationException if appropriate synchronization is not used.
Thread.yield();

View File

@ -37,7 +37,6 @@ import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ExtendedSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogContext;
@ -135,7 +134,7 @@ public class KafkaProducerTest {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.put(1, "not string key");
try (KafkaProducer<?, ?> ff = new KafkaProducer(props, new StringSerializer(), new StringSerializer())) {
try (KafkaProducer<?, ?> ff = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
fail("Constructor should throw exception");
} catch (ConfigException e) {
assertTrue("Unexpected exception message: " + e.getMessage(), e.getMessage().contains("not string key"));
@ -460,12 +459,14 @@ public class KafkaProducerTest {
assertTrue("Topic should still exist in metadata", metadata.containsTopic(topic));
}
@SuppressWarnings("unchecked")
@Test
@Deprecated
public void testHeadersWithExtendedClasses() {
doTestHeaders(ExtendedSerializer.class);
doTestHeaders(org.apache.kafka.common.serialization.ExtendedSerializer.class);
}
@SuppressWarnings("unchecked")
@Test
public void testHeaders() {
doTestHeaders(Serializer.class);

View File

@ -67,7 +67,7 @@ public class KafkaFutureTest {
@Test
public void testCompletingFutures() throws Exception {
final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
CompleterThread myThread = new CompleterThread(future, "You must construct additional pylons.");
CompleterThread<String> myThread = new CompleterThread<>(future, "You must construct additional pylons.");
assertFalse(future.isDone());
assertFalse(future.isCompletedExceptionally());
assertFalse(future.isCancelled());
@ -86,39 +86,19 @@ public class KafkaFutureTest {
@Test
public void testThenApply() throws Exception {
KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
KafkaFuture<Integer> doubledFuture = future.thenApply(new KafkaFuture.BaseFunction<Integer, Integer>() {
@Override
public Integer apply(Integer integer) {
return 2 * integer;
}
});
KafkaFuture<Integer> doubledFuture = future.thenApply(integer -> 2 * integer);
assertFalse(doubledFuture.isDone());
KafkaFuture<Integer> tripledFuture = future.thenApply(new KafkaFuture.Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) {
return 3 * integer;
}
});
KafkaFuture<Integer> tripledFuture = future.thenApply(integer -> 3 * integer);
assertFalse(tripledFuture.isDone());
future.complete(21);
assertEquals(Integer.valueOf(21), future.getNow(-1));
assertEquals(Integer.valueOf(42), doubledFuture.getNow(-1));
assertEquals(Integer.valueOf(63), tripledFuture.getNow(-1));
KafkaFuture<Integer> quadrupledFuture = future.thenApply(new KafkaFuture.BaseFunction<Integer, Integer>() {
@Override
public Integer apply(Integer integer) {
return 4 * integer;
}
});
KafkaFuture<Integer> quadrupledFuture = future.thenApply(integer -> 4 * integer);
assertEquals(Integer.valueOf(84), quadrupledFuture.getNow(-1));
KafkaFutureImpl<Integer> futureFail = new KafkaFutureImpl<>();
KafkaFuture<Integer> futureAppliedFail = futureFail.thenApply(new KafkaFuture.BaseFunction<Integer, Integer>() {
@Override
public Integer apply(Integer integer) {
return 2 * integer;
}
});
KafkaFuture<Integer> futureAppliedFail = futureFail.thenApply(integer -> 2 * integer);
futureFail.completeExceptionally(new RuntimeException());
assertTrue(futureFail.isCompletedExceptionally());
assertTrue(futureAppliedFail.isCompletedExceptionally());
@ -176,7 +156,7 @@ public class KafkaFutureTest {
final int numThreads = 5;
final List<KafkaFutureImpl<Integer>> futures = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
futures.add(new KafkaFutureImpl<Integer>());
futures.add(new KafkaFutureImpl<>());
}
KafkaFuture<Void> allFuture = KafkaFuture.allOf(futures.toArray(new KafkaFuture[0]));
final List<CompleterThread> completerThreads = new ArrayList<>();

View File

@ -378,7 +378,7 @@ public class ConfigDefTest {
updateModes.put("my.broker.config", "per-broker");
updateModes.put("my.cluster.config", "cluster-wide");
final String html = configDef.toHtmlTable(updateModes);
Set<String> configsInHtml = new HashSet();
Set<String> configsInHtml = new HashSet<>();
for (String line : html.split("\n")) {
if (line.contains("my.broker.config")) {
assertTrue(line.contains("per-broker"));

View File

@ -839,6 +839,7 @@ public class MetricsTest {
* This test is to verify the deprecated {@link Metric#value()} method.
* @deprecated This will be removed in a future major release.
*/
@Deprecated
@Test
public void testDeprecatedMetricValueMethod() {
verifyStats(KafkaMetric::value);

View File

@ -23,7 +23,6 @@ import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
import org.apache.kafka.common.security.auth.PrincipalBuilder;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.Test;
@ -66,7 +65,7 @@ public class ChannelBuildersTest {
}
@SuppressWarnings("deprecation")
public static class OldPrincipalBuilder implements PrincipalBuilder {
public static class OldPrincipalBuilder implements org.apache.kafka.common.security.auth.PrincipalBuilder {
private static boolean configured = false;
private static final String PRINCIPAL_NAME = "bob";

View File

@ -381,7 +381,7 @@ public class FileRecordsTest {
// Lazy down-conversion will not return any messages for a partial input batch
TopicPartition tp = new TopicPartition("topic-1", 0);
LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(tp, slice, RecordBatch.MAGIC_VALUE_V0, 0, Time.SYSTEM);
Iterator<ConvertedRecords> it = lazyRecords.iterator(16 * 1024L);
Iterator<ConvertedRecords<?>> it = lazyRecords.iterator(16 * 1024L);
assertTrue("No messages should be returned", !it.hasNext());
}
@ -538,7 +538,7 @@ public class FileRecordsTest {
for (long readSize : maximumReadSize) {
TopicPartition tp = new TopicPartition("topic-1", 0);
LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(tp, fileRecords, toMagic, firstOffset, Time.SYSTEM);
Iterator<ConvertedRecords> it = lazyRecords.iterator(readSize);
Iterator<ConvertedRecords<?>> it = lazyRecords.iterator(readSize);
while (it.hasNext())
convertedRecords.add(it.next().records());
verifyConvertedRecords(initialRecords, initialOffsets, convertedRecords, compressionType, toMagic);

View File

@ -840,6 +840,7 @@ public class RequestResponseTest {
return new MetadataResponse(asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata);
}
@SuppressWarnings("deprecation")
private OffsetCommitRequest createOffsetCommitRequest(int version) {
Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<>();
commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100,

View File

@ -35,7 +35,7 @@ import static org.junit.Assert.assertEquals;
public class SerializationTest {
final private String topic = "testTopic";
final private Map<Class<Object>, List<Object>> testData = new HashMap() {
final private Map<Class<?>, List<Object>> testData = new HashMap<Class<?>, List<Object>>() {
{
put(String.class, Arrays.asList("my string"));
put(Short.class, Arrays.asList((short) 32767, (short) -32768));
@ -53,10 +53,11 @@ public class SerializationTest {
private class DummyClass {
}
@SuppressWarnings("unchecked")
@Test
public void allSerdesShouldRoundtripInput() {
for (Map.Entry<Class<Object>, List<Object>> test : testData.entrySet()) {
try (Serde<Object> serde = Serdes.serdeFrom(test.getKey())) {
for (Map.Entry<Class<?>, List<Object>> test : testData.entrySet()) {
try (Serde<Object> serde = Serdes.serdeFrom((Class<Object>) test.getKey())) {
for (Object value : test.getValue()) {
assertEquals("Should get the original " + test.getKey().getSimpleName() +
" after serialization and deserialization", value,

View File

@ -55,8 +55,8 @@ public class ConnectorUtilsTest {
Arrays.asList(3),
Arrays.asList(4),
Arrays.asList(5),
Collections.EMPTY_LIST,
Collections.EMPTY_LIST), grouped);
Collections.emptyList(),
Collections.emptyList()), grouped);
}
@Test(expected = IllegalArgumentException.class)

View File

@ -69,7 +69,7 @@ public class JsonConverterTest {
@Before
public void setUp() {
converter.configure(Collections.EMPTY_MAP, false);
converter.configure(Collections.emptyMap(), false);
}
// Schema metadata

View File

@ -86,6 +86,7 @@ abstract class NumberConverter<T extends Number> implements Converter, HeaderCon
configure(conf);
}
@SuppressWarnings("unchecked")
protected T cast(Object value) {
return (T) value;
}

View File

@ -162,6 +162,7 @@ public class ConnectorConfig extends AbstractConfig {
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY)
.define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(), ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), new ConfigDef.Validator() {
@SuppressWarnings("unchecked")
@Override
public void ensureValid(String name, Object value) {
final List<String> transformAliases = (List<String>) value;
@ -247,14 +248,15 @@ public class ConnectorConfig extends AbstractConfig {
final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
for (String alias : transformAliases) {
final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
final Transformation<R> transformation;
try {
transformation = getClass(prefix + "type").asSubclass(Transformation.class).newInstance();
@SuppressWarnings("unchecked")
final Transformation<R> transformation = getClass(prefix + "type").asSubclass(Transformation.class)
.getDeclaredConstructor().newInstance();
transformation.configure(originalsWithPrefix(prefix));
transformations.add(transformation);
} catch (Exception e) {
throw new ConnectException(e);
}
transformation.configure(originalsWithPrefix(prefix));
transformations.add(transformation);
}
return transformations;

View File

@ -96,6 +96,7 @@ public class Worker {
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
private WorkerConfigTransformer workerConfigTransformer;
@SuppressWarnings("deprecation")
public Worker(
String workerId,
Time time,

View File

@ -141,6 +141,7 @@ public class WorkerConnector {
return state == State.STARTED;
}
@SuppressWarnings("fallthrough")
private void pause() {
try {
switch (state) {

View File

@ -190,6 +190,7 @@ public class RetryWithToleranceOperator {
}
// Visible for testing
@SuppressWarnings("fallthrough")
boolean withinToleranceLimits() {
switch (errorToleranceType) {
case NONE:

View File

@ -151,13 +151,8 @@ public class DelegatingClassLoader extends URLClassLoader {
final URL[] urls,
final ClassLoader parent
) {
return (PluginClassLoader) AccessController.doPrivileged(
new PrivilegedAction() {
@Override
public Object run() {
return new PluginClassLoader(pluginLocation, urls, parent);
}
}
return AccessController.doPrivileged(
(PrivilegedAction<PluginClassLoader>) () -> new PluginClassLoader(pluginLocation, urls, parent)
);
}
@ -331,6 +326,7 @@ public class DelegatingClassLoader extends URLClassLoader {
return result;
}
@SuppressWarnings("unchecked")
private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
Collection<PluginDesc<T>> result = new ArrayList<>();

View File

@ -62,13 +62,8 @@ public class Plugins {
}
private static DelegatingClassLoader newDelegatingClassLoader(final List<String> paths) {
return (DelegatingClassLoader) AccessController.doPrivileged(
new PrivilegedAction() {
@Override
public Object run() {
return new DelegatingClassLoader(paths);
}
}
return AccessController.doPrivileged(
(PrivilegedAction<DelegatingClassLoader>) () -> new DelegatingClassLoader(paths)
);
}
@ -102,6 +97,7 @@ public class Plugins {
);
}
@SuppressWarnings("deprecation")
protected static boolean isInternalConverter(String classPropertyName) {
return classPropertyName.equals(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG)
|| classPropertyName.equals(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG);
@ -243,6 +239,7 @@ public class Plugins {
}
// Determine whether this is a key or value converter based upon the supplied property name ...
@SuppressWarnings("deprecation")
final boolean isKeyConverter = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG.equals(classPropertyName)
|| WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG.equals(classPropertyName);

View File

@ -73,7 +73,7 @@ public class ConnectRestConfigurable implements Configurable<ResourceConfig> {
}
@Override
public ResourceConfig register(Object component, Map contracts) {
public ResourceConfig register(Object component, Map<Class<?>, Integer> contracts) {
if (allowedToRegister(component)) {
resourceConfig.register(component, contracts);
}
@ -81,7 +81,7 @@ public class ConnectRestConfigurable implements Configurable<ResourceConfig> {
}
@Override
public ResourceConfig register(Object component, Class[] contracts) {
public ResourceConfig register(Object component, Class... contracts) {
if (allowedToRegister(component)) {
resourceConfig.register(component, contracts);
}
@ -89,7 +89,7 @@ public class ConnectRestConfigurable implements Configurable<ResourceConfig> {
}
@Override
public ResourceConfig register(Class componentClass, Map contracts) {
public ResourceConfig register(Class<?> componentClass, Map<Class<?>, Integer> contracts) {
if (allowedToRegister(componentClass)) {
resourceConfig.register(componentClass, contracts);
}
@ -97,7 +97,7 @@ public class ConnectRestConfigurable implements Configurable<ResourceConfig> {
}
@Override
public ResourceConfig register(Class componentClass, Class[] contracts) {
public ResourceConfig register(Class<?> componentClass, Class<?>... contracts) {
if (allowedToRegister(componentClass)) {
resourceConfig.register(componentClass, contracts);
}
@ -105,7 +105,7 @@ public class ConnectRestConfigurable implements Configurable<ResourceConfig> {
}
@Override
public ResourceConfig register(Class componentClass, int priority) {
public ResourceConfig register(Class<?> componentClass, int priority) {
if (allowedToRegister(componentClass)) {
resourceConfig.register(componentClass, priority);
}
@ -113,7 +113,7 @@ public class ConnectRestConfigurable implements Configurable<ResourceConfig> {
}
@Override
public ResourceConfig register(Class componentClass) {
public ResourceConfig register(Class<?> componentClass) {
if (allowedToRegister(componentClass)) {
resourceConfig.register(componentClass);
}
@ -128,7 +128,7 @@ public class ConnectRestConfigurable implements Configurable<ResourceConfig> {
return ALLOWED_TO_REGISTER;
}
private boolean allowedToRegister(Class componentClass) {
private boolean allowedToRegister(Class<?> componentClass) {
if (resourceConfig.isRegistered(componentClass)) {
log.warn("The resource {} is already registered", componentClass);
return NOT_ALLOWED_TO_REGISTER;

View File

@ -77,7 +77,7 @@ public class RestServer {
private final WorkerConfig config;
private Server jettyServer;
private List<ConnectRestExtension> connectRestExtensions = Collections.EMPTY_LIST;
private List<ConnectRestExtension> connectRestExtensions = Collections.emptyList();
/**
* Create a REST server for this herder using the specified configs.
@ -92,6 +92,7 @@ public class RestServer {
createConnectors(listeners);
}
@SuppressWarnings("deprecation")
List<String> parseListeners() {
List<String> listeners = config.getList(WorkerConfig.LISTENERS_CONFIG);
if (listeners == null || listeners.size() == 0) {

View File

@ -104,6 +104,7 @@ public class SSLUtils {
/**
* Configures Protocol, Algorithm and Provider related settings in SslContextFactory
*/
@SuppressWarnings("unchecked")
protected static void configureSslContextFactoryAlgorithms(SslContextFactory ssl, Map<String, Object> sslConfigValues) {
List<String> sslEnabledProtocols = (List<String>) getOrDefault(sslConfigValues, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList(COMMA_WITH_WHITESPACE.split(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS)));
ssl.setIncludeProtocols(sslEnabledProtocols.toArray(new String[sslEnabledProtocols.size()]));

View File

@ -89,6 +89,7 @@ public class OffsetStorageWriter {
* @param partition the partition to store an offset for
* @param offset the offset
*/
@SuppressWarnings("unchecked")
public synchronized void offset(Map<String, ?> partition, Map<String, ?> offset) {
data.put((Map<String, Object>) partition, (Map<String, Object>) offset);
}

View File

@ -241,7 +241,7 @@ public class AbstractHerderTest {
AbstractHerder herder = createConfigValidationHerder(TestSinkConnector.class);
replayAll();
Map<String, String> config = new HashMap();
Map<String, String> config = new HashMap<>();
config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSinkConnector.class.getName());
config.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1,topic2");
config.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*");

View File

@ -37,6 +37,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
@SuppressWarnings("deprecation")
public class ConnectMetricsTest {
private static final Map<String, String> DEFAULT_WORKER_CONFIG = new HashMap<>();

View File

@ -40,6 +40,7 @@ import java.util.Map;
* If the same metric is created a second time (e.g., a worker task is re-created), the new metric will replace
* the previous metric in the custom reporter.
*/
@SuppressWarnings("deprecation")
public class MockConnectMetrics extends ConnectMetrics {
private static final Map<String, String> DEFAULT_WORKER_CONFIG = new HashMap<>();

View File

@ -48,12 +48,12 @@ import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
public class SourceTaskOffsetCommitterTest extends ThreadedTest {
private final ConcurrentHashMap committers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<ConnectorTaskId, ScheduledFuture<?>> committers = new ConcurrentHashMap<>();
@Mock private ScheduledExecutorService executor;
@Mock private Logger mockLog;
@Mock private ScheduledFuture commitFuture;
@Mock private ScheduledFuture taskFuture;
@Mock private ScheduledFuture<?> commitFuture;
@Mock private ScheduledFuture<?> taskFuture;
@Mock private ConnectorTaskId taskId;
@Mock private WorkerSourceTask task;
@ -79,6 +79,7 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);
}
@SuppressWarnings("unchecked")
@Test
public void testSchedule() {
Capture<Runnable> taskWrapper = EasyMock.newCapture();
@ -86,7 +87,7 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
EasyMock.expect(executor.scheduleWithFixedDelay(
EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
).andReturn(commitFuture);
).andReturn((ScheduledFuture) commitFuture);
PowerMock.replayAll();

View File

@ -113,7 +113,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
@Mock private Converter keyConverter;
@Mock private Converter valueConverter;
@Mock private HeaderConverter headerConverter;
@Mock private TransformationChain transformationChain;
@Mock private TransformationChain<SinkRecord> transformationChain;
private WorkerSinkTask workerTask;
@Mock private KafkaConsumer<byte[], byte[]> consumer;
private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
@ -141,7 +141,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
WorkerSinkTask.class, new String[]{"createConsumer"},
taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter,
valueConverter, headerConverter,
new TransformationChain(Collections.emptyList(), RetryWithToleranceOperatorTest.NOOP_OPERATOR),
new TransformationChain<>(Collections.emptyList(), RetryWithToleranceOperatorTest.NOOP_OPERATOR),
pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR);
recordsReturned = 0;
@ -578,12 +578,8 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
final Capture<SinkRecord> recordCapture = EasyMock.newCapture();
EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture))).andAnswer(new IAnswer<SinkRecord>() {
@Override
public SinkRecord answer() {
return recordCapture.getValue();
}
}).anyTimes();
EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture))).andAnswer(
(IAnswer<SinkRecord>) () -> recordCapture.getValue()).anyTimes();
Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(capturedRecords));

View File

@ -488,7 +488,7 @@ public class WorkerTest extends ThreadedTest {
anyObject(JsonConverter.class),
anyObject(JsonConverter.class),
anyObject(JsonConverter.class),
EasyMock.eq(new TransformationChain(Collections.emptyList(), NOOP_OPERATOR)),
EasyMock.eq(new TransformationChain<>(Collections.emptyList(), NOOP_OPERATOR)),
anyObject(KafkaProducer.class),
anyObject(OffsetStorageReader.class),
anyObject(OffsetStorageWriter.class),
@ -627,7 +627,7 @@ public class WorkerTest extends ThreadedTest {
anyObject(JsonConverter.class),
anyObject(JsonConverter.class),
anyObject(JsonConverter.class),
EasyMock.eq(new TransformationChain(Collections.emptyList(), NOOP_OPERATOR)),
EasyMock.eq(new TransformationChain<>(Collections.emptyList(), NOOP_OPERATOR)),
anyObject(KafkaProducer.class),
anyObject(OffsetStorageReader.class),
anyObject(OffsetStorageWriter.class),
@ -720,7 +720,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.capture(keyConverter),
EasyMock.capture(valueConverter),
EasyMock.capture(headerConverter),
EasyMock.eq(new TransformationChain(Collections.emptyList(), NOOP_OPERATOR)),
EasyMock.eq(new TransformationChain<>(Collections.emptyList(), NOOP_OPERATOR)),
anyObject(KafkaProducer.class),
anyObject(OffsetStorageReader.class),
anyObject(OffsetStorageWriter.class),
@ -859,6 +859,7 @@ public class WorkerTest extends ThreadedTest {
expectConverters(JsonConverter.class, expectDefaultConverters);
}
@SuppressWarnings("deprecation")
private void expectConverters(Class<? extends Converter> converterClass, Boolean expectDefaultConverters) {
// As default converters are instantiated when a task starts, they are expected only if the `startTask` method is called
if (expectDefaultConverters) {

View File

@ -80,6 +80,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@SuppressWarnings("deprecation")
@RunWith(PowerMockRunner.class)
@PrepareForTest({DistributedHerder.class, Plugins.class})
@PowerMockIgnore("javax.management.*")
@ -492,6 +493,7 @@ public class DistributedHerderTest {
PowerMock.verifyAll();
}
@SuppressWarnings("unchecked")
@Test
public void testConnectorNameConflictsWithWorkerGroupId() throws Exception {
EasyMock.expect(member.memberId()).andStubReturn("leader");

View File

@ -67,6 +67,7 @@ public class PluginsTest {
plugins = new Plugins(pluginProps);
}
@SuppressWarnings("deprecation")
@Before
public void setup() {
props = new HashMap<>(pluginProps);
@ -103,6 +104,7 @@ public class PluginsTest {
assertEquals("foo2", converter.configs.get("extra.config"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldInstantiateAndConfigureInternalConverters() {
instantiateAndConfigureInternalConverter(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER);

View File

@ -64,6 +64,7 @@ public class RestServerTest {
server.stop();
}
@SuppressWarnings("deprecation")
private Map<String, String> baseWorkerProps() {
Map<String, String> workerProps = new HashMap<>();
workerProps.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
@ -89,6 +90,7 @@ public class RestServerTest {
checkCORSRequest("", "http://bar.com", null, null);
}
@SuppressWarnings("deprecation")
@Test
public void testParseListeners() {
// Use listeners field
@ -108,6 +110,7 @@ public class RestServerTest {
Assert.assertArrayEquals(new String[] {"http://my-hostname:8080"}, server.parseListeners().toArray());
}
@SuppressWarnings("deprecation")
@Test
public void testAdvertisedUri() {
// Advertised URI from listeenrs without protocol
@ -165,10 +168,10 @@ public class RestServerTest {
System.setProperty("sun.net.http.allowRestrictedHeaders", "true");
EasyMock.expect(herder.plugins()).andStubReturn(plugins);
EasyMock.expect(plugins.newPlugins(Collections.EMPTY_LIST,
EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
workerConfig,
ConnectRestExtension.class))
.andStubReturn(Collections.EMPTY_LIST);
.andStubReturn(Collections.emptyList());
final Capture<Callback<Collection<String>>> connectorsCallback = EasyMock.newCapture();
herder.connectors(EasyMock.capture(connectorsCallback));

View File

@ -159,11 +159,14 @@ public class ConnectorPluginsResourceTest {
try {
for (Class<?> klass : abstractConnectorClasses) {
CONNECTOR_PLUGINS.add(
new MockConnectorPluginDesc((Class<? extends Connector>) klass, "0.0.0"));
@SuppressWarnings("unchecked")
MockConnectorPluginDesc pluginDesc = new MockConnectorPluginDesc((Class<? extends Connector>) klass, "0.0.0");
CONNECTOR_PLUGINS.add(pluginDesc);
}
for (Class<?> klass : connectorClasses) {
CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc((Class<? extends Connector>) klass));
@SuppressWarnings("unchecked")
MockConnectorPluginDesc pluginDesc = new MockConnectorPluginDesc((Class<? extends Connector>) klass);
CONNECTOR_PLUGINS.add(pluginDesc);
}
} catch (Exception e) {
throw new RuntimeException(e);

View File

@ -29,6 +29,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@SuppressWarnings("deprecation")
public class SSLUtilsTest {
private static final Map<String, String> DEFAULT_CONFIG = new HashMap<>();
static {

View File

@ -47,6 +47,7 @@ public class FileOffsetBackingStoreTest {
firstSet.put(null, null);
}
@SuppressWarnings("deprecation")
@Before
public void setup() throws IOException {
store = new FileOffsetBackingStore();

View File

@ -63,7 +63,7 @@ import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
@PrepareForTest(KafkaConfigBackingStore.class)
@PowerMockIgnore("javax.management.*")
@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "deprecation"})
public class KafkaConfigBackingStoreTest {
private static final String TOPIC = "connect-configs";
private static final short TOPIC_REPLICATION_FACTOR = 5;
@ -154,7 +154,7 @@ public class KafkaConfigBackingStoreTest {
@Test
public void testStartStop() throws Exception {
expectConfigure();
expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
expectStart(Collections.emptyList(), Collections.emptyMap());
expectStop();
PowerMock.replayAll();
@ -179,7 +179,7 @@ public class KafkaConfigBackingStoreTest {
@Test
public void testPutConnectorConfig() throws Exception {
expectConfigure();
expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
expectStart(Collections.emptyList(), Collections.emptyMap());
expectConvertWriteAndRead(
CONNECTOR_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
@ -241,10 +241,10 @@ public class KafkaConfigBackingStoreTest {
@Test
public void testPutTaskConfigs() throws Exception {
expectConfigure();
expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
expectStart(Collections.emptyList(), Collections.emptyMap());
// Task configs should read to end, write to the log, read to end, write root, then read to end again
expectReadToEnd(new LinkedHashMap<String, byte[]>());
expectReadToEnd(new LinkedHashMap<>());
expectConvertWriteRead(
TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
"properties", SAMPLE_CONFIGS.get(0));
@ -274,7 +274,7 @@ public class KafkaConfigBackingStoreTest {
configStorage.start();
// Bootstrap as if we had already added the connector, but no tasks had been added yet
whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.EMPTY_LIST);
whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
// Null before writing
ClusterConfigState configState = configStorage.snapshot();
@ -305,10 +305,10 @@ public class KafkaConfigBackingStoreTest {
@Test
public void testPutTaskConfigsZeroTasks() throws Exception {
expectConfigure();
expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
expectStart(Collections.emptyList(), Collections.emptyMap());
// Task configs should read to end, write to the log, read to end, write root.
expectReadToEnd(new LinkedHashMap<String, byte[]>());
expectReadToEnd(new LinkedHashMap<>());
expectConvertWriteRead(
COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
"tasks", 0); // We have 0 tasks
@ -329,7 +329,7 @@ public class KafkaConfigBackingStoreTest {
configStorage.start();
// Bootstrap as if we had already added the connector, but no tasks had been added yet
whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.EMPTY_LIST);
whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
// Null before writing
ClusterConfigState configState = configStorage.snapshot();
@ -727,7 +727,7 @@ public class KafkaConfigBackingStoreTest {
assertEquals(6, configState.offset()); // Should always be next to be read, not last committed
assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
// Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list
assertEquals(Collections.EMPTY_LIST, configState.tasks(CONNECTOR_IDS.get(0)));
assertEquals(Collections.emptyList(), configState.tasks(CONNECTOR_IDS.get(0)));
// Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
assertNull(configState.taskConfig(TASK_IDS.get(0)));
assertNull(configState.taskConfig(TASK_IDS.get(1)));

View File

@ -60,7 +60,7 @@ import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
@PrepareForTest(KafkaOffsetBackingStore.class)
@PowerMockIgnore("javax.management.*")
@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "deprecation"})
public class KafkaOffsetBackingStoreTest {
private static final String TOPIC = "connect-offsets";
private static final short TOPIC_PARTITIONS = 2;
@ -117,7 +117,7 @@ public class KafkaOffsetBackingStoreTest {
@Test
public void testStartStop() throws Exception {
expectConfigure();
expectStart(Collections.EMPTY_LIST);
expectStart(Collections.emptyList());
expectStop();
PowerMock.replayAll();
@ -166,18 +166,15 @@ public class KafkaOffsetBackingStoreTest {
@Test
public void testGetSet() throws Exception {
expectConfigure();
expectStart(Collections.EMPTY_LIST);
expectStart(Collections.emptyList());
expectStop();
// First get() against an empty store
final Capture<Callback<Void>> firstGetReadToEndCallback = EasyMock.newCapture();
storeLog.readToEnd(EasyMock.capture(firstGetReadToEndCallback));
PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
PowerMock.expectLastCall().andAnswer(() -> {
firstGetReadToEndCallback.getValue().onCompletion(null, null);
return null;
}
});
// Set offsets
@ -284,7 +281,7 @@ public class KafkaOffsetBackingStoreTest {
@Test
public void testGetSetNull() throws Exception {
expectConfigure();
expectStart(Collections.EMPTY_LIST);
expectStart(Collections.emptyList());
// Set offsets
Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
@ -297,14 +294,11 @@ public class KafkaOffsetBackingStoreTest {
// Second get() should get the produced data and return the new values
final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
PowerMock.expectLastCall().andAnswer(() -> {
capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (byte[]) null, TP0_VALUE.array()));
capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), (byte[]) null));
secondGetReadToEndCallback.getValue().onCompletion(null, null);
return null;
}
});
expectStop();
@ -354,7 +348,7 @@ public class KafkaOffsetBackingStoreTest {
@Test
public void testSetFailure() throws Exception {
expectConfigure();
expectStart(Collections.EMPTY_LIST);
expectStart(Collections.emptyList());
expectStop();
// Set offsets

View File

@ -131,6 +131,7 @@ public class KafkaBasedLogTest {
}
};
@SuppressWarnings("unchecked")
@Before
public void setUp() {
store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},

View File

@ -55,6 +55,7 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
.define(ConfigName.WHITELIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
"Fields to include. If specified, only these fields will be used.")
.define(ConfigName.RENAME, ConfigDef.Type.LIST, Collections.emptyList(), new ConfigDef.Validator() {
@SuppressWarnings("unchecked")
@Override
public void ensureValid(String name, Object value) {
parseRenameMappings((List<String>) value);
@ -83,7 +84,7 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
renames = parseRenameMappings(config.getList(ConfigName.RENAME));
reverseRenames = invert(renames);
schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16));
}
static Map<String, String> parseRenameMappings(List<String> mappings) {

View File

@ -141,6 +141,7 @@ public class FlattenTest {
assertNull(transformed.valueSchema());
assertTrue(transformed.value() instanceof Map);
@SuppressWarnings("unchecked")
Map<String, Object> transformedMap = (Map<String, Object>) transformed.value();
assertEquals(9, transformedMap.size());
assertEquals((byte) 8, transformedMap.get("A#B#int8"));
@ -196,6 +197,7 @@ public class FlattenTest {
assertNull(transformed.valueSchema());
assertTrue(transformed.value() instanceof Map);
@SuppressWarnings("unchecked")
Map<String, Object> transformedMap = (Map<String, Object>) transformed.value();
assertNull(transformedMap.get("B.opt_int32"));
@ -211,6 +213,7 @@ public class FlattenTest {
assertNull(transformed.keySchema());
assertTrue(transformed.key() instanceof Map);
@SuppressWarnings("unchecked")
Map<String, Object> transformedMap = (Map<String, Object>) transformed.key();
assertEquals(12, transformedMap.get("A.B"));
}

View File

@ -71,6 +71,7 @@ public class MaskFieldTest {
final List<String> maskFields = new ArrayList<>(value.keySet());
maskFields.remove("magic");
@SuppressWarnings("unchecked")
final Map<String, Object> updatedValue = (Map) transform(maskFields).apply(record(null, value)).value();
assertEquals(42, updatedValue.get("magic"));

View File

@ -33,6 +33,7 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceP
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.io.StdIn
object AclCommand extends Logging {
@ -449,7 +450,7 @@ object AclCommand extends Logging {
if (opts.options.has(opts.forceOpt))
return true
println(msg)
Console.readLine().equalsIgnoreCase("y")
StdIn.readLine().equalsIgnoreCase("y")
}
private def validateOperation(opts: AclCommandOptions, resourceToAcls: Map[ResourcePatternFilter, Set[Acl]]): Unit = {

View File

@ -36,7 +36,7 @@ import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata
import org.apache.kafka.common.requests.OffsetFetchResponse
import org.apache.kafka.common.utils.LogContext
import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
import org.apache.kafka.common.{Cluster, Node, TopicPartition}
import org.apache.kafka.common.{Node, TopicPartition}
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

View File

@ -376,6 +376,7 @@ object AdminUtils extends Logging with AdminUtilities {
}
}
@deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
zkUtils.pathExists(getTopicPath(topic))
@ -467,6 +468,7 @@ object AdminUtils extends Logging with AdminUtilities {
writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update)
}
@deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
try {
val zkPath = getTopicPath(topic)
@ -523,6 +525,7 @@ object AdminUtils extends Logging with AdminUtilities {
changeEntityConfig(zkUtils, ConfigType.User, sanitizedEntityName, configs)
}
@deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def validateTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties): Unit = {
Topic.validate(topic)
if (!topicExists(zkUtils, topic))
@ -562,6 +565,7 @@ object AdminUtils extends Logging with AdminUtilities {
}
}
@deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
private def changeEntityConfig(zkUtils: ZkUtils, rootEntityType: String, fullSanitizedEntityName: String, configs: Properties) {
val sanitizedEntityPath = rootEntityType + '/' + fullSanitizedEntityName
val entityConfigPath = getEntityConfigPath(rootEntityType, fullSanitizedEntityName)
@ -574,13 +578,14 @@ object AdminUtils extends Logging with AdminUtilities {
zkUtils.createSequentialPersistentPath(seqNode, content)
}
def getConfigChangeZnodeData(sanitizedEntityPath: String) : Map[String, Any] = {
def getConfigChangeZnodeData(sanitizedEntityPath: String): Map[String, Any] = {
Map("version" -> 2, "entity_path" -> sanitizedEntityPath)
}
/**
* Write out the entity config to zk, if there is any
*/
@deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
private def writeEntityConfig(zkUtils: ZkUtils, entityPath: String, config: Properties) {
val map = Map("version" -> 1, "config" -> config.asScala)
zkUtils.updatePersistentPath(entityPath, Json.legacyEncodeAsString(map))
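
A note on the `@deprecated` annotations being added throughout this file (and to TestAdminUtils and LegacyAdminClientTest later in the diff): scalac skips deprecation warnings inside members that are themselves marked `@deprecated`, so annotating these callers keeps deprecated-calling-deprecated chains from emitting warnings that would fail a warnings-as-errors build. A minimal sketch with illustrative method names:

    object DeprecationPropagation {
      @deprecated("use the ZK-client based path helper instead", "1.1.0")
      def oldPath(topic: String): String = s"/topics/$topic"

      // Without its own @deprecated annotation, the call to oldPath below would
      // produce a deprecation warning, which fatal-warnings settings turn into
      // a compile error.
      @deprecated("use the ZK-client based existence check instead", "1.1.0")
      def oldExists(topic: String): Boolean = oldPath(topic).nonEmpty

      def main(args: Array[String]): Unit = {
        // Calling oldExists from here (non-deprecated code) would still warn, as intended.
      }
    }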

View File

@ -36,6 +36,7 @@ import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._
import scala.collection._
import scala.io.StdIn
object TopicCommand extends Logging {
@ -390,7 +391,7 @@ object TopicCommand extends Logging {
def askToProceed(): Unit = {
println("Are you sure you want to continue? [y/n]")
if (!Console.readLine().equalsIgnoreCase("y")) {
if (!StdIn.readLine().equalsIgnoreCase("y")) {
println("Ending your session")
Exit.exit(0)
}

View File

@ -31,7 +31,7 @@ class ControllerContext {
var epoch: Int = KafkaController.InitialControllerEpoch
var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion
var allTopics: Set[String] = Set.empty
private var partitionReplicaAssignmentUnderlying: mutable.Map[String, mutable.Map[Int, Seq[Int]]] = mutable.Map.empty
private val partitionReplicaAssignmentUnderlying: mutable.Map[String, mutable.Map[Int, Seq[Int]]] = mutable.Map.empty
val partitionLeadershipInfo: mutable.Map[TopicPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
val partitionsBeingReassigned: mutable.Map[TopicPartition, ReassignedPartitionsContext] = mutable.Map.empty
val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicPartition]] = mutable.Map.empty

View File

@ -274,7 +274,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
info("starting the token expiry check scheduler")
tokenCleanScheduler.startup()
tokenCleanScheduler.schedule(name = "delete-expired-tokens",
fun = tokenManager.expireTokens,
fun = () => tokenManager.expireTokens,
period = config.delegationTokenExpiryCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)
}
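
The `() => tokenManager.expireTokens` change here, and the matching changes in the GroupMetadataManager, TransactionCoordinator, TransactionStateManager and FetchSessionTest hunks below, avoid Scala 2.12's warning about eta-expansion of zero-argument methods: passing a 0-arg method where a `() => Unit` is expected now wants an explicit thunk. A minimal sketch with an illustrative scheduler-like signature (not Kafka's actual KafkaScheduler API):

    object EtaExpansionSketch {
      // Illustrative stand-in for an API parameter of type () => Unit.
      def schedule(name: String, fun: () => Unit): Unit = fun()

      def expireTokens(): Unit = println("expiring tokens")

      def main(args: Array[String]): Unit = {
        // schedule("delete-expired-tokens", expireTokens) would rely on
        // eta-expansion of a zero-argument method and warn under Scala 2.12's
        // lint settings; the explicit thunk behaves identically and is warning-free.
        schedule("delete-expired-tokens", () => expireTokens())
      }
    }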

View File

@ -140,7 +140,7 @@ class GroupMetadataManager(brokerId: Int,
scheduler.startup()
if (enableMetadataExpiration) {
scheduler.schedule(name = "delete-expired-group-metadata",
fun = cleanupGroupMetadata,
fun = () => cleanupGroupMetadata,
period = config.offsetsRetentionCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)
}
@ -691,7 +691,7 @@ class GroupMetadataManager(brokerId: Int,
onGroupUnloaded: GroupMetadata => Unit) {
val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
info(s"Scheduling unloading of offsets and group metadata from $topicPartition")
scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)
scheduler.schedule(topicPartition.toString, () => removeGroupsAndOffsets)
def removeGroupsAndOffsets() {
var numOffsetsRemoved = 0

View File

@ -497,7 +497,7 @@ class TransactionCoordinator(brokerId: Int,
info("Starting up.")
scheduler.startup()
scheduler.schedule("transaction-abort",
abortTimedOutTransactions,
() => abortTimedOutTransactions,
txnConfig.abortTimedOutTransactionsIntervalMs,
txnConfig.abortTimedOutTransactionsIntervalMs
)

View File

@ -423,7 +423,7 @@ class TransactionStateManager(brokerId: Int,
}
}
scheduler.schedule(s"load-txns-for-partition-$topicPartition", loadTransactions)
scheduler.schedule(s"load-txns-for-partition-$topicPartition", () => loadTransactions)
}
/**
@ -458,7 +458,7 @@ class TransactionStateManager(brokerId: Int,
}
}
scheduler.schedule(s"remove-txns-for-partition-$topicPartition", removeTransactions)
scheduler.schedule(s"remove-txns-for-partition-$topicPartition", () => removeTransactions)
}
private def validateTransactionTopicPartitionCountIsStable(): Unit = {

View File

@ -335,7 +335,7 @@ class Log(@volatile var dir: File,
/** The name of this log */
def name = dir.getName()
def leaderEpochCache = _leaderEpochCache
def leaderEpochCache: LeaderEpochFileCache = _leaderEpochCache
private def initializeLeaderEpochCache(): LeaderEpochFileCache = {
// create the log directory if it doesn't exist

View File

@ -354,7 +354,7 @@ class LogCleaner(initialConfig: CleanerConfig,
case _: LogCleaningAbortedException => // task can be aborted, let it go.
case _: KafkaStorageException => // partition is already offline. let it go.
case e: IOException =>
var logDirectory = cleanable.log.dir.getParent
val logDirectory = cleanable.log.dir.getParent
val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${logDirectory} due to IOException"
logDirFailureChannel.maybeAddOfflineLogDir(logDirectory, msg, e)
} finally {

View File

@ -75,9 +75,6 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
* for each log directory */
private val uncleanablePartitions = mutable.HashMap[String, mutable.Set[TopicPartition]]()
/* the set of directories marked as uncleanable and therefore offline */
private val uncleanableDirs = mutable.HashSet[String]()
/* a global lock used to control all access to the in-progress set and the offset checkpoints */
private val lock = new ReentrantLock

View File

@ -93,7 +93,6 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
// Visible for testing
private[server] def getFetcher(topicPartition: TopicPartition): Option[T] = {
lock synchronized {
val fetcherId = getFetcherId(topicPartition)
fetcherThreadMap.values.find { fetcherThread =>
fetcherThread.fetchState(topicPartition).isDefined
}

View File

@ -142,8 +142,8 @@ abstract class AbstractFetcherThread(name: String,
* on latest epochs of the future replicas (the one that is fetching)
*/
private def buildLeaderEpochRequest(): ResultWithPartitions[Map[TopicPartition, EpochData]] = inLock(partitionMapLock) {
var partitionsWithoutEpochs = mutable.Set.empty[TopicPartition]
var partitionsWithEpochs = mutable.Map.empty[TopicPartition, EpochData]
val partitionsWithoutEpochs = mutable.Set.empty[TopicPartition]
val partitionsWithEpochs = mutable.Map.empty[TopicPartition, EpochData]
partitionStates.partitionStates.asScala.foreach { state =>
val tp = state.topicPartition

View File

@ -767,7 +767,7 @@ class KafkaApis(val requestChannel: RequestChannel,
maxNumOffsets = partitionData.maxNumOffsets,
isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID,
fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
(topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(new JLong(_)).asJava))
(topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava))
} catch {
// NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages
// are typically transient and there is no value in logging the entire stack trace for the same
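
`new JLong(_)` above (and the `new Integer(0)` removed from PlaintextProducerSendTest later in the diff) use boxing constructors that are deprecated since Java 9; `valueOf` is the replacement and can reuse cached boxes. A minimal sketch of boxing a Scala `Seq[Long]` into a Java list; the offset values are illustrative:

    import java.lang.{Long => JLong}
    import scala.collection.JavaConverters._

    object BoxingSketch {
      def main(args: Array[String]): Unit = {
        val offsets = Seq(0L, 5L, 10L)
        // offsets.map(new JLong(_)) compiles but calls a deprecated constructor;
        // JLong.valueOf is the non-deprecated equivalent.
        val boxed: java.util.List[JLong] = offsets.map(JLong.valueOf).asJava
        println(boxed)
      }
    }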

View File

@ -302,7 +302,7 @@ object ConsoleConsumer extends Logging {
val keyDeserializer = options.valueOf(keyDeserializerOpt)
val valueDeserializer = options.valueOf(valueDeserializerOpt)
val isolationLevel = options.valueOf(isolationLevelOpt).toString
val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
val formatter: MessageFormatter = messageFormatterClass.getDeclaredConstructor().newInstance().asInstanceOf[MessageFormatter]
if (keyDeserializer != null && !keyDeserializer.isEmpty) {
formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
@ -480,12 +480,14 @@ class DefaultMessageFormatter extends MessageFormatter {
lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8)
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
if (props.containsKey("key.deserializer")) {
keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).getDeclaredConstructor()
.newInstance().asInstanceOf[Deserializer[_]])
keyDeserializer.get.configure(propertiesWithKeyPrefixStripped("key.deserializer.", props).asScala.asJava, true)
}
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
if (props.containsKey("value.deserializer")) {
valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).getDeclaredConstructor()
.newInstance().asInstanceOf[Deserializer[_]])
valueDeserializer.get.configure(propertiesWithKeyPrefixStripped("value.deserializer.", props).asScala.asJava, false)
}
}

View File

@ -39,7 +39,7 @@ object ConsoleProducer {
try {
val config = new ProducerConfig(args)
val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
val reader = Class.forName(config.readerClass).getDeclaredConstructor().newInstance().asInstanceOf[MessageReader]
reader.init(System.in, getReaderProps(config))
val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))

View File

@ -45,7 +45,7 @@ object Mx4jLoader extends Logging {
val processorName = new ObjectName("Server:name=XSLTProcessor")
val httpAdaptorClass = Class.forName("mx4j.tools.adaptor.http.HttpAdaptor")
val httpAdaptor = httpAdaptorClass.newInstance()
val httpAdaptor = httpAdaptorClass.getDeclaredConstructor().newInstance()
httpAdaptorClass.getMethod("setHost", classOf[String]).invoke(httpAdaptor, address.asInstanceOf[AnyRef])
httpAdaptorClass.getMethod("setPort", Integer.TYPE).invoke(httpAdaptor, port.asInstanceOf[AnyRef])
@ -53,7 +53,7 @@ object Mx4jLoader extends Logging {
mbs.registerMBean(httpAdaptor, httpName)
val xsltProcessorClass = Class.forName("mx4j.tools.adaptor.http.XSLTProcessor")
val xsltProcessor = xsltProcessorClass.newInstance()
val xsltProcessor = xsltProcessorClass.getDeclaredConstructor().newInstance()
httpAdaptorClass.getMethod("setProcessor", Class.forName("mx4j.tools.adaptor.http.ProcessorMBean")).invoke(httpAdaptor, xsltProcessor.asInstanceOf[AnyRef])
mbs.registerMBean(xsltProcessor, processorName)
httpAdaptorClass.getMethod("start").invoke(httpAdaptor)

View File

@ -43,11 +43,7 @@ trait DecodeJson[T] {
def decode(node: JsonNode): T =
decodeEither(node) match {
case Right(x) => x
case Left(x) =>
// Non-deprecated constructors were only introduced in Jackson 2.7, so stick with the deprecated one in case
// people have older versions of Jackson in their classpath. Once the Scala clients are removed, we can loosen
// this restriction.
throw new JsonMappingException(x)
case Left(x) => throw new JsonMappingException(null, x)
}
}

View File

@ -31,7 +31,7 @@ import scala.collection.Iterator
class JsonObject private[json] (protected val node: ObjectNode) extends JsonValue {
def apply(name: String): JsonValue =
get(name).getOrElse(throw new JsonMappingException(s"No such field exists: `$name`"))
get(name).getOrElse(throw new JsonMappingException(null, s"No such field exists: `$name`"))
def get(name: String): Option[JsonValue] = Option(node.get(name)).map(JsonValue(_))

View File

@ -59,7 +59,7 @@ trait JsonValue {
* If this is a JSON object, return an instance of JsonObject. Otherwise, throw a JsonMappingException.
*/
def asJsonObject: JsonObject =
asJsonObjectOption.getOrElse(throw new JsonMappingException(s"Expected JSON object, received $node"))
asJsonObjectOption.getOrElse(throw new JsonMappingException(null, s"Expected JSON object, received $node"))
/**
* If this is a JSON object, return a JsonObject wrapped by a `Some`. Otherwise, return None.
@ -76,7 +76,7 @@ trait JsonValue {
* If this is a JSON array, return an instance of JsonArray. Otherwise, throw a JsonMappingException.
*/
def asJsonArray: JsonArray =
asJsonArrayOption.getOrElse(throw new JsonMappingException(s"Expected JSON array, received $node"))
asJsonArrayOption.getOrElse(throw new JsonMappingException(null, s"Expected JSON array, received $node"))
/**
* If this is a JSON array, return a JsonArray wrapped by a `Some`. Otherwise, return None.
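
The `JsonMappingException` changes in the three hunks above move off the single-argument constructors, which Jackson deprecated once 2.7 added overloads that take the originating parser; passing `null` for the parser is allowed when there is none. A minimal sketch, assuming jackson-databind is on the classpath:

    import com.fasterxml.jackson.databind.JsonMappingException

    object JsonErrorSketch {
      def main(args: Array[String]): Unit = {
        // new JsonMappingException("msg") is deprecated; the two-argument overload
        // takes the originating parser (null here, since there is none) plus the message.
        val e = new JsonMappingException(null, "Expected JSON object, received something else")
        println(e.getMessage)
      }
    }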

View File

@ -918,26 +918,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val producer = createProducer()
sendRecords(producer, 10, topicPartition)
var messageCount = 0
TestUtils.waitUntilTrue(() => {
messageCount += consumer.poll(0).count
messageCount == 10
}, "Expected 10 messages", 3000L)
TestUtils.consumeRecords(consumer, 10)
client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava).all.get
consumer.seek(topicPartition, 1)
messageCount = 0
TestUtils.waitUntilTrue(() => {
messageCount += consumer.poll(0).count
messageCount == 7
}, "Expected 7 messages", 3000L)
TestUtils.consumeRecords(consumer, 7)
client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(8L)).asJava).all.get
consumer.seek(topicPartition, 1)
messageCount = 0
TestUtils.waitUntilTrue(() => {
messageCount += consumer.poll(0).count
messageCount == 2
}, "Expected 2 messages", 3000L)
TestUtils.consumeRecords(consumer, 2)
}
@Test
@ -988,10 +979,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Unit = {
consumer.subscribe(Collections.singletonList(topic))
TestUtils.waitUntilTrue(() => {
consumer.poll(0)
!consumer.assignment.isEmpty
}, "Expected non-empty assignment")
TestUtils.pollUntilTrue(consumer, () => !consumer.assignment.isEmpty, "Expected non-empty assignment")
}
private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
@ -1147,8 +1135,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
consumerThread.start
// Test that we can list the new group.
TestUtils.waitUntilTrue(() => {
val matching = client.listConsumerGroups().all().get().asScala.
filter(listing => listing.groupId().equals(testGroupId))
val matching = client.listConsumerGroups.all.get().asScala.filter(_.groupId == testGroupId)
!matching.isEmpty
}, s"Expected to be able to list $testGroupId")

View File

@ -16,7 +16,7 @@ import java.nio.ByteBuffer
import java.util
import java.util.concurrent.ExecutionException
import java.util.regex.Pattern
import java.util.{ArrayList, Collections, Optional, Properties}
import java.util.{Collections, Optional, Properties}
import java.time.Duration
import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService}
@ -317,7 +317,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def createOffsetCommitRequest = {
new requests.OffsetCommitRequest.Builder(
group, Map(tp -> new requests.OffsetCommitRequest.PartitionData(0, 27, "metadata")).asJava).
group, Map(tp -> new requests.OffsetCommitRequest.PartitionData(0L, Optional.empty[Integer](), "metadata")).asJava).
setMemberId("").setGenerationId(1).
build()
}
@ -1516,20 +1516,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
startingOffset: Int = 0,
topic: String = topic,
part: Int = part) {
val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
TestUtils.waitUntilTrue(() => {
for (record <- consumer.poll(50).asScala)
records.add(record)
records.size == numRecords
}, "Failed to receive all expected records from the consumer")
val records = TestUtils.consumeRecords(consumer, numRecords)
for (i <- 0 until numRecords) {
val record = records.get(i)
val record = records(i)
val offset = startingOffset + i
assertEquals(topic, record.topic())
assertEquals(part, record.partition())
assertEquals(offset.toLong, record.offset())
assertEquals(topic, record.topic)
assertEquals(part, record.partition)
assertEquals(offset.toLong, record.offset)
}
}
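
The AdminClientIntegrationTest and AuthorizerIntegrationTest hunks above, and several test files below, replace hand-rolled poll loops with shared helpers such as `TestUtils.consumeRecords`, `pollUntilAtLeastNumRecords` and `pollUntilTrue`. A generic sketch of what such a helper does, written against the public consumer API rather than Kafka's internal TestUtils (names and timeout are illustrative):

    import java.time.Duration
    import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord}
    import scala.collection.JavaConverters._
    import scala.collection.mutable.ArrayBuffer

    object ConsumeRecordsSketch {
      // Polls until at least numRecords records arrive or timeoutMs elapses.
      def consumeRecords[K, V](consumer: Consumer[K, V], numRecords: Int,
                               timeoutMs: Long = 15000L): Seq[ConsumerRecord[K, V]] = {
        val records = new ArrayBuffer[ConsumerRecord[K, V]]()
        val deadline = System.currentTimeMillis() + timeoutMs
        while (records.size < numRecords && System.currentTimeMillis() < deadline)
          records ++= consumer.poll(Duration.ofMillis(100)).asScala
        require(records.size >= numRecords,
          s"Consumed ${records.size} records before timeout, but expected $numRecords")
        records
      }
    }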

View File

@ -306,7 +306,6 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
val allNonEmptyAssignments = assignments.forall(assignment => assignment.nonEmpty)
if (!allNonEmptyAssignments) {
// at least one consumer got empty assignment
val uniqueAssignedPartitions = (Set[TopicPartition]() /: assignments) (_ ++ _)
return false
}

View File

@ -26,7 +26,7 @@ import kafka.integration.KafkaServerTestHarness
import kafka.log.LogConfig
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.record.TimestampType
@ -34,7 +34,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.Assert._
import org.junit.{After, Before, Test}
import scala.collection.mutable.{ArrayBuffer, Buffer}
import scala.collection.mutable.Buffer
import scala.concurrent.ExecutionException
abstract class BaseProducerSendTest extends KafkaServerTestHarness {
@ -86,15 +86,6 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
producer
}
private def pollUntilNumRecords(numRecords: Int) : Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
TestUtils.waitUntilTrue(() => {
records ++= consumer.poll(50).asScala
records.size == numRecords
}, s"Consumed ${records.size} records until timeout, but expected $numRecords records.")
records
}
/**
* testSendOffset checks the basic send API behavior
*
@ -329,7 +320,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
consumer.assign(List(new TopicPartition(topic, partition)).asJava)
// make sure the fetched messages also respect the partitioning and ordering
val records = pollUntilNumRecords(numRecords)
val records = TestUtils.consumeRecords(consumer, numRecords)
records.zipWithIndex.foreach { case (record, i) =>
assertEquals(topic, record.topic)
@ -496,7 +487,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
producer.flush()
assertTrue("All requests are complete.", responses.forall(_.isDone()))
// Check the messages received by broker.
pollUntilNumRecords(numRecords)
TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords)
} finally {
producer.close()
}

View File

@ -19,7 +19,8 @@ package kafka.api
import java.util
import kafka.server.KafkaConfig
import kafka.utils.{JaasTestUtils, TestUtils, ZkUtils}
import kafka.utils.{JaasTestUtils, TestUtils}
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol
@ -47,7 +48,7 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
override def configureSecurityBeforeServersStart() {
super.configureSecurityBeforeServersStart()
zkClient.makeSurePersistentPathExists(ZkUtils.ConfigChangesPath)
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
// Create broker admin credentials before starting brokers
createScramCredentials(zkConnect, kafkaPrincipal, kafkaPassword)
}

View File

@ -21,18 +21,17 @@ import com.yammer.metrics.Metrics
import com.yammer.metrics.core.Gauge
import java.io.File
import java.util.ArrayList
import java.util.concurrent.ExecutionException
import kafka.admin.AclCommand
import kafka.security.auth._
import kafka.server._
import kafka.utils._
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.{GroupAuthorizationException, TimeoutException, TopicAuthorizationException}
import org.apache.kafka.common.errors.{GroupAuthorizationException, TopicAuthorizationException}
import org.apache.kafka.common.resource.PatternType
import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
import org.junit.Assert._
@ -458,22 +457,14 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
topic: String = topic,
part: Int = part,
timeout: Long = 10000) {
val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
val deadlineMs = System.currentTimeMillis() + timeout
while (records.size < numRecords && System.currentTimeMillis() < deadlineMs) {
for (record <- consumer.poll(50).asScala)
records.add(record)
}
if (records.size < numRecords)
throw new TimeoutException
val records = TestUtils.consumeRecords(consumer, numRecords, timeout)
for (i <- 0 until numRecords) {
val record = records.get(i)
val record = records(i)
val offset = startingOffset + i
assertEquals(topic, record.topic())
assertEquals(part, record.partition())
assertEquals(offset.toLong, record.offset())
assertEquals(topic, record.topic)
assertEquals(part, record.partition)
assertEquals(offset.toLong, record.offset)
}
}
}

View File

@ -35,8 +35,6 @@ import org.junit.{Before, Test}
import scala.collection.JavaConverters._
import org.apache.kafka.test.TestUtils.isValidClusterId
import scala.collection.mutable.ArrayBuffer
/** The test cases here verify the following conditions.
* 1. The ProducerInterceptor receives the cluster id after the onSend() method is called and before onAcknowledgement() method is called.
* 2. The Serializer receives the cluster id before the serialize() method is called.
@ -203,17 +201,8 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
startingOffset: Int = 0,
topic: String = topic,
part: Int = part) {
val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
val maxIters = numRecords * 50
var iters = 0
while (records.size < numRecords) {
for (record <- consumer.poll(50).asScala) {
records += record
}
if (iters > maxIters)
throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
iters += 1
}
val records = TestUtils.consumeRecords(consumer, numRecords)
for (i <- 0 until numRecords) {
val record = records(i)
val offset = startingOffset + i

View File

@ -35,6 +35,8 @@ import scala.collection.JavaConverters._
/**
* Tests for the deprecated Scala AdminClient.
*/
@deprecated("The Scala AdminClient has been deprecated in favour of org.apache.kafka.clients.admin.AdminClient",
since = "0.11.0")
class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
val producerCount = 1

View File

@ -21,15 +21,11 @@ import java.util.concurrent.TimeUnit
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.record.TimestampType
import org.junit.{Before, Test}
import org.junit.Assert.{assertEquals, assertNotEquals, assertTrue}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
/**
* Tests where the broker is configured to use LogAppendTime. For tests where LogAppendTime is configured via topic
* level configs, see the *ProducerSendTest classes.
@ -66,11 +62,7 @@ class LogAppendTimeTest extends IntegrationTestHarness {
val consumer = createConsumer()
consumer.subscribe(Collections.singleton(topic))
val consumerRecords = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]
TestUtils.waitUntilTrue(() => {
consumerRecords ++= consumer.poll(50).asScala
consumerRecords.size == producerRecords.size
}, s"Consumed ${consumerRecords.size} records until timeout instead of the expected ${producerRecords.size} records")
val consumerRecords = TestUtils.consumeRecords(consumer, producerRecords.size)
consumerRecords.zipWithIndex.foreach { case (consumerRecord, index) =>
val producerRecord = producerRecords(index)

View File

@ -126,6 +126,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(numRecords, records.size)
}
@deprecated("poll(Duration) is the replacement", since = "2.0")
@Test
def testDeprecatedPollBlocksForAssignment(): Unit = {
val consumer = createConsumer()
@ -134,6 +135,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(Set(tp, tp2), consumer.assignment().asScala)
}
@deprecated("Serializer now includes a default method that provides the headers", since = "2.1")
@Test
def testHeadersExtendedSerializerDeserializer(): Unit = {
val extendedSerializer = new ExtendedSerializer[Array[Byte]] with SerializerImpl
@ -1522,7 +1524,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign")
val consumer = createConsumer()
consumer.assign(List(tp).asJava)
val records = awaitNonEmptyRecords(consumer, tp)
awaitNonEmptyRecords(consumer, tp)
// Verify the metric exist.
val tags = new util.HashMap[String, String]()
tags.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")

View File

@ -37,7 +37,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
val producer = registerProducer(new KafkaProducer(producerProps))
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, "key".getBytes, "value".getBytes)
producer.send(record)
}

View File

@ -22,6 +22,7 @@ import java.io.{Closeable, File, FileWriter}
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.lang.management.ManagementFactory
import java.security.KeyStore
import java.time.Duration
import java.util
import java.util.{Collections, Properties}
import java.util.concurrent._
@ -147,7 +148,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
clientThreads.foreach(_.join(5 * 1000))
executors.foreach(_.shutdownNow())
producers.foreach(_.close(0, TimeUnit.MILLISECONDS))
consumers.foreach(_.close(0, TimeUnit.MILLISECONDS))
consumers.foreach(_.close(Duration.ofMillis(0)))
adminClients.foreach(_.close())
TestUtils.shutdownServers(servers)
super.tearDown()
@ -993,10 +994,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
private def awaitInitialPositions(consumer: KafkaConsumer[_, _]): Unit = {
do {
consumer.poll(1)
} while (consumer.assignment.isEmpty)
consumer.assignment.asScala.foreach(tp => consumer.position(tp))
TestUtils.pollUntilTrue(consumer, () => !consumer.assignment.isEmpty, "Timed out while waiting for assignment")
consumer.assignment.asScala.foreach(consumer.position)
}
private def clientProps(securityProtocol: SecurityProtocol, saslMechanism: Option[String] = None): Properties = {
@ -1025,12 +1024,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
topic: String): Unit = {
val producerRecords = (1 to numRecords).map(i => new ProducerRecord(topic, s"key$i", s"value$i"))
producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
var received = 0
TestUtils.waitUntilTrue(() => {
received += consumer.poll(50).count
received >= numRecords
}, s"Consumed $received records until timeout instead of the expected $numRecords records")
assertEquals(numRecords, received)
TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords)
}
private def verifyAuthenticationFailure(producer: KafkaProducer[_, _]): Unit = {

View File

@ -28,7 +28,7 @@ import kafka.utils.JaasTestUtils.JaasSection
import kafka.utils.{JaasTestUtils, TestUtils}
import kafka.utils.Implicits._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.internals.Topic
@ -38,7 +38,6 @@ import org.junit.{After, Before, Test}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._
object MultipleListenersWithSameSecurityProtocolBaseTest {
val SecureInternal = "SECURE_INTERNAL"
@ -169,11 +168,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
val consumer = consumers(clientMetadata)
consumer.subscribe(Collections.singleton(clientMetadata.topic))
val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]
TestUtils.waitUntilTrue(() => {
records ++= consumer.poll(50).asScala
records.size == producerRecords.size
}, s"Consumed ${records.size} records until timeout instead of the expected ${producerRecords.size} records with mechanism ${clientMetadata.saslMechanism}")
TestUtils.consumeRecords(consumer, producerRecords.size)
}
}

View File

@ -16,13 +16,14 @@
*/
package kafka.common
import java.util
import kafka.utils.MockTime
import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient, RequestCompletionHandler}
import org.apache.kafka.common.Node
import org.apache.kafka.common.errors.AuthenticationException
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.utils.Utils
import org.easymock.EasyMock
import org.junit.{Assert, Test}
@ -43,7 +44,7 @@ class InterBrokerSendThreadTest {
// poll is always called but there should be no further invocations on NetworkClient
EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong()))
.andReturn(Utils.mkList())
.andReturn(new util.ArrayList())
EasyMock.replay(networkClient)
@ -80,7 +81,7 @@ class InterBrokerSendThreadTest {
EasyMock.expect(networkClient.send(clientRequest, time.milliseconds()))
EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong()))
.andReturn(Utils.mkList())
.andReturn(new util.ArrayList())
EasyMock.replay(networkClient)
@ -118,7 +119,7 @@ class InterBrokerSendThreadTest {
.andReturn(0)
EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong()))
.andReturn(Utils.mkList())
.andReturn(new util.ArrayList())
EasyMock.expect(networkClient.connectionFailed(node))
.andReturn(true)
@ -164,7 +165,7 @@ class InterBrokerSendThreadTest {
.andReturn(0)
EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong()))
.andReturn(Utils.mkList())
.andReturn(new util.ArrayList())
// rule out disconnects so the request stays for the expiry check
EasyMock.expect(networkClient.connectionFailed(node))

View File

@ -19,6 +19,7 @@ package kafka.admin
import java.util.Properties
import kafka.utils.ZkUtils
@deprecated("This class is deprecated since AdminUtilities will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
class TestAdminUtils extends AdminUtilities {
override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configs: Properties): Unit = {}
override def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties = {new Properties}

View File

@ -16,6 +16,7 @@
*/
package kafka.coordinator.transaction
import java.util.Arrays.asList
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
@ -23,7 +24,7 @@ import kafka.utils.timer.MockTimer
import kafka.utils.TestUtils
import org.apache.kafka.clients.{ClientResponse, NetworkClient}
import org.apache.kafka.common.requests.{RequestHeader, TransactionResult, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
import org.apache.kafka.common.utils.{MockTime, Utils}
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.common.{Node, TopicPartition}
import org.easymock.{Capture, EasyMock, IAnswer}
import org.junit.Assert._
@ -129,10 +130,10 @@ class TransactionMarkerChannelManagerTest {
assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
val expectedBroker1Request = new WriteTxnMarkersRequest.Builder(
Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition1)),
new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition1)))).build()
asList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, asList(partition1)),
new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, asList(partition1)))).build()
val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition2)))).build()
asList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, asList(partition2)))).build()
val requests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().map { handler =>
(handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
@ -206,10 +207,10 @@ class TransactionMarkerChannelManagerTest {
assertEquals(1, channelManager.queueForUnknownBroker.totalNumMarkers(txnTopicPartition2))
val expectedBroker1Request = new WriteTxnMarkersRequest.Builder(
Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition1)),
new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition1)))).build()
asList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, asList(partition1)),
new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, asList(partition1)))).build()
val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition2)))).build()
asList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, asList(partition2)))).build()
val firstDrainedRequests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().map { handler =>
(handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())

View File

@ -17,12 +17,12 @@
package kafka.coordinator.transaction
import java.{lang, util}
import java.util.Arrays.asList
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{RequestHeader, TransactionResult, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
import org.apache.kafka.common.utils.Utils
import org.easymock.{EasyMock, IAnswer}
import org.junit.Assert._
import org.junit.Test
@ -40,9 +40,8 @@ class TransactionMarkerRequestCompletionHandlerTest {
private val coordinatorEpoch = 0
private val txnResult = TransactionResult.COMMIT
private val topicPartition = new TopicPartition("topic1", 0)
private val txnIdAndMarkers =
Utils.mkList(
TxnIdAndMarkerEntry(transactionalId, new WriteTxnMarkersRequest.TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(topicPartition))))
private val txnIdAndMarkers = asList(
TxnIdAndMarkerEntry(transactionalId, new WriteTxnMarkersRequest.TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, txnResult, asList(topicPartition))))
private val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerEpoch, txnTimeoutMs,
PrepareCommit, mutable.Set[TopicPartition](topicPartition), 0L, 0L)
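
The `Utils.mkList` helper was removed as part of this change, so the InterBrokerSendThreadTest and transaction-marker test hunks above switch to the standard library: `java.util.Arrays.asList` for a fixed list of elements and `new java.util.ArrayList()` where an empty list is needed. A small sketch of the two replacements with illustrative values:

    import java.util
    import java.util.Arrays.asList

    object MkListReplacement {
      def main(args: Array[String]): Unit = {
        // Fixed-size list of known elements: Arrays.asList replaces Utils.mkList(a, b).
        val partitions: util.List[Integer] = asList(Integer.valueOf(0), Integer.valueOf(1))
        // Empty, growable list: new ArrayList replaces Utils.mkList().
        val responses = new util.ArrayList[String]()
        responses.add("ok")
        println(s"$partitions $responses")
      }
    }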

View File

@ -64,22 +64,22 @@ class FetchSessionTest {
def testSessionCache(): Unit = {
val cache = new FetchSessionCache(3, 100)
assertEquals(0, cache.size)
val id1 = cache.maybeCreateSession(0, false, 10, dummyCreate(10))
val id2 = cache.maybeCreateSession(10, false, 20, dummyCreate(20))
val id3 = cache.maybeCreateSession(20, false, 30, dummyCreate(30))
assertEquals(INVALID_SESSION_ID, cache.maybeCreateSession(30, false, 40, dummyCreate(40)))
assertEquals(INVALID_SESSION_ID, cache.maybeCreateSession(40, false, 5, dummyCreate(5)))
val id1 = cache.maybeCreateSession(0, false, 10, () => dummyCreate(10))
val id2 = cache.maybeCreateSession(10, false, 20, () => dummyCreate(20))
val id3 = cache.maybeCreateSession(20, false, 30, () => dummyCreate(30))
assertEquals(INVALID_SESSION_ID, cache.maybeCreateSession(30, false, 40, () => dummyCreate(40)))
assertEquals(INVALID_SESSION_ID, cache.maybeCreateSession(40, false, 5, () => dummyCreate(5)))
assertCacheContains(cache, id1, id2, id3)
cache.touch(cache.get(id1).get, 200)
val id4 = cache.maybeCreateSession(210, false, 11, dummyCreate(11))
val id4 = cache.maybeCreateSession(210, false, 11, () => dummyCreate(11))
assertCacheContains(cache, id1, id3, id4)
cache.touch(cache.get(id1).get, 400)
cache.touch(cache.get(id3).get, 390)
cache.touch(cache.get(id4).get, 400)
val id5 = cache.maybeCreateSession(410, false, 50, dummyCreate(50))
val id5 = cache.maybeCreateSession(410, false, 50, () => dummyCreate(50))
assertCacheContains(cache, id3, id4, id5)
assertEquals(INVALID_SESSION_ID, cache.maybeCreateSession(410, false, 5, dummyCreate(5)))
val id6 = cache.maybeCreateSession(410, true, 5, dummyCreate(5))
assertEquals(INVALID_SESSION_ID, cache.maybeCreateSession(410, false, 5, () => dummyCreate(5)))
val id6 = cache.maybeCreateSession(410, true, 5, () => dummyCreate(5))
assertCacheContains(cache, id3, id5, id6)
}
@ -89,7 +89,7 @@ class FetchSessionTest {
assertEquals(0, cache.totalPartitions)
assertEquals(0, cache.size)
assertEquals(0, cache.evictionsMeter.count)
val id1 = cache.maybeCreateSession(0, false, 2, dummyCreate(2))
val id1 = cache.maybeCreateSession(0, false, 2, () => dummyCreate(2))
assertTrue(id1 > 0)
assertCacheContains(cache, id1)
val session1 = cache.get(id1).get
@ -97,7 +97,7 @@ class FetchSessionTest {
assertEquals(2, cache.totalPartitions)
assertEquals(1, cache.size)
assertEquals(0, cache.evictionsMeter.count)
val id2 = cache.maybeCreateSession(0, false, 4, dummyCreate(4))
val id2 = cache.maybeCreateSession(0, false, 4, () => dummyCreate(4))
val session2 = cache.get(id2).get
assertTrue(id2 > 0)
assertCacheContains(cache, id1, id2)
@ -106,7 +106,7 @@ class FetchSessionTest {
assertEquals(0, cache.evictionsMeter.count)
cache.touch(session1, 200)
cache.touch(session2, 200)
val id3 = cache.maybeCreateSession(200, false, 5, dummyCreate(5))
val id3 = cache.maybeCreateSession(200, false, 5, () => dummyCreate(5))
assertTrue(id3 > 0)
assertCacheContains(cache, id2, id3)
assertEquals(9, cache.totalPartitions)

Some files were not shown because too many files have changed in this diff.