mirror of https://github.com/apache/kafka.git
KAFKA-12579: Remove various deprecated clients classes/methods for 3.0 (#10438)
* Remove `ExtendedSerializer` and `ExtendedDeserializer`, deprecated since 2.1. The extra functionality was also made available in `Serializer` and `Deserializer`. * Remove `close(long, TimeUnit)` from the producer, consumer and admin client, deprecated since 2.0 for the consumer and 2.2 for the rest. The replacement is `close(Duration)`. * Remove `ConsumerConfig.addDeserializerToConfig` and `ProducerConfig.addSerializerToConfig`, deprecated since 2.7 with no replacement. These methods were not intended to be public API and are likely not used much (if at all). * Remove `NoOffsetForPartitionException.partition()`, deprecated since 0.11. `partitions()` should be used instead. * Remove `MessageFormatter.init(Properties)`, deprecated since 2.7. The `configure(Map)` method should be used instead. * Remove `kafka.common.MessageFormatter`, deprecated since 2.7. `org.apache.kafka.common.MessageFormatter` should be used instead. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
This commit is contained in:
parent
267b73616d
commit
2f36001987
|
@ -24,7 +24,6 @@ import java.util.Map;
|
|||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
import org.apache.kafka.common.ElectionType;
|
||||
|
@ -147,28 +146,11 @@ public interface Admin extends AutoCloseable {
|
|||
/**
|
||||
* Close the Admin and release all associated resources.
|
||||
* <p>
|
||||
* See {@link #close(long, TimeUnit)}
|
||||
* See {@link #close(Duration)}
|
||||
*/
|
||||
@Override
|
||||
default void close() {
|
||||
close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the Admin and release all associated resources.
|
||||
* <p>
|
||||
* The close operation has a grace period during which current operations will be allowed to
|
||||
* complete, specified by the given duration and time unit.
|
||||
* New operations will not be accepted during the grace period. Once the grace period is over,
|
||||
* all operations that have not yet been completed will be aborted with a {@link org.apache.kafka.common.errors.TimeoutException}.
|
||||
*
|
||||
* @param duration The duration to use for the wait time.
|
||||
* @param unit The time unit to use for the wait time.
|
||||
* @deprecated Since 2.2. Use {@link #close(Duration)} or {@link #close()}.
|
||||
*/
|
||||
@Deprecated
|
||||
default void close(long duration, TimeUnit unit) {
|
||||
close(Duration.ofMillis(unit.toMillis(duration)));
|
||||
close(Duration.ofMillis(Long.MAX_VALUE));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -28,7 +28,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
|
@ -264,12 +263,6 @@ public interface Consumer<K, V> extends Closeable {
|
|||
*/
|
||||
void close();
|
||||
|
||||
/**
|
||||
* @see KafkaConsumer#close(long, TimeUnit)
|
||||
*/
|
||||
@Deprecated
|
||||
void close(long timeout, TimeUnit unit);
|
||||
|
||||
/**
|
||||
* @see KafkaConsumer#close(Duration)
|
||||
*/
|
||||
|
|
|
@ -578,19 +578,9 @@ public class ConsumerConfig extends AbstractConfig {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Since 2.7.0. This will be removed in a future major release.
|
||||
*/
|
||||
@Deprecated
|
||||
public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
|
||||
Deserializer<?> keyDeserializer,
|
||||
Deserializer<?> valueDeserializer) {
|
||||
return appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer);
|
||||
}
|
||||
|
||||
static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
|
||||
Deserializer<?> keyDeserializer,
|
||||
Deserializer<?> valueDeserializer) {
|
||||
protected static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
|
||||
Deserializer<?> keyDeserializer,
|
||||
Deserializer<?> valueDeserializer) {
|
||||
Map<String, Object> newConfigs = new HashMap<>(configs);
|
||||
if (keyDeserializer != null)
|
||||
newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
|
||||
|
@ -599,22 +589,6 @@ public class ConsumerConfig extends AbstractConfig {
|
|||
return newConfigs;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Since 2.7.0. This will be removed in a future major release.
|
||||
*/
|
||||
@Deprecated
|
||||
public static Properties addDeserializerToConfig(Properties properties,
|
||||
Deserializer<?> keyDeserializer,
|
||||
Deserializer<?> valueDeserializer) {
|
||||
Properties newProperties = new Properties();
|
||||
newProperties.putAll(properties);
|
||||
if (keyDeserializer != null)
|
||||
newProperties.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
|
||||
if (valueDeserializer != null)
|
||||
newProperties.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
|
||||
return newProperties;
|
||||
}
|
||||
|
||||
boolean maybeOverrideEnableAutoCommit() {
|
||||
Optional<String> groupId = Optional.ofNullable(getString(CommonClientConfigs.GROUP_ID_CONFIG));
|
||||
boolean enableAutoCommit = getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
|
||||
|
|
|
@ -2308,29 +2308,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
|
|||
close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to close the consumer cleanly within the specified timeout. This method waits up to
|
||||
* {@code timeout} for the consumer to complete pending commits and leave the group.
|
||||
* If auto-commit is enabled, this will commit the current offsets if possible within the
|
||||
* timeout. If the consumer is unable to complete offset commits and gracefully leave the group
|
||||
* before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
|
||||
* used to interrupt close.
|
||||
*
|
||||
* @param timeout The maximum time to wait for consumer to close gracefully. The value must be
|
||||
* non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
|
||||
* @param timeUnit The time unit for the {@code timeout}
|
||||
* @throws IllegalArgumentException If the {@code timeout} is negative.
|
||||
* @throws InterruptException If the thread is interrupted before or while this function is called
|
||||
* @throws org.apache.kafka.common.KafkaException for any other error during close
|
||||
*
|
||||
* @deprecated Since 2.0. Use {@link #close(Duration)} or {@link #close()}.
|
||||
*/
|
||||
@Deprecated
|
||||
@Override
|
||||
public void close(long timeout, TimeUnit timeUnit) {
|
||||
close(Duration.ofMillis(timeUnit.toMillis(timeout)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to close the consumer cleanly within the specified timeout. This method waits up to
|
||||
* {@code timeout} for the consumer to complete pending commits and leave the group.
|
||||
|
|
|
@ -40,7 +40,6 @@ import java.util.Optional;
|
|||
import java.util.OptionalLong;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -447,13 +446,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void close() {
|
||||
close(KafkaConsumer.DEFAULT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
@Override
|
||||
public synchronized void close(long timeout, TimeUnit unit) {
|
||||
public final synchronized void close() {
|
||||
this.closed = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -43,16 +43,6 @@ public class NoOffsetForPartitionException extends InvalidOffsetException {
|
|||
this.partitions = Collections.unmodifiableSet(new HashSet<>(partitions));
|
||||
}
|
||||
|
||||
/**
|
||||
* returns the first partition (out of {@link #partitions}) for which no offset is defined.
|
||||
* @deprecated please use {@link #partitions}
|
||||
* @return a partition with no offset
|
||||
*/
|
||||
@Deprecated
|
||||
public TopicPartition partition() {
|
||||
return partitions.isEmpty() ? null : partitions.iterator().next();
|
||||
}
|
||||
|
||||
/**
|
||||
* returns all partitions for which no offests are defined.
|
||||
* @return all partitions without offsets
|
||||
|
|
|
@ -29,7 +29,6 @@ import java.time.Duration;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* The interface for the {@link KafkaProducer}
|
||||
|
@ -100,11 +99,6 @@ public interface Producer<K, V> extends Closeable {
|
|||
*/
|
||||
void close();
|
||||
|
||||
@Deprecated
|
||||
default void close(long timeout, TimeUnit unit) {
|
||||
close(Duration.ofMillis(unit.toMillis(timeout)));
|
||||
}
|
||||
|
||||
/**
|
||||
* See {@link KafkaProducer#close(Duration)}
|
||||
*/
|
||||
|
|
|
@ -490,15 +490,6 @@ public class ProducerConfig extends AbstractConfig {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Since 2.7.0. This will be removed in a future major release.
|
||||
*/
|
||||
@Deprecated
|
||||
public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
|
||||
Serializer<?> keySerializer, Serializer<?> valueSerializer) {
|
||||
return appendSerializerToConfig(configs, keySerializer, valueSerializer);
|
||||
}
|
||||
|
||||
static Map<String, Object> appendSerializerToConfig(Map<String, Object> configs,
|
||||
Serializer<?> keySerializer,
|
||||
Serializer<?> valueSerializer) {
|
||||
|
@ -510,22 +501,6 @@ public class ProducerConfig extends AbstractConfig {
|
|||
return newConfigs;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Since 2.7.0. This will be removed in a future major release.
|
||||
*/
|
||||
@Deprecated
|
||||
public static Properties addSerializerToConfig(Properties properties,
|
||||
Serializer<?> keySerializer,
|
||||
Serializer<?> valueSerializer) {
|
||||
Properties newProperties = new Properties();
|
||||
newProperties.putAll(properties);
|
||||
if (keySerializer != null)
|
||||
newProperties.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass().getName());
|
||||
if (valueSerializer != null)
|
||||
newProperties.put(VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass().getName());
|
||||
return newProperties;
|
||||
}
|
||||
|
||||
public ProducerConfig(Properties props) {
|
||||
super(CONFIG, props);
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.kafka.common;
|
|||
import java.io.Closeable;
|
||||
import java.io.PrintStream;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
|
||||
|
@ -34,33 +33,21 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|||
*/
|
||||
public interface MessageFormatter extends Configurable, Closeable {
|
||||
|
||||
/**
|
||||
* Initialises the MessageFormatter
|
||||
* @param props Properties to configure the formatter
|
||||
* @deprecated Use {@link #configure(Map)} instead, this method is for backward compatibility with the older Formatter interface
|
||||
*/
|
||||
@Deprecated
|
||||
default public void init(Properties props) {}
|
||||
|
||||
/**
|
||||
* Configures the MessageFormatter
|
||||
* @param configs Map to configure the formatter
|
||||
*/
|
||||
default public void configure(Map<String, ?> configs) {
|
||||
Properties properties = new Properties();
|
||||
properties.putAll(configs);
|
||||
init(properties);
|
||||
}
|
||||
default void configure(Map<String, ?> configs) {}
|
||||
|
||||
/**
|
||||
* Parses and formats a record for display
|
||||
* @param consumerRecord the record to format
|
||||
* @param output the print stream used to output the record
|
||||
*/
|
||||
public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output);
|
||||
void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output);
|
||||
|
||||
/**
|
||||
* Closes the formatter
|
||||
*/
|
||||
default public void close() {}
|
||||
default void close() {}
|
||||
}
|
||||
|
|
|
@ -1,79 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.common.serialization;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.common.header.Headers;
|
||||
|
||||
/**
|
||||
* A Deserializer that has access to the headers associated with the record.
|
||||
*
|
||||
* Prefer {@link Deserializer} if access to the headers is not required. Once Kafka drops support for Java 7, the
|
||||
* {@code deserialize()} method introduced by this interface will be added to Deserializer with a default implementation
|
||||
* so that backwards compatibility is maintained. This interface may be deprecated once that happens.
|
||||
*
|
||||
* A class that implements this interface is expected to have a constructor with no parameters.
|
||||
* @param <T>
|
||||
* @deprecated This class has been deprecated and will be removed in a future release. Please use {@link Deserializer} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public interface ExtendedDeserializer<T> extends Deserializer<T> {
|
||||
|
||||
/**
|
||||
* Deserialize a record value from a byte array into a value or object.
|
||||
* @param topic topic associated with the data
|
||||
* @param headers headers associated with the record; may be empty.
|
||||
* @param data serialized bytes; may be null; implementations are recommended to handle null by returning a value or null rather than throwing an exception.
|
||||
* @return deserialized typed data; may be null
|
||||
*/
|
||||
T deserialize(String topic, Headers headers, byte[] data);
|
||||
|
||||
class Wrapper<T> implements ExtendedDeserializer<T> {
|
||||
|
||||
private final Deserializer<T> deserializer;
|
||||
|
||||
public Wrapper(Deserializer<T> deserializer) {
|
||||
this.deserializer = deserializer;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public T deserialize(String topic, Headers headers, byte[] data) {
|
||||
return deserialize(topic, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Map<String, ?> configs, boolean isKey) {
|
||||
deserializer.configure(configs, isKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public T deserialize(String topic, byte[] data) {
|
||||
return deserializer.deserialize(topic, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
deserializer.close();
|
||||
}
|
||||
|
||||
public static <T> ExtendedDeserializer<T> ensureExtended(Deserializer<T> deserializer) {
|
||||
return deserializer == null ? null : deserializer instanceof ExtendedDeserializer ? (ExtendedDeserializer<T>) deserializer : new ExtendedDeserializer.Wrapper<>(deserializer);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,79 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.common.serialization;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.common.header.Headers;
|
||||
|
||||
/**
|
||||
* A Serializer that has access to the headers associated with the record.
|
||||
*
|
||||
* Prefer {@link Serializer} if access to the headers is not required. Once Kafka drops support for Java 7, the
|
||||
* {@code serialize()} method introduced by this interface will be added to Serializer with a default implementation
|
||||
* so that backwards compatibility is maintained. This interface may be deprecated once that happens.
|
||||
*
|
||||
* A class that implements this interface is expected to have a constructor with no parameters.
|
||||
* @param <T>
|
||||
* @deprecated This class has been deprecated and will be removed in a future release. Please use {@link Serializer} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public interface ExtendedSerializer<T> extends Serializer<T> {
|
||||
|
||||
/**
|
||||
* Convert {@code data} into a byte array.
|
||||
*
|
||||
* @param topic topic associated with data
|
||||
* @param headers headers associated with the record
|
||||
* @param data typed data
|
||||
* @return serialized bytes
|
||||
*/
|
||||
byte[] serialize(String topic, Headers headers, T data);
|
||||
|
||||
class Wrapper<T> implements ExtendedSerializer<T> {
|
||||
|
||||
private final Serializer<T> serializer;
|
||||
|
||||
public Wrapper(Serializer<T> serializer) {
|
||||
this.serializer = serializer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] serialize(String topic, Headers headers, T data) {
|
||||
return serialize(topic, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Map<String, ?> configs, boolean isKey) {
|
||||
serializer.configure(configs, isKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] serialize(String topic, T data) {
|
||||
return serializer.serialize(topic, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
serializer.close();
|
||||
}
|
||||
|
||||
public static <T> ExtendedSerializer<T> ensureExtended(Serializer<T> serializer) {
|
||||
return serializer == null ? null : serializer instanceof ExtendedSerializer ? (ExtendedSerializer<T>) serializer : new ExtendedSerializer.Wrapper<>(serializer);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -68,34 +68,6 @@ public class ConsumerConfigTest {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
public void testDeserializerToPropertyConfig() {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClassName);
|
||||
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClassName);
|
||||
Properties newProperties = ConsumerConfig.addDeserializerToConfig(properties, null, null);
|
||||
assertEquals(newProperties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), keyDeserializerClassName);
|
||||
assertEquals(newProperties.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), valueDeserializerClassName);
|
||||
|
||||
properties.clear();
|
||||
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClassName);
|
||||
newProperties = ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, null);
|
||||
assertEquals(newProperties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), keyDeserializerClassName);
|
||||
assertEquals(newProperties.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), valueDeserializerClassName);
|
||||
|
||||
properties.clear();
|
||||
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClassName);
|
||||
newProperties = ConsumerConfig.addDeserializerToConfig(properties, null, valueDeserializer);
|
||||
assertEquals(newProperties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), keyDeserializerClassName);
|
||||
assertEquals(newProperties.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), valueDeserializerClassName);
|
||||
|
||||
properties.clear();
|
||||
newProperties = ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer);
|
||||
assertEquals(newProperties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), keyDeserializerClassName);
|
||||
assertEquals(newProperties.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), valueDeserializerClassName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppendDeserializerToConfig() {
|
||||
Map<String, Object> configs = new HashMap<>();
|
||||
|
|
|
@ -142,11 +142,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
|
|||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.Mockito.doCallRealMethod;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
public class KafkaConsumerTest {
|
||||
private final String topic = "test";
|
||||
|
@ -2441,15 +2436,6 @@ public class KafkaConsumerTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void testCloseWithTimeUnit() {
|
||||
KafkaConsumer consumer = mock(KafkaConsumer.class);
|
||||
doCallRealMethod().when(consumer).close(anyLong(), any());
|
||||
consumer.close(1, TimeUnit.SECONDS);
|
||||
verify(consumer).close(Duration.ofSeconds(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSubscriptionOnInvalidTopic() {
|
||||
Time time = new MockTime();
|
||||
|
|
|
@ -639,13 +639,6 @@ public class KafkaProducerTest {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
@Deprecated
|
||||
public void testHeadersWithExtendedClasses() {
|
||||
doTestHeaders(org.apache.kafka.common.serialization.ExtendedSerializer.class);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testHeaders() {
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.junit.jupiter.api.Test;
|
|||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
|
@ -31,39 +30,9 @@ public class ProducerConfigTest {
|
|||
|
||||
private final Serializer<byte[]> keySerializer = new ByteArraySerializer();
|
||||
private final Serializer<String> valueSerializer = new StringSerializer();
|
||||
private final String keySerializerClassName = keySerializer.getClass().getName();
|
||||
private final String valueSerializerClassName = valueSerializer.getClass().getName();
|
||||
private final Object keySerializerClass = keySerializer.getClass();
|
||||
private final Object valueSerializerClass = valueSerializer.getClass();
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
public void testSerializerToPropertyConfig() {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClassName);
|
||||
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClassName);
|
||||
Properties newProperties = ProducerConfig.addSerializerToConfig(properties, null, null);
|
||||
assertEquals(newProperties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), keySerializerClassName);
|
||||
assertEquals(newProperties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), valueSerializerClassName);
|
||||
|
||||
properties.clear();
|
||||
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClassName);
|
||||
newProperties = ProducerConfig.addSerializerToConfig(properties, keySerializer, null);
|
||||
assertEquals(newProperties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), keySerializerClassName);
|
||||
assertEquals(newProperties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), valueSerializerClassName);
|
||||
|
||||
properties.clear();
|
||||
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClassName);
|
||||
newProperties = ProducerConfig.addSerializerToConfig(properties, null, valueSerializer);
|
||||
assertEquals(newProperties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), keySerializerClassName);
|
||||
assertEquals(newProperties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), valueSerializerClassName);
|
||||
|
||||
properties.clear();
|
||||
newProperties = ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer);
|
||||
assertEquals(newProperties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), keySerializerClassName);
|
||||
assertEquals(newProperties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), valueSerializerClassName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppendSerializerToConfig() {
|
||||
Map<String, Object> configs = new HashMap<>();
|
||||
|
|
|
@ -1,28 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.common
|
||||
|
||||
/**
|
||||
* Typical implementations of this interface convert a `ConsumerRecord` into a type that can then be passed to
|
||||
* a `PrintStream`.
|
||||
*
|
||||
* This is used by the `ConsoleConsumer`.
|
||||
*/
|
||||
@deprecated("This class is deprecated and will be replaced by org.apache.kafka.common.MessageFormatter.", "2.7.0")
|
||||
trait MessageFormatter extends org.apache.kafka.common.MessageFormatter {
|
||||
}
|
|
@ -137,16 +137,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
|
|||
assertEquals(Set(tp, tp2), consumer.assignment().asScala)
|
||||
}
|
||||
|
||||
@deprecated("Serializer now includes a default method that provides the headers", since = "2.1")
|
||||
@Test
|
||||
def testHeadersExtendedSerializerDeserializer(): Unit = {
|
||||
val extendedSerializer = new ExtendedSerializer[Array[Byte]] with SerializerImpl
|
||||
|
||||
val extendedDeserializer = new ExtendedDeserializer[Array[Byte]] with DeserializerImpl
|
||||
|
||||
testHeadersSerializeDeserialize(extendedSerializer, extendedDeserializer)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testHeadersSerializerDeserializer(): Unit = {
|
||||
val extendedSerializer = new Serializer[Array[Byte]] with SerializerImpl
|
||||
|
|
|
@ -27,18 +27,31 @@
|
|||
or updating the application not to use internal classes.</li>
|
||||
<li>The Streams API removed all deprecated APIs that were deprecated in version 2.5.0 or earlier.
|
||||
For a complete list of removed APIs compare the detailed Kafka Streams upgrade notes.</li>
|
||||
<li>The deprecated Scala <code>Authorizer</code>, <code>SimpleAclAuthorizer</code> and related classes have been removed. Please use the Java <code>Authorizer</code>
|
||||
and <code>AclAuthorizer</code> instead.</li>
|
||||
<li>The deprecated <code>Metric#value()</code> method was removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12573">KAFKA-12573</a>).</li>
|
||||
<li>Deprecated security classes were removed: <code>PrincipalBuilder</code>, <code>DefaultPrincipalBuilder</code> and <code>ResourceFilter</code>.
|
||||
Furthermore, deprecated constants and constructors were removed from <code>SslConfigs</code>, <code>SaslConfigs</code>,
|
||||
<code>AclBinding</code> and <code>AclBindingFilter</code>.</li>
|
||||
<li>The deprecated <code>Admin.electedPreferredLeaders()</code> methods were removed. Please use <code>Admin.electLeaders</code> instead.</li>
|
||||
<li>The deprecated <code>kafka-preferred-replica-election</code> command line tool was removed. Please use <code>kafka-leader-election</code> instead.</li>
|
||||
<li>The deprecated <code>ConfigEntry</code> constructor was removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12577">KAFKA-12577</a>).
|
||||
Please use the remaining public constructor instead.</li>
|
||||
<li>The deprecated config value <code>default</code> for the client config <code>client.dns.lookup</code> has been removed. In the unlikely
|
||||
event that you set this config explicitly, we recommend leaving the config unset (<code>use_all_dns_ips</code> is used by default).</li>
|
||||
<li>A number of deprecated classes, methods and tools have been removed from the <code>clients</code>, <code>core</code> and <code>tools</code> modules:</li>
|
||||
<ul>
|
||||
<li>The Scala <code>Authorizer</code>, <code>SimpleAclAuthorizer</code> and related classes have been removed. Please use the Java <code>Authorizer</code>
|
||||
and <code>AclAuthorizer</code> instead.</li>
|
||||
<li>The <code>Metric#value()</code> method was removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12573">KAFKA-12573</a>).</li>
|
||||
<li>The <code>PrincipalBuilder</code>, <code>DefaultPrincipalBuilder</code> and <code>ResourceFilter</code> classes were removed.
|
||||
<li>Various constants and constructors were removed from <code>SslConfigs</code>, <code>SaslConfigs</code>, <code>AclBinding</code> and
|
||||
<code>AclBindingFilter</code>.</li>
|
||||
<li>The <code>Admin.electedPreferredLeaders()</code> methods were removed. Please use <code>Admin.electLeaders</code> instead.</li>
|
||||
<li>The <code>kafka-preferred-replica-election</code> command line tool was removed. Please use <code>kafka-leader-election</code> instead.</li>
|
||||
<li>The <code>ConfigEntry</code> constructor was removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12577">KAFKA-12577</a>).
|
||||
Please use the remaining public constructor instead.</li>
|
||||
<li>The config value <code>default</code> for the client config <code>client.dns.lookup</code> has been removed. In the unlikely
|
||||
event that you set this config explicitly, we recommend leaving the config unset (<code>use_all_dns_ips</code> is used by default).</li>
|
||||
<li>The <code>ExtendedDeserializer</code> and <code>ExtendedSerializer</code> classes have been removed. Please use <code>Deserializer</code>
|
||||
and <code>Serializer</code> instead.</li>
|
||||
<li>The <code>close(long, TimeUnit)</code> method was removed from the producer, consumer and admin client. Please use
|
||||
<code>close(Duration)</code>.</li>
|
||||
<li>The <code>ConsumerConfig.addDeserializerToConfig</code> and <code>ProducerConfig.addSerializerToConfig</code> methods
|
||||
were removed. These methods were not intended to be public API and there is no replacement.</li>
|
||||
<li>The <code>NoOffsetForPartitionException.partition()</code> method was removed. Please use <code>partitions()</code>
|
||||
instead.</li>
|
||||
<li>The Scala <code>kafka.common.MessageFormatter</code> was removed. Plese use the Java <code>org.apache.kafka.common.MessageFormatter</code>.</li>
|
||||
<li>The <code>MessageFormatter.init(Properties)</code> method was removed. Please use <code>configure(Map)</code> instead.</li>
|
||||
</ul>
|
||||
</ul>
|
||||
|
||||
<h5><a id="upgrade_280_notable" href="#upgrade_280_notable">Notable changes in 2.8.0</a></h5>
|
||||
|
|
|
@ -530,10 +530,10 @@ public class StreamThread extends Thread {
|
|||
this.numIterations = 1;
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private static final class InternalConsumerConfig extends ConsumerConfig {
|
||||
private InternalConsumerConfig(final Map<String, Object> props) {
|
||||
super(ConsumerConfig.addDeserializerToConfig(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()), false);
|
||||
super(ConsumerConfig.appendDeserializerToConfig(props, new ByteArrayDeserializer(),
|
||||
new ByteArrayDeserializer()), false);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue