mirror of https://github.com/apache/kafka.git
MINOR: remove unused method in IntegrationTestUtils (#17469)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
582bb48e88
commit
aa6460dbed
|
|
@ -17,20 +17,15 @@
|
||||||
|
|
||||||
package kafka.server
|
package kafka.server
|
||||||
|
|
||||||
|
import kafka.network.SocketServer
|
||||||
|
import org.apache.kafka.common.network.ListenerName
|
||||||
|
import org.apache.kafka.common.protocol.ApiKeys
|
||||||
|
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, ResponseHeader}
|
||||||
|
import org.apache.kafka.common.utils.Utils
|
||||||
|
|
||||||
import java.io.{DataInputStream, DataOutputStream}
|
import java.io.{DataInputStream, DataOutputStream}
|
||||||
import java.net.Socket
|
import java.net.Socket
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
import java.util.{Collections, Optional, Properties}
|
|
||||||
import kafka.network.SocketServer
|
|
||||||
import kafka.security.JaasTestUtils
|
|
||||||
import org.apache.kafka.clients.admin.{Admin, NewTopic}
|
|
||||||
import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
|
|
||||||
import org.apache.kafka.common.protocol.ApiKeys
|
|
||||||
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, ResponseHeader}
|
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
|
||||||
import org.apache.kafka.common.utils.Utils
|
|
||||||
|
|
||||||
import scala.jdk.CollectionConverters._
|
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
|
|
||||||
object IntegrationTestUtils {
|
object IntegrationTestUtils {
|
||||||
|
|
@ -102,42 +97,10 @@ object IntegrationTestUtils {
|
||||||
finally socket.close()
|
finally socket.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
def createTopic(
|
|
||||||
admin: Admin,
|
|
||||||
topic: String,
|
|
||||||
numPartitions: Int,
|
|
||||||
replicationFactor: Short
|
|
||||||
): Unit = {
|
|
||||||
val newTopics = Collections.singletonList(new NewTopic(topic, numPartitions, replicationFactor))
|
|
||||||
val createTopicResult = admin.createTopics(newTopics)
|
|
||||||
createTopicResult.all().get()
|
|
||||||
}
|
|
||||||
|
|
||||||
def createTopic(
|
|
||||||
admin: Admin,
|
|
||||||
topic: String,
|
|
||||||
replicaAssignment: Map[Int, Seq[Int]]
|
|
||||||
): Unit = {
|
|
||||||
val javaAssignment = new java.util.HashMap[Integer, java.util.List[Integer]]()
|
|
||||||
replicaAssignment.foreachEntry { (partitionId, assignment) =>
|
|
||||||
javaAssignment.put(partitionId, assignment.map(Int.box).asJava)
|
|
||||||
}
|
|
||||||
val newTopic = new NewTopic(topic, javaAssignment)
|
|
||||||
val newTopics = Collections.singletonList(newTopic)
|
|
||||||
val createTopicResult = admin.createTopics(newTopics)
|
|
||||||
createTopicResult.all().get()
|
|
||||||
}
|
|
||||||
|
|
||||||
protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
|
|
||||||
private var correlationId = 0
|
private var correlationId = 0
|
||||||
|
|
||||||
def connect(socketServer: SocketServer,
|
def connect(socketServer: SocketServer,
|
||||||
listenerName: ListenerName): Socket = {
|
listenerName: ListenerName): Socket = {
|
||||||
new Socket("localhost", socketServer.boundPort(listenerName))
|
new Socket("localhost", socketServer.boundPort(listenerName))
|
||||||
}
|
}
|
||||||
|
|
||||||
def clientSecurityProps(certAlias: String): Properties = {
|
|
||||||
JaasTestUtils.securityConfigs(ConnectionMode.CLIENT, securityProtocol, Optional.empty(), certAlias,
|
|
||||||
JaasTestUtils.SSL_CERTIFICATE_CN, Optional.empty()) // TODO use real trust store and client SASL properties
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue