mirror of https://github.com/apache/kafka.git
Merged in upstream trunk.
commit a70f0f8ce1

@@ -0,0 +1,11 @@
## Contributing to Kafka

*Before opening a pull request*, review the [Contributing](http://kafka.apache.org/contributing.html) and [Contributing Code Changes](https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes) pages.

It lists steps that are required before creating a PR.

When you contribute code, you affirm that the contribution is your original work and that you
license the work to the project under the project's open source license. Whether or not you
state this explicitly, by submitting any copyrighted material via pull request, email, or
other means you agree to license the material under the project's open source license and
warrant that you have the legal authority to do so.

@@ -54,11 +54,11 @@ The release file can be found inside ./core/build/distributions/.
### Cleaning the build ###
./gradlew clean

### Running a task on a particular version of Scala (either 2.9.1, 2.9.2, 2.10.5 or 2.11.7) ###
### Running a task on a particular version of Scala (either 2.10.5 or 2.11.7) ###
#### (If building a jar with a version other than 2.10, need to set SCALA_BINARY_VERSION variable or change it in bin/kafka-run-class.sh to run quick start.) ####
./gradlew -PscalaVersion=2.9.1 jar
./gradlew -PscalaVersion=2.9.1 test
./gradlew -PscalaVersion=2.9.1 releaseTarGz
./gradlew -PscalaVersion=2.11.7 jar
./gradlew -PscalaVersion=2.11.7 test
./gradlew -PscalaVersion=2.11.7 releaseTarGz
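
To illustrate the SCALA_BINARY_VERSION note above, a minimal sketch for running the quick start against a non-2.10 build (assuming a 2.11 jar has been built and that bin/kafka-run-class.sh honors an exported value; otherwise edit the default in that script, as the note says):

export SCALA_BINARY_VERSION=2.11
bin/kafka-server-start.sh config/server.properties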

### Running a task for a specific project ###
This is for 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples' and 'clients'
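
For example, a project-scoped invocation (a sketch following Gradle's standard project:task syntax; the per-project task lists are not shown in this hunk):

./gradlew core:jar
./gradlew clients:test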

build.gradle

@@ -60,6 +60,7 @@ rat {
excludes.addAll([
'**/.git/**',
'build/**',
'CONTRIBUTING.md',
'gradlew',
'gradlew.bat',
'**/README.md',

@@ -163,7 +164,7 @@ subprojects {
}
}

for ( sv in ['2_9_1', '2_9_2', '2_10_5', '2_11_7'] ) {
for ( sv in ['2_10_5', '2_11_7'] ) {
String svInDot = sv.replaceAll( "_", ".")

tasks.create(name: "jar_core_${sv}", type: GradleBuild) {

@@ -203,20 +204,20 @@ for ( sv in ['2_9_1', '2_9_2', '2_10_5', '2_11_7'] ) {
}
}

tasks.create(name: "jarAll", dependsOn: ['jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_5', 'jar_core_2_11_7', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar', 'tools:jar']) {
tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar', 'tools:jar']) {
}

tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_5', 'srcJar_2_11_7', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar', 'tools:srcJar']) { }
tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar', 'tools:srcJar']) { }

tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_5', 'docsJar_2_11_7', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar', 'tools:docsJar']) { }
tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar', 'tools:docsJar']) { }

tasks.create(name: "testAll", dependsOn: ['test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_5', 'test_core_2_11_7', 'clients:test', 'log4j-appender:test', 'tools:test']) {
tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7', 'clients:test', 'log4j-appender:test', 'tools:test']) {
}

tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_9_1', 'releaseTarGz_2_9_2', 'releaseTarGz_2_10_5', 'releaseTarGz_2_11_7']) {
tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_10_5', 'releaseTarGz_2_11_7']) {
}

tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives', 'tools:uploadArchives']) {
tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives', 'tools:uploadArchives']) {
}

project(':core') {
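
For reference, a usage sketch of the aggregate tasks after this change (invocations only; the task names are exactly those defined above):

./gradlew jarAll
./gradlew testAll
./gradlew releaseTarGzAll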

@@ -241,10 +242,7 @@ project(':core') {
testCompile 'junit:junit:4.6'
testCompile 'org.easymock:easymock:3.0'
testCompile 'org.objenesis:objenesis:1.2'
if (scalaVersion.startsWith('2.9'))
testCompile "org.scalatest:scalatest_$scalaVersion:1.9.1"
else
testCompile "org.scalatest:scalatest_$baseScalaVersion:2.2.5"
testCompile "org.scalatest:scalatest_$baseScalaVersion:2.2.5"

testRuntime "$slf4jlog4j"

@@ -219,7 +219,7 @@ public class Fetcher<K, V> {
for (PartitionRecords<K, V> part : this.records) {
Long consumed = subscriptions.consumed(part.partition);
if (this.subscriptions.assignedPartitions().contains(part.partition)
&& (consumed == null || part.fetchOffset == consumed)) {
&& consumed != null && part.fetchOffset == consumed) {
List<ConsumerRecord<K, V>> records = drained.get(part.partition);
if (records == null) {
records = part.records;

@@ -364,6 +364,20 @@ public class Fetcher<K, V> {
parsed.add(parseRecord(tp, logEntry));
bytes += logEntry.size();
}

// we are interested in this fetch only if the beginning offset matches the
// current consumed position
Long consumed = subscriptions.consumed(tp);
if (consumed == null) {
continue;
} else if (consumed != fetchOffset) {
// the fetched position has gotten out of sync with the consumed position
// (which might happen when a rebalance occurs with a fetch in-flight),
// so we need to reset the fetch position so the next fetch is right
subscriptions.fetched(tp, consumed);
continue;
}

if (parsed.size() > 0) {
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
this.subscriptions.fetched(tp, record.offset() + 1);

@@ -36,6 +36,7 @@ import org.junit.Before;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;

@@ -114,9 +115,26 @@ public class FetcherTest {
}
}

@Test
public void testFetchDuringRebalance() {
subscriptions.subscribe(topicName);
subscriptions.changePartitionAssignment(Arrays.asList(tp));
subscriptions.fetched(tp, 0);
subscriptions.consumed(tp, 0);

fetcher.initFetches(cluster);

// Now the rebalance happens and fetch positions are cleared
subscriptions.changePartitionAssignment(Arrays.asList(tp));
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
consumerClient.poll(0);

// The active fetch should be ignored since its position is no longer valid
assertTrue(fetcher.fetchedRecords().isEmpty());
}

@Test
public void testFetchFailed() {
List<ConsumerRecord<byte[], byte[]>> records;
subscriptions.subscribe(tp);
subscriptions.fetched(tp, 0);
subscriptions.consumed(tp, 0);

@@ -148,7 +166,6 @@ public class FetcherTest {

@Test
public void testFetchOutOfRange() {
List<ConsumerRecord<byte[], byte[]>> records;
subscriptions.subscribe(tp);
subscriptions.fetched(tp, 5);
subscriptions.consumed(tp, 5);

@@ -37,6 +37,7 @@ import org.apache.kafka.common.protocol.types.SchemaException
import org.apache.kafka.common.utils.{SystemTime, Time, Utils}

import scala.collection._
import scala.util.control.{NonFatal, ControlThrowable}

/**
* An NIO socket server. The threading model is

@@ -357,49 +358,57 @@ private[kafka] class Processor(val id: Int,
override def run() {
startupComplete()
while(isRunning) {
// setup any new connections that have been queued up
configureNewConnections()
// register any new responses for writing
processNewResponses()

try {
selector.poll(300)
} catch {
case e @ (_: IllegalStateException | _: IOException) => {
error("Closing processor %s due to illegal state or IO exception".format(id))
swallow(closeAll())
shutdownComplete()
throw e
}
case e: InvalidReceiveException =>
// Log warning and continue since Selector already closed the connection
warn("Connection was closed due to invalid receive. Processor will continue handling other connections")
}
collection.JavaConversions.collectionAsScalaIterable(selector.completedReceives).foreach( receive => {
try {
val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = SecurityProtocol.PLAINTEXT)
requestChannel.sendRequest(req)
} catch {
case e @ (_: InvalidRequestException | _: SchemaException) => {
// note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
error("Closing socket for " + receive.source + " because of error", e)
selector.close(receive.source)
}
}
selector.mute(receive.source)
})
// setup any new connections that have been queued up
configureNewConnections()
// register any new responses for writing
processNewResponses()

collection.JavaConversions.iterableAsScalaIterable(selector.completedSends()).foreach( send => {
val resp = inflightResponses.remove(send.destination()).get
resp.request.updateRequestMetrics()
selector.unmute(send.destination())
})
try {
selector.poll(300)
} catch {
case e @ (_: IllegalStateException | _: IOException) => {
error("Closing processor %s due to illegal state or IO exception".format(id))
swallow(closeAll())
shutdownComplete()
throw e
}
case e: InvalidReceiveException =>
// Log warning and continue since Selector already closed the connection
warn("Connection was closed due to invalid receive. Processor will continue handling other connections")
}
collection.JavaConversions.collectionAsScalaIterable(selector.completedReceives).foreach(receive => {
try {
val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = SecurityProtocol.PLAINTEXT)
requestChannel.sendRequest(req)
} catch {
case e @ (_: InvalidRequestException | _: SchemaException) => {
// note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
error("Closing socket for " + receive.source + " because of error", e)
selector.close(receive.source)
}
}
selector.mute(receive.source)
})

collection.JavaConversions.iterableAsScalaIterable(selector.completedSends()).foreach(send => {
val resp = inflightResponses.remove(send.destination()).get
resp.request.updateRequestMetrics()
selector.unmute(send.destination())
})
} catch {
// We catch all the throwables here to prevent the processor thread from exiting. We do this because
// letting a processor exit might cause bigger impact on the broker. Usually the exceptions thrown would
// be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
// or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
case e : ControlThrowable => throw e
case e : Throwable =>
error("Processor got uncaught exception.", e)
}
}

debug("Closing selector - processor " + id)
closeAll()
swallowError(closeAll())
shutdownComplete()
}

@@ -426,8 +435,6 @@ private[kafka] class Processor(val id: Int,
selector.close(curr.request.connectionId)
}
}

} finally {
curr = requestChannel.receiveResponse(id)
}

@@ -448,13 +455,22 @@ private[kafka] class Processor(val id: Int,
private def configureNewConnections() {
while(!newConnections.isEmpty) {
val channel = newConnections.poll()
debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress)
val localHost = channel.socket().getLocalAddress.getHostAddress
val localPort = channel.socket().getLocalPort
val remoteHost = channel.socket().getInetAddress.getHostAddress
val remotePort = channel.socket().getPort
val connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort
selector.register(connectionId, channel)
try {
debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress)
val localHost = channel.socket().getLocalAddress.getHostAddress
val localPort = channel.socket().getLocalPort
val remoteHost = channel.socket().getInetAddress.getHostAddress
val remotePort = channel.socket().getPort
val connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort
selector.register(connectionId, channel)
} catch {
// We explicitly catch all non fatal exceptions and close the socket to avoid socket leak. The other
// throwables will be caught in processor and logged as uncaught exception.
case NonFatal(e) =>
// need to close the channel here to avoid socket leak.
close(channel)
error("Processor " + id + " closed connection from " + channel.getRemoteAddress, e)
}
}
}

@@ -58,7 +58,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
}

def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(20)
def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(10)

/*
* 1. Produce a bunch of messages

@@ -24,7 +24,7 @@ import kafka.utils.{ZkUtils, TestUtils}
import kafka.server.{KafkaServer, KafkaConfig}
import org.junit.Test
import java.util.Properties
import kafka.common.TopicAndPartition
import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}

class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {

@@ -249,6 +249,27 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
servers.foreach(_.shutdown())
}

@Test
def testDeleteTopicAlreadyMarkedAsDeleted() {
val topicAndPartition = TopicAndPartition("test", 0)
val topic = topicAndPartition.topic
val servers = createTestTopicAndCluster(topic)

try {
// start topic deletion
AdminUtils.deleteTopic(zkClient, topic)
// try to delete topic marked as deleted
AdminUtils.deleteTopic(zkClient, topic)
fail("Expected TopicAlreadyMarkedForDeletionException")
}
catch {
case e: TopicAlreadyMarkedForDeletionException => // expected exception
}

TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
servers.foreach(_.shutdown())
}

private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = {

val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)

@@ -130,7 +130,15 @@ def merge_pr(pr_num, target_ref, title, body, pr_repo_desc):
'--pretty=format:%an <%ae>']).split("\n")
distinct_authors = sorted(set(commit_authors),
key=lambda x: commit_authors.count(x), reverse=True)
primary_author = distinct_authors[0]
primary_author = raw_input(
"Enter primary author in the format of \"name <email>\" [%s]: " %
distinct_authors[0])
if primary_author == "":
primary_author = distinct_authors[0]

reviewers = raw_input(
"Enter reviewers in the format of \"name1 <email1>, name2 <email2>\": ").strip()

commits = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name,
'--pretty=format:%h [%an] %s']).split("\n\n")

@@ -146,6 +154,9 @@ def merge_pr(pr_num, target_ref, title, body, pr_repo_desc):

merge_message_flags += ["-m", authors]

if (reviewers != ""):
merge_message_flags += ["-m", "Reviewers: %s" % reviewers]

if had_conflicts:
committer_name = run_cmd("git config --get user.name").strip()
committer_email = run_cmd("git config --get user.email").strip()

@@ -278,7 +289,10 @@ def resolve_jira_issue(merge_branches, comment, default_jira_id=""):
jira_fix_versions = map(lambda v: get_version_json(v), fix_versions)

resolve = filter(lambda a: a['name'] == "Resolve Issue", asf_jira.transitions(jira_id))[0]
asf_jira.transition_issue(jira_id, resolve["id"], fixVersions=jira_fix_versions, comment=comment)
resolution = filter(lambda r: r.raw['name'] == "Fixed", asf_jira.resolutions())[0]
asf_jira.transition_issue(
jira_id, resolve["id"], fixVersions = jira_fix_versions,
comment = comment, resolution = {'id': resolution.raw['id']})

print "Successfully resolved %s with fixVersions=%s!" % (jira_id, fix_versions)

@@ -435,11 +449,13 @@ def main():
print "JIRA_USERNAME and JIRA_PASSWORD not set"
print "Exiting without trying to close the associated JIRA."
else:
print "Could not find jira-python library. Run 'sudo pip install jira-python' to install."
print "Could not find jira-python library. Run 'sudo pip install jira' to install."
print "Exiting without trying to close the associated JIRA."

if __name__ == "__main__":
import doctest
doctest.testmod()
(failure_count, test_count) = doctest.testmod()
if (failure_count):
exit(-1)

main()