KAFKA-19373 Fix protocol name comparison (#19903)
CI / build (push) Waiting to run Details

Fix to ensure protocol name comparison in integration test ignore case
(group protocol from param is lower case, vs enum name upper case)

The tests were not failing but the custom configs/expectation were not
being applied depending on the protocol (the tests checks for
"groupProtocol.equals(CLASSIC)" would never be true.

Found all comparisons with equals agains the constant name and fixed
them (not too many luckily).

I did consider changing the protocol param that is passed to every test
(that is now lowercase), but still, seems more robust to have the tests
ignore case.

Reviewers: Gaurav Narula <gaurav_narula2@apple.com>, Ken Huang
 <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, TengYao Chi
 <frankvicky@apache.org>
This commit is contained in:
Lianet Magrans 2025-06-04 23:48:26 -04:00 committed by GitHub
parent 2694d7aad9
commit 7cd99ea66d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 8 additions and 8 deletions

View File

@ -83,7 +83,7 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
@MethodSource(Array("getTestGroupProtocolParametersAll")) @MethodSource(Array("getTestGroupProtocolParametersAll"))
def testCoordinatorFailover(groupProtocol: String): Unit = { def testCoordinatorFailover(groupProtocol: String): Unit = {
val listener = new TestConsumerReassignmentListener() val listener = new TestConsumerReassignmentListener()
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) { if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5001") this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5001")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
} }

View File

@ -307,7 +307,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
val consumer1 = createConsumerAndReceive(group1, manualAssign = false, numRecords) val consumer1 = createConsumerAndReceive(group1, manualAssign = false, numRecords)
val requestTimeout = 6000 val requestTimeout = 6000
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) { if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000") this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
} }
@ -338,7 +338,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
val partitionCount = consumerCount * 2 val partitionCount = consumerCount * 2
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000") this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) { if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
} }
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@ -377,7 +377,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
val group = "fatal-exception-test" val group = "fatal-exception-test"
val topic = "fatal-exception-test" val topic = "fatal-exception-test"
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000") this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) { if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
} }
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@ -418,7 +418,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
val topic = "closetest" val topic = "closetest"
createTopic(topic, 10, brokerCount) createTopic(topic, 10, brokerCount)
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000") this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) { if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
} }
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

View File

@ -431,7 +431,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
// Verify that records are consumed if all topics are authorized // Verify that records are consumed if all topics are authorized
consumer.subscribe(java.util.List.of(topic)) consumer.subscribe(java.util.List.of(topic))
if (groupProtocol.equals(GroupProtocol.CLASSIC)) { if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) {
consumeRecordsIgnoreOneAuthorizationException(consumer) consumeRecordsIgnoreOneAuthorizationException(consumer)
} else { } else {
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {

View File

@ -360,7 +360,7 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll")) @MethodSource(Array("getTestGroupProtocolParametersAll"))
def testUnsubscribeTopic(groupProtocol: String): Unit = { def testUnsubscribeTopic(groupProtocol: String): Unit = {
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) { if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30") this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
} }

View File

@ -386,7 +386,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll")) @MethodSource(Array("getTestGroupProtocolParametersAll"))
def testPauseStateNotPreservedByRebalance(groupProtocol: String): Unit = { def testPauseStateNotPreservedByRebalance(groupProtocol: String): Unit = {
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) { if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30") this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
} }