mirror of https://github.com/apache/kafka.git
KAFKA-17707 Remove zk from BaseConsumerTest (#17383)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
672c617233
commit
d0653378bf
|
|
@ -112,29 +112,19 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
|
|||
|
||||
object BaseConsumerTest {
|
||||
// We want to test the following combinations:
|
||||
// * ZooKeeper and the classic group protocol
|
||||
// * KRaft and the classic group protocol
|
||||
// * KRaft and the consumer group protocol
|
||||
def getTestQuorumAndGroupProtocolParametersAll() : java.util.stream.Stream[Arguments] = {
|
||||
stream.Stream.of(
|
||||
Arguments.of("zk", "classic"),
|
||||
Arguments.of("kraft", "classic"),
|
||||
Arguments.of("kraft", "consumer")
|
||||
)
|
||||
}
|
||||
|
||||
def getTestQuorumAndGroupProtocolParametersZkOnly() : java.util.stream.Stream[Arguments] = {
|
||||
stream.Stream.of(
|
||||
Arguments.of("zk", "classic")
|
||||
)
|
||||
}
|
||||
|
||||
// For tests that only work with the classic group protocol, we want to test the following combinations:
|
||||
// * ZooKeeper and the classic group protocol
|
||||
// * KRaft and the classic group protocol
|
||||
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly() : java.util.stream.Stream[Arguments] = {
|
||||
stream.Stream.of(
|
||||
Arguments.of("zk", "classic"),
|
||||
Arguments.of("kraft", "classic")
|
||||
)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,10 +21,8 @@ import org.junit.jupiter.api.Timeout
|
|||
import org.junit.jupiter.params.ParameterizedTest
|
||||
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
|
||||
|
||||
import org.apache.kafka.common.PartitionInfo
|
||||
import java.util.stream.Stream
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.mutable
|
||||
|
||||
/**
|
||||
* Integration tests for the consumer that covers logic related to manual assignment.
|
||||
|
|
@ -134,34 +132,6 @@ class PlaintextConsumerAssignTest extends AbstractConsumerTest {
|
|||
assertEquals(numRecords, consumer.position(tp))
|
||||
}
|
||||
|
||||
// partitionsFor not implemented in consumer group protocol and this test requires ZK also
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersZkOnly"))
|
||||
def testAssignAndConsumeWithLeaderChangeValidatingPositions(quorum: String, groupProtocol: String): Unit = {
|
||||
val numRecords = 10
|
||||
val producer = createProducer()
|
||||
val startingTimestamp = System.currentTimeMillis()
|
||||
sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
|
||||
val props = new Properties()
|
||||
val consumer = createConsumer(configOverrides = props,
|
||||
configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
|
||||
consumer.assign(List(tp).asJava)
|
||||
consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp)
|
||||
|
||||
// Force leader epoch change to trigger position validation
|
||||
var parts: mutable.Buffer[PartitionInfo] = null
|
||||
while (parts == null)
|
||||
parts = consumer.partitionsFor(tp.topic()).asScala
|
||||
val leader = parts.head.leader().id()
|
||||
this.servers(leader).shutdown()
|
||||
this.servers(leader).startup()
|
||||
|
||||
// Consume after leader change
|
||||
sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
|
||||
consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 10,
|
||||
startingTimestamp = startingTimestamp)
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testAssignAndFetchCommittedOffsets(quorum: String, groupProtocol: String): Unit = {
|
||||
|
|
@ -240,7 +210,4 @@ class PlaintextConsumerAssignTest extends AbstractConsumerTest {
|
|||
object PlaintextConsumerAssignTest {
|
||||
def getTestQuorumAndGroupProtocolParametersAll: Stream[Arguments] =
|
||||
BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll()
|
||||
|
||||
def getTestQuorumAndGroupProtocolParametersZkOnly: Stream[Arguments] =
|
||||
BaseConsumerTest.getTestQuorumAndGroupProtocolParametersZkOnly()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,7 +47,6 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_
|
|||
/**
|
||||
* The old test framework {@link kafka.api.BaseConsumerTest#getTestQuorumAndGroupProtocolParametersAll} test for the following cases:
|
||||
* <ul>
|
||||
* <li>(KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 1 cases</li>
|
||||
* <li>(KRAFT server) with (group.coordinator.new.enable=true) with (classic group protocol) = 1 case</li>
|
||||
* <li>(KRAFT server) with (group.coordinator.new.enable=true) with (consumer group protocol) = 1 case</li>
|
||||
* </ul>
|
||||
|
|
@ -64,7 +63,7 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_
|
|||
* <li>(CO_KRAFT servers) with (group.coordinator.new.enable=true) with (classic / consumer group protocols) = 2 cases</li>
|
||||
* </ul>
|
||||
* <ul>
|
||||
* <li>(KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 1 cases</li>
|
||||
* <li>(KRAFT server) with (group.coordinator.new.enable=false) with (classic group protocol) = 1 case</li>
|
||||
* </ul>
|
||||
*/
|
||||
class ConsumerGroupCommandTestUtils {
|
||||
|
|
|
|||
Loading…
Reference in New Issue