mirror of https://github.com/apache/kafka.git
KAFKA-14848: KafkaConsumer incorrectly passes locally-scoped serializers to FetchConfig (#13452)
Fix for a NPE bug that was caused by referring to a local variable and not the instance variable of the deserializers. Co-authored-by: Robert Yokota <1761488+rayokota@users.noreply.github.com> Reviewers: Robert Yokota <1761488+rayokota@users.noreply.github.com>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
parent
7438f100cf
commit
31440b00f3
|
@ -799,7 +799,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
|
|||
config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
|
||||
config.getString(ConsumerConfig.CLIENT_RACK_CONFIG));
|
||||
}
|
||||
FetchConfig<K, V> fetchConfig = new FetchConfig<>(config, keyDeserializer, valueDeserializer, isolationLevel);
|
||||
FetchConfig<K, V> fetchConfig = new FetchConfig<>(config,
|
||||
this.keyDeserializer,
|
||||
this.valueDeserializer,
|
||||
isolationLevel);
|
||||
this.fetcher = new Fetcher<>(
|
||||
logContext,
|
||||
this.client,
|
||||
|
|
|
@ -21,6 +21,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|||
import org.apache.kafka.common.IsolationLevel;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* {@link FetchConfig} represents the static configuration for fetching records from Kafka. It is simply a way
|
||||
* to bundle the immutable settings that were presented at the time the {@link Consumer} was created for later use by
|
||||
|
@ -85,8 +87,8 @@ public class FetchConfig<K, V> {
|
|||
this.maxPollRecords = maxPollRecords;
|
||||
this.checkCrcs = checkCrcs;
|
||||
this.clientRackId = clientRackId;
|
||||
this.keyDeserializer = keyDeserializer;
|
||||
this.valueDeserializer = valueDeserializer;
|
||||
this.keyDeserializer = Objects.requireNonNull(keyDeserializer, "Message key deserializer provided to FetchConfig should not be null");
|
||||
this.valueDeserializer = Objects.requireNonNull(valueDeserializer, "Message value deserializer provided to FetchConfig should not be null");
|
||||
this.isolationLevel = isolationLevel;
|
||||
}
|
||||
|
||||
|
@ -101,8 +103,8 @@ public class FetchConfig<K, V> {
|
|||
this.maxPollRecords = config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
|
||||
this.checkCrcs = config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG);
|
||||
this.clientRackId = config.getString(ConsumerConfig.CLIENT_RACK_CONFIG);
|
||||
this.keyDeserializer = keyDeserializer;
|
||||
this.valueDeserializer = valueDeserializer;
|
||||
this.keyDeserializer = Objects.requireNonNull(keyDeserializer, "Message key deserializer provided to FetchConfig should not be null");
|
||||
this.valueDeserializer = Objects.requireNonNull(valueDeserializer, "Message value deserializer provided to FetchConfig should not be null");
|
||||
this.isolationLevel = isolationLevel;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.clients.consumer.internals;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.IsolationLevel;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
public class FetchConfigTest {
|
||||
|
||||
/**
|
||||
* Verify correctness if both the key and value {@link Deserializer deserializers} provided to the
|
||||
* {@link FetchConfig} constructors are {@code nonnull}.
|
||||
*/
|
||||
@Test
|
||||
public void testBasicFromConsumerConfig() {
|
||||
try (StringDeserializer keyDeserializer = new StringDeserializer(); StringDeserializer valueDeserializer = new StringDeserializer()) {
|
||||
newFetchConfigFromConsumerConfig(keyDeserializer, valueDeserializer);
|
||||
newFetchConfigFromValues(keyDeserializer, valueDeserializer);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify an exception is thrown if the key {@link Deserializer deserializer} provided to the
|
||||
* {@link FetchConfig} constructors is {@code null}.
|
||||
*/
|
||||
@Test
|
||||
public void testPreventNullKeyDeserializer() {
|
||||
try (StringDeserializer valueDeserializer = new StringDeserializer()) {
|
||||
assertThrows(NullPointerException.class, () -> newFetchConfigFromConsumerConfig(null, valueDeserializer));
|
||||
assertThrows(NullPointerException.class, () -> newFetchConfigFromValues(null, valueDeserializer));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify an exception is thrown if the value {@link Deserializer deserializer} provided to the
|
||||
* {@link FetchConfig} constructors is {@code null}.
|
||||
*/
|
||||
@Test
|
||||
@SuppressWarnings("resources")
|
||||
public void testPreventNullValueDeserializer() {
|
||||
try (StringDeserializer keyDeserializer = new StringDeserializer()) {
|
||||
assertThrows(NullPointerException.class, () -> newFetchConfigFromConsumerConfig(keyDeserializer, null));
|
||||
assertThrows(NullPointerException.class, () -> newFetchConfigFromValues(keyDeserializer, null));
|
||||
}
|
||||
}
|
||||
|
||||
private void newFetchConfigFromConsumerConfig(Deserializer<String> keyDeserializer,
|
||||
Deserializer<String> valueDeserializer) {
|
||||
Properties p = new Properties();
|
||||
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
|
||||
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
|
||||
ConsumerConfig config = new ConsumerConfig(p);
|
||||
new FetchConfig<>(config, keyDeserializer, valueDeserializer, IsolationLevel.READ_UNCOMMITTED);
|
||||
}
|
||||
|
||||
private void newFetchConfigFromValues(Deserializer<String> keyDeserializer,
|
||||
Deserializer<String> valueDeserializer) {
|
||||
new FetchConfig<>(ConsumerConfig.DEFAULT_FETCH_MIN_BYTES,
|
||||
ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
|
||||
ConsumerConfig.DEFAULT_FETCH_MAX_WAIT_MS,
|
||||
ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES,
|
||||
ConsumerConfig.DEFAULT_MAX_POLL_RECORDS,
|
||||
true,
|
||||
ConsumerConfig.DEFAULT_CLIENT_RACK,
|
||||
keyDeserializer,
|
||||
valueDeserializer,
|
||||
IsolationLevel.READ_UNCOMMITTED);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue