KAFKA-3621; Add tests for ApiVersionRequest/Response

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1275 from SinghAsDev/KAFKA-3621
This commit is contained in:
Ashish Singh 2016-04-27 17:31:31 -07:00 committed by Ismael Juma
parent 669be7fadc
commit 57831a56a5
2 changed files with 55 additions and 3 deletions

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
import org.junit.Assert._
import org.junit.Test
import scala.collection.JavaConversions._
class ApiVersionsRequestTest extends BaseRequestTest {
override def numBrokers: Int = 1
@Test
def testApiVersionsRequest() {
val apiVersionsResponse = sendApiVersionsRequest(new ApiVersionsRequest, 0)
assertEquals("API keys in ApiVersionsResponse must match API keys supported by broker.", ApiKeys.values.length, apiVersionsResponse.apiVersions.size)
for (expectedApiVersion: ApiVersion <- KafkaApis.apiVersionsResponse.apiVersions) {
val actualApiVersion = apiVersionsResponse.apiVersion(expectedApiVersion.apiKey)
assertNotNull(s"API key ${actualApiVersion.apiKey} is supported by broker, but not received in ApiVersionsResponse.", actualApiVersion)
assertEquals("API key must be supported by the broker.", expectedApiVersion.apiKey, actualApiVersion.apiKey)
assertEquals(s"Received unexpected min version for API key ${actualApiVersion.apiKey}.", expectedApiVersion.minVersion, actualApiVersion.minVersion)
assertEquals(s"Received unexpected max version for API key ${actualApiVersion.apiKey}.", expectedApiVersion.maxVersion, actualApiVersion.maxVersion)
}
}
private def sendApiVersionsRequest(request: ApiVersionsRequest, version: Short): ApiVersionsResponse = {
val response = send(request, ApiKeys.API_VERSIONS, version)
ApiVersionsResponse.parse(response)
}
}

View File

@ -30,11 +30,13 @@ import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader, Respons
import org.junit.Before
abstract class BaseRequestTest extends KafkaServerTestHarness {
val numBrokers = 3
private var correlationId = 0
// Override properties by mutating the passed Properties object
def propertyOverrides(properties: Properties): Unit
// If required, set number of brokers
protected def numBrokers: Int = 3
// If required, override properties by mutating the passed Properties object
protected def propertyOverrides(properties: Properties) {}
def generateConfigs() = {
val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false)