Merged in upstream trunk.

commit a70f0f8ce1
Geoff Anderson, 2015-07-27 13:41:34 -07:00
9 changed files with 164 additions and 71 deletions

CONTRIBUTING.md (new file)

@@ -0,0 +1,11 @@
## Contributing to Kafka
*Before opening a pull request*, review the [Contributing](http://kafka.apache.org/contributing.html) and [Contributing Code Changes](https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes) pages.
These pages list the steps required before creating a PR.
When you contribute code, you affirm that the contribution is your original work and that you
license the work to the project under the project's open source license. Whether or not you
state this explicitly, by submitting any copyrighted material via pull request, email, or
other means you agree to license the material under the project's open source license and
warrant that you have the legal authority to do so.

README.md

@@ -54,11 +54,11 @@ The release file can be found inside ./core/build/distributions/.
### Cleaning the build ###
./gradlew clean
### Running a task on a particular version of Scala (either 2.9.1, 2.9.2, 2.10.5 or 2.11.7) ###
### Running a task on a particular version of Scala (either 2.10.5 or 2.11.7) ###
#### (If building a jar with a Scala version other than 2.10, you need to set the SCALA_BINARY_VERSION variable or change it in bin/kafka-run-class.sh to run the quick start.) ####
./gradlew -PscalaVersion=2.9.1 jar
./gradlew -PscalaVersion=2.9.1 test
./gradlew -PscalaVersion=2.9.1 releaseTarGz
./gradlew -PscalaVersion=2.11.7 jar
./gradlew -PscalaVersion=2.11.7 test
./gradlew -PscalaVersion=2.11.7 releaseTarGz
### Running a task for a specific project ###
This is for 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples' and 'clients'
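For example, a task can be scoped to a single project with the usual Gradle project:task syntax (illustrative invocations, matching the commands above):
./gradlew core:jar
./gradlew core:test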

build.gradle

@@ -60,6 +60,7 @@ rat {
excludes.addAll([
'**/.git/**',
'build/**',
'CONTRIBUTING.md',
'gradlew',
'gradlew.bat',
'**/README.md',
@@ -163,7 +164,7 @@ subprojects {
}
}
for ( sv in ['2_9_1', '2_9_2', '2_10_5', '2_11_7'] ) {
for ( sv in ['2_10_5', '2_11_7'] ) {
String svInDot = sv.replaceAll( "_", ".")
tasks.create(name: "jar_core_${sv}", type: GradleBuild) {
@@ -203,20 +204,20 @@ for ( sv in ['2_9_1', '2_9_2', '2_10_5', '2_11_7'] ) {
}
}
tasks.create(name: "jarAll", dependsOn: ['jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_5', 'jar_core_2_11_7', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar', 'tools:jar']) {
tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar', 'tools:jar']) {
}
tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_5', 'srcJar_2_11_7', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar', 'tools:srcJar']) { }
tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar', 'tools:srcJar']) { }
tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_5', 'docsJar_2_11_7', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar', 'tools:docsJar']) { }
tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar', 'tools:docsJar']) { }
tasks.create(name: "testAll", dependsOn: ['test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_5', 'test_core_2_11_7', 'clients:test', 'log4j-appender:test', 'tools:test']) {
tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7', 'clients:test', 'log4j-appender:test', 'tools:test']) {
}
tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_9_1', 'releaseTarGz_2_9_2', 'releaseTarGz_2_10_5', 'releaseTarGz_2_11_7']) {
tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_10_5', 'releaseTarGz_2_11_7']) {
}
tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives', 'tools:uploadArchives']) {
tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives', 'tools:uploadArchives']) {
}
project(':core') {
@@ -241,10 +242,7 @@ project(':core') {
testCompile 'junit:junit:4.6'
testCompile 'org.easymock:easymock:3.0'
testCompile 'org.objenesis:objenesis:1.2'
if (scalaVersion.startsWith('2.9'))
testCompile "org.scalatest:scalatest_$scalaVersion:1.9.1"
else
testCompile "org.scalatest:scalatest_$baseScalaVersion:2.2.5"
testCompile "org.scalatest:scalatest_$baseScalaVersion:2.2.5"
testRuntime "$slf4jlog4j"

Fetcher.java

@@ -219,7 +219,7 @@ public class Fetcher<K, V> {
for (PartitionRecords<K, V> part : this.records) {
Long consumed = subscriptions.consumed(part.partition);
if (this.subscriptions.assignedPartitions().contains(part.partition)
&& (consumed == null || part.fetchOffset == consumed)) {
&& consumed != null && part.fetchOffset == consumed) {
List<ConsumerRecord<K, V>> records = drained.get(part.partition);
if (records == null) {
records = part.records;
@@ -364,6 +364,20 @@ public class Fetcher<K, V> {
parsed.add(parseRecord(tp, logEntry));
bytes += logEntry.size();
}
// we are interested in this fetch only if the beginning offset matches the
// current consumed position
Long consumed = subscriptions.consumed(tp);
if (consumed == null) {
continue;
} else if (consumed != fetchOffset) {
// the fetched position has gotten out of sync with the consumed position
// (which might happen when a rebalance occurs with a fetch in-flight),
// so we need to reset the fetch position so the next fetch is right
subscriptions.fetched(tp, consumed);
continue;
}
if (parsed.size() > 0) {
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
this.subscriptions.fetched(tp, record.offset() + 1);
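The change above ties draining fetched records to the consumed position: a fetch is used only if its starting offset matches the current consumed position, and a stale fetch (e.g. one in flight across a rebalance) is dropped after rewinding the fetch position. A minimal Java sketch of that guard (hypothetical helper, not part of the Kafka consumer API):

    // Illustrative only: drain a completed fetch only while positions are in sync.
    static boolean shouldDrain(Long consumed, long fetchOffset) {
        // A null consumed position (e.g. cleared by a rebalance) means the
        // fetched data can no longer be attributed to a valid position.
        return consumed != null && consumed == fetchOffset;
    }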

FetcherTest.java

@@ -36,6 +36,7 @@ import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
@@ -114,9 +115,26 @@ public class FetcherTest {
}
}
@Test
public void testFetchDuringRebalance() {
subscriptions.subscribe(topicName);
subscriptions.changePartitionAssignment(Arrays.asList(tp));
subscriptions.fetched(tp, 0);
subscriptions.consumed(tp, 0);
fetcher.initFetches(cluster);
// Now the rebalance happens and fetch positions are cleared
subscriptions.changePartitionAssignment(Arrays.asList(tp));
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
consumerClient.poll(0);
// The active fetch should be ignored since its position is no longer valid
assertTrue(fetcher.fetchedRecords().isEmpty());
}
@Test
public void testFetchFailed() {
List<ConsumerRecord<byte[], byte[]>> records;
subscriptions.subscribe(tp);
subscriptions.fetched(tp, 0);
subscriptions.consumed(tp, 0);
@@ -148,7 +166,6 @@ public class FetcherTest {
@Test
public void testFetchOutOfRange() {
List<ConsumerRecord<byte[], byte[]>> records;
subscriptions.subscribe(tp);
subscriptions.fetched(tp, 5);
subscriptions.consumed(tp, 5);

SocketServer.scala

@@ -37,6 +37,7 @@ import org.apache.kafka.common.protocol.types.SchemaException
import org.apache.kafka.common.utils.{SystemTime, Time, Utils}
import scala.collection._
import scala.util.control.{NonFatal, ControlThrowable}
/**
* An NIO socket server. The threading model is
@@ -357,49 +358,57 @@ private[kafka] class Processor(val id: Int,
override def run() {
startupComplete()
while(isRunning) {
// setup any new connections that have been queued up
configureNewConnections()
// register any new responses for writing
processNewResponses()
try {
selector.poll(300)
} catch {
case e @ (_: IllegalStateException | _: IOException) => {
error("Closing processor %s due to illegal state or IO exception".format(id))
swallow(closeAll())
shutdownComplete()
throw e
}
case e: InvalidReceiveException =>
// Log warning and continue since Selector already closed the connection
warn("Connection was closed due to invalid receive. Processor will continue handling other connections")
}
collection.JavaConversions.collectionAsScalaIterable(selector.completedReceives).foreach( receive => {
try {
val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = SecurityProtocol.PLAINTEXT)
requestChannel.sendRequest(req)
} catch {
case e @ (_: InvalidRequestException | _: SchemaException) => {
// note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
error("Closing socket for " + receive.source + " because of error", e)
selector.close(receive.source)
}
}
selector.mute(receive.source)
})
// setup any new connections that have been queued up
configureNewConnections()
// register any new responses for writing
processNewResponses()
collection.JavaConversions.iterableAsScalaIterable(selector.completedSends()).foreach( send => {
val resp = inflightResponses.remove(send.destination()).get
resp.request.updateRequestMetrics()
selector.unmute(send.destination())
})
try {
selector.poll(300)
} catch {
case e @ (_: IllegalStateException | _: IOException) => {
error("Closing processor %s due to illegal state or IO exception".format(id))
swallow(closeAll())
shutdownComplete()
throw e
}
case e: InvalidReceiveException =>
// Log warning and continue since Selector already closed the connection
warn("Connection was closed due to invalid receive. Processor will continue handling other connections")
}
collection.JavaConversions.collectionAsScalaIterable(selector.completedReceives).foreach(receive => {
try {
val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = SecurityProtocol.PLAINTEXT)
requestChannel.sendRequest(req)
} catch {
case e @ (_: InvalidRequestException | _: SchemaException) => {
// note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
error("Closing socket for " + receive.source + " because of error", e)
selector.close(receive.source)
}
}
selector.mute(receive.source)
})
collection.JavaConversions.iterableAsScalaIterable(selector.completedSends()).foreach(send => {
val resp = inflightResponses.remove(send.destination()).get
resp.request.updateRequestMetrics()
selector.unmute(send.destination())
})
} catch {
// We catch all throwables here to prevent the processor thread from exiting. We do this because
// letting a processor exit might have a bigger impact on the broker. Usually the exceptions thrown would
// be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
// or request. This behavior might need to be reviewed if we see an exception that needs the entire broker to stop.
case e : ControlThrowable => throw e
case e : Throwable =>
error("Processor got uncaught exception.", e)
}
}
debug("Closing selector - processor " + id)
closeAll()
swallowError(closeAll())
shutdownComplete()
}
@@ -426,8 +435,6 @@ private[kafka] class Processor(val id: Int,
selector.close(curr.request.connectionId)
}
}
} finally {
curr = requestChannel.receiveResponse(id)
}
@@ -448,13 +455,22 @@ private[kafka] class Processor(val id: Int,
private def configureNewConnections() {
while(!newConnections.isEmpty) {
val channel = newConnections.poll()
debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress)
val localHost = channel.socket().getLocalAddress.getHostAddress
val localPort = channel.socket().getLocalPort
val remoteHost = channel.socket().getInetAddress.getHostAddress
val remotePort = channel.socket().getPort
val connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort
selector.register(connectionId, channel)
try {
debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress)
val localHost = channel.socket().getLocalAddress.getHostAddress
val localPort = channel.socket().getLocalPort
val remoteHost = channel.socket().getInetAddress.getHostAddress
val remotePort = channel.socket().getPort
val connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort
selector.register(connectionId, channel)
} catch {
// We explicitly catch all non-fatal exceptions and close the socket to avoid a socket leak. The other
// throwables will be caught in the processor and logged as uncaught exceptions.
case NonFatal(e) =>
// need to close the channel here to avoid socket leak.
close(channel)
error("Processor " + id + " closed connection from " + channel.getRemoteAddress, e)
}
}
}
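Both changes in this file follow the same resource-safety idea: contain per-connection failures inside the loop so one bad socket cannot kill the processor thread, while still letting truly fatal throwables propagate. A rough Java analogue of the new run-loop shape (all names here are illustrative, not Kafka code):

    // Illustrative only: per-iteration failures are logged and skipped;
    // fatal errors still escape, mirroring the ControlThrowable rethrow above.
    void runLoop() {
        while (isRunning) {
            try {
                pollAndProcess();          // hypothetical per-iteration work
            } catch (VirtualMachineError fatal) {
                throw fatal;
            } catch (Throwable t) {
                log.error("Processor got uncaught exception.", t);
            }
        }
    }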

ConsumerBounceTest.scala

@@ -58,7 +58,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
}
def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(20)
def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(10)
/*
* 1. Produce a bunch of messages

DeleteTopicTest.scala

@@ -24,7 +24,7 @@ import kafka.utils.{ZkUtils, TestUtils}
import kafka.server.{KafkaServer, KafkaConfig}
import org.junit.Test
import java.util.Properties
import kafka.common.TopicAndPartition
import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -249,6 +249,27 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
servers.foreach(_.shutdown())
}
@Test
def testDeleteTopicAlreadyMarkedAsDeleted() {
val topicAndPartition = TopicAndPartition("test", 0)
val topic = topicAndPartition.topic
val servers = createTestTopicAndCluster(topic)
try {
// start topic deletion
AdminUtils.deleteTopic(zkClient, topic)
// try to delete topic marked as deleted
AdminUtils.deleteTopic(zkClient, topic)
fail("Expected TopicAlreadyMarkedForDeletionException")
}
catch {
case e: TopicAlreadyMarkedForDeletionException => // expected exception
}
TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
servers.foreach(_.shutdown())
}
private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = {
val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)

kafka-merge-pr.py

@@ -130,7 +130,15 @@ def merge_pr(pr_num, target_ref, title, body, pr_repo_desc):
'--pretty=format:%an <%ae>']).split("\n")
distinct_authors = sorted(set(commit_authors),
key=lambda x: commit_authors.count(x), reverse=True)
primary_author = distinct_authors[0]
primary_author = raw_input(
"Enter primary author in the format of \"name <email>\" [%s]: " %
distinct_authors[0])
if primary_author == "":
primary_author = distinct_authors[0]
reviewers = raw_input(
"Enter reviewers in the format of \"name1 <email1>, name2 <email2>\": ").strip()
commits = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name,
'--pretty=format:%h [%an] %s']).split("\n\n")
@@ -146,6 +154,9 @@ def merge_pr(pr_num, target_ref, title, body, pr_repo_desc):
merge_message_flags += ["-m", authors]
if (reviewers != ""):
merge_message_flags += ["-m", "Reviewers: %s" % reviewers]
if had_conflicts:
committer_name = run_cmd("git config --get user.name").strip()
committer_email = run_cmd("git config --get user.email").strip()
@@ -278,7 +289,10 @@ def resolve_jira_issue(merge_branches, comment, default_jira_id=""):
jira_fix_versions = map(lambda v: get_version_json(v), fix_versions)
resolve = filter(lambda a: a['name'] == "Resolve Issue", asf_jira.transitions(jira_id))[0]
asf_jira.transition_issue(jira_id, resolve["id"], fixVersions=jira_fix_versions, comment=comment)
resolution = filter(lambda r: r.raw['name'] == "Fixed", asf_jira.resolutions())[0]
asf_jira.transition_issue(
jira_id, resolve["id"], fixVersions = jira_fix_versions,
comment = comment, resolution = {'id': resolution.raw['id']})
print "Successfully resolved %s with fixVersions=%s!" % (jira_id, fix_versions)
@@ -435,11 +449,13 @@ def main():
print "JIRA_USERNAME and JIRA_PASSWORD not set"
print "Exiting without trying to close the associated JIRA."
else:
print "Could not find jira-python library. Run 'sudo pip install jira-python' to install."
print "Could not find jira-python library. Run 'sudo pip install jira' to install."
print "Exiting without trying to close the associated JIRA."
if __name__ == "__main__":
import doctest
doctest.testmod()
(failure_count, test_count) = doctest.testmod()
if (failure_count):
exit(-1)
main()