mirror of https://github.com/apache/kafka.git
MINOR: Tweak implementation of `FetchRequest.shuffle` and upgrade.html improvements
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes #1955 from ijuma/kip-74-follow-up

Parent: 7115c66aef
Commit: cf0bf7c7a2
@@ -65,8 +65,14 @@ object FetchRequest {
     FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, maxBytes, Vector(pairs:_*))
   }
 
-  def shuffle(requestInfo: Seq[(TopicAndPartition, PartitionFetchInfo)]): Seq[(TopicAndPartition, PartitionFetchInfo)] =
-    random.shuffle(requestInfo)
+  def shuffle(requestInfo: Seq[(TopicAndPartition, PartitionFetchInfo)]): Seq[(TopicAndPartition, PartitionFetchInfo)] = {
+    val groupedByTopic = requestInfo.groupBy { case (tp, _) => tp.topic }.map { case (topic, values) =>
+      topic -> random.shuffle(values)
+    }
+    random.shuffle(groupedByTopic.toSeq).flatMap { case (topic, partitions) =>
+      partitions.map { case (tp, fetchInfo) => tp -> fetchInfo }
+    }
+  }
 
   def batchByTopic[T](s: Seq[(TopicAndPartition, T)]): Seq[(String, Seq[(Int, T)])] = {
     val result = new ArrayBuffer[(String, ArrayBuffer[(Int, T)])]
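The effect of the new `shuffle` can be reproduced in a small standalone Scala sketch (the simplified tuple types and sample topics are invented for illustration; this is not the Kafka source): partitions are shuffled within each topic, and topics are shuffled relative to each other, but each topic's partitions stay contiguous in the result:

    import scala.util.Random

    object ShuffleSketch extends App {
      val random = new Random
      // (topic, partition) pairs stand in for (TopicAndPartition, PartitionFetchInfo).
      val requestInfo = for (t <- Seq("a", "b", "c"); p <- 0 to 2) yield (t, p)

      // Shuffle the partitions within each topic...
      val groupedByTopic = requestInfo.groupBy { case (topic, _) => topic }.map {
        case (topic, values) => topic -> random.shuffle(values)
      }
      // ...then shuffle the topic order, keeping each topic's partitions contiguous.
      val shuffled = random.shuffle(groupedByTopic.toSeq).flatMap { case (_, partitions) => partitions }

      println(shuffled)
      // e.g. Vector((b,2), (b,0), (b,1), (a,1), (a,0), (a,2), (c,0), (c,2), (c,1))
    }

Keeping a topic's partitions contiguous likely matters because `batchByTopic` (just below in the same file) groups consecutive entries by topic when the request is serialized; the plain `random.shuffle` it replaces would have interleaved topics.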
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.common.TopicAndPartition
+import org.junit.Assert.{assertEquals, assertNotEquals}
+import org.junit.Test
+
+class FetchRequestTest {
+
+  @Test
+  def testShuffle() {
+    val seq = (0 to 100).map { i =>
+      val topic = s"topic${i % 10}"
+      (TopicAndPartition(topic, i / 10), PartitionFetchInfo(i, 50))
+    }
+    val shuffled = FetchRequest.shuffle(seq)
+    assertEquals(seq.size, shuffled.size)
+    assertNotEquals(seq, shuffled)
+
+    seq.foreach { case (tp1, fetchInfo1) =>
+      shuffled.foreach { case (tp2, fetchInfo2) =>
+        if (tp1 == tp2)
+          assertEquals(fetchInfo1, fetchInfo2)
+      }
+    }
+
+    val topics = seq.map { case (TopicAndPartition(t, _), _) => t }.distinct
+    topics.foreach { topic =>
+      val startIndex = shuffled.indexWhere { case (tp, _) => tp.topic == topic }
+      val endIndex = shuffled.lastIndexWhere { case (tp, _) => tp.topic == topic }
+      // all partitions for a given topic should appear in sequence
+      assertEquals(Set(topic), shuffled.slice(startIndex, endIndex + 1).map { case (tp, _) => tp.topic }.toSet)
+    }
+
+    val shuffled2 = FetchRequest.shuffle(seq)
+    assertNotEquals(shuffled, shuffled2)
+    assertNotEquals(seq, shuffled2)
+  }
+
+  @Test
+  def testShuffleWithSingleTopic() {
+    val seq = (0 to 50).map(i => (TopicAndPartition("topic", i), PartitionFetchInfo(i, 70)))
+    val shuffled = FetchRequest.shuffle(seq)
+    assertEquals(seq.size, shuffled.size)
+    assertNotEquals(seq, shuffled)
+  }
+
+}
@@ -30,12 +30,12 @@ Note: Because new protocols are introduced, it is important to upgrade your Kafk
 <ol>
     <li> Update server.properties file on all brokers and add the following properties:
         <ul>
-        <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2 or 0.9.0.0).</li>
+        <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0.0 or 0.10.0.0).</li>
         <li>log.message.format.version=CURRENT_KAFKA_VERSION (See <a href="#upgrade_10_performance_impact">potential performance impact following the upgrade</a> for the details on what this configuration does.)
         </ul>
    </li>
    <li> Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it. </li>
-   <li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.1.0. NOTE: If your previous message format version is before 0.10.0, you shouldn't touch log.message.format.version yet - this parameter should only change once all consumers have been upgraded to on or above 0.10.0.0 </li>
+   <li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.1.0. NOTE: If your previous message format version is before 0.10.0, you shouldn't touch log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.</li>
    <li> Restart the brokers one by one for the new protocol version to take effect. </li>
    <li> Once all consumers have been upgraded to 0.10.0, change log.message.format.version to 0.10.1 on each broker and restart them one by one.
    </li>
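For illustration only (the version values below assume an upgrade from 0.10.0.0; they are examples, not defaults), the server.properties entries from step 1 and the later bump from step 3 might look like this:

    # Step 1: pin both versions to the version you are upgrading FROM,
    # so old brokers and old consumers keep working during the rolling bounce.
    inter.broker.protocol.version=0.10.0.0
    log.message.format.version=0.10.0.0

    # Step 3 (after every broker runs the new code): bump the protocol version.
    # Leave log.message.format.version alone until all consumers are on 0.10.0.0+.
    inter.broker.protocol.version=0.10.1.0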
@@ -69,12 +69,22 @@ Note: Because new protocols are introduced, it is important to upgrade your Kafk
     error code will be returned. This may cause unexpected timeouts or delays when using the producer and consumer since
     Kafka clients will typically retry automatically on unknown topic errors. You should consult the client logs if you
     suspect this could be happening.</li>
+    <li> Fetch responses have a size limit by default (50 MB for consumers and 10 MB for replication). The existing per partition limits also apply (1 MB for consumers
+    and replication). Note that neither of these limits is an absolute maximum as explained in the next point. </li>
+    <li> Consumers and replicas can make progress if a message larger than the response/partition size limit is found. More concretely, if the first message in the
+    first non-empty partition of the fetch is larger than either or both limits, the message will still be returned. </li>
+    <li> Overloaded constructors were added to <code>kafka.api.FetchRequest</code> and <code>kafka.javaapi.FetchRequest</code> to allow the caller to specify the
+    order of the partitions (since order is significant in v3). The previously existing constructors were deprecated and the partitions are shuffled before
+    the request is sent to avoid starvation issues. </li>
 </ul>
 
 <h5><a id="upgrade_1010_new_protocols" href="#upgrade_1010_new_protocols">New Protocol Versions</a></h5>
 <ul>
-    <li> ListOffsetRequest v1 is introduced and used by default to support accurate offset search based on timestamp.
-    <li> MetadataRequest/Response v2 has been introduced. v2 adds a new field "cluster_id" to MetadataResponse.
+    <li> ListOffsetRequest v1 supports accurate offset search based on timestamps. </li>
+    <li> MetadataResponse v2 introduces a new field: "cluster_id". </li>
+    <li> FetchRequest v3 supports limiting the response size (in addition to the existing per partition limit), it returns messages
+    bigger than the limits if required to make progress and the order of partitions in the request is now significant. </li>
+    <li> JoinGroup v1 introduces a new field: "rebalance_timeout". </li>
 </ul>
 
 <h4><a id="upgrade_10" href="#upgrade_10">Upgrading from 0.8.x or 0.9.x to 0.10.0.0</a></h4>