KAFKA-17927 Disallow users to configure `max.in.flight.requests.per.connection` bigger than 5 (#17717)

Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TengYao Chi 2024-11-18 14:01:16 +08:00 committed by GitHub
parent ebbee397a7
commit e1dcd383bc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 28 additions and 16 deletions

View File

@ -589,13 +589,8 @@ public class ProducerConfig extends AbstractConfig {
final int inFlightConnection = this.getInt(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
if (MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < inFlightConnection) {
if (userConfiguredIdempotence) {
throw new ConfigException("Must set " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
" to use the idempotent producer.");
}
log.warn("Idempotence will be disabled because {} is set to {}, which is greater than 5. " +
"Please note that in v4.0.0 and onward, this will become an error.", MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, inFlightConnection);
shouldDisableIdempotence = true;
throw new ConfigException("To use the idempotent producer, " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION +
" must be set to at most 5. Current value is " + inFlightConnection + ".");
}
}

View File

@ -416,15 +416,10 @@ public class KafkaProducerTest {
putAll(baseProps);
setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "6");
}};
config = new ProducerConfig(validProps2);
assertFalse(
config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
"idempotence should be disabled when `max.in.flight.requests.per.connection` is greater than 5 and " +
"`enable.idempotence` config is unset.");
assertEquals(
6,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
"`max.in.flight.requests.per.connection` should be set with overridden value");
ConfigException configException = assertThrows(ConfigException.class, () -> new ProducerConfig(validProps2));
assertEquals("To use the idempotent producer, " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION +
" must be set to at most 5. Current value is 6.", configException.getMessage());
Properties invalidProps = new Properties() {{
putAll(baseProps);

View File

@ -30,6 +30,7 @@ import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -129,4 +130,19 @@ public class ProducerConfigTest {
final ProducerConfig producerConfig = new ProducerConfig(configs);
assertEquals(saslSslLowerCase, producerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
@Test
void testUpperboundCheckOfEnableIdempotence() {
String inFlightConnection = "6";
final Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, inFlightConnection);
ConfigException configException = assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
assertEquals("To use the idempotent producer, " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION +
" must be set to at most 5. Current value is " + inFlightConnection + ".", configException.getMessage());
configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
assertDoesNotThrow(() -> new ProducerConfig(configs));
}
}

View File

@ -133,6 +133,12 @@
</li>
</ul>
</li>
<li><b>Producer</b>
<ul>
<li>The <code>enable.idempotence</code> configuration will no longer automatically fall back when the <code>max.in.flight.requests.per.connection</code> value exceeds 5.
</li>
</ul>
</li>
</ul>
</li>
<li>Other changes: