KAFKA-2208; add consumer side error handling upon coordinator failure; reviewed by Onur Karaman

Guozhang Wang 2015-06-03 13:47:01 -07:00
parent 48edeca33b
commit d22987f01d
11 changed files with 151 additions and 60 deletions

Coordinator.java

@@ -120,30 +120,58 @@ public final class Coordinator {
// send a join group request to the coordinator
log.debug("(Re-)joining group {} with subscribed topics {}", groupId, subscribedTopics);

// repeat processing the response until succeed or fatal error
do {
    JoinGroupRequest request = new JoinGroupRequest(groupId,
            (int) this.sessionTimeoutMs,
            subscribedTopics,
            this.consumerId,
            this.assignmentStrategy);

    // process the response
    ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.JOIN_GROUP, request.toStruct(), null, now);
    JoinGroupResponse response = new JoinGroupResponse(resp.responseBody());
    short errorCode = response.errorCode();

    // set the flag to refresh last committed offsets
    this.subscriptions.needRefreshCommits();

    if (errorCode == Errors.NONE.code()) {
        this.consumerId = response.consumerId();
        this.generation = response.generationId();
        log.debug("Joined group: {}", response);

        // record re-assignment time
        this.sensors.partitionReassignments.record(time.milliseconds() - now);

        // return assigned partitions
        return response.assignedPartitions();
    } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()) {
        // reset the consumer id and retry immediately
        this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
        log.info("Attempt to join group {} failed due to unknown consumer id, resetting and retrying.",
                groupId);
    } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
            || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
        // re-discover the coordinator and retry with backoff
        coordinatorDead();
        Utils.sleep(this.retryBackoffMs);
        log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.",
                groupId);
    } else if (errorCode == Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code()
            || errorCode == Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code()
            || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) {
        // log the error and re-throw the exception
        log.error("Attempt to join group {} failed due to: {}",
                groupId, Errors.forCode(errorCode).exception().getMessage());
        Errors.forCode(errorCode).maybeThrow();
    } else {
        // unexpected error, throw the exception
        throw new KafkaException("Unexpected error in join group response: "
                + Errors.forCode(response.errorCode()).exception().getMessage());
    }
} while (true);
}
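
The loop above classifies every join-group error into one of three reactions: retry immediately after resetting the consumer id, retry with backoff once the coordinator is re-discovered, or re-throw as fatal. A minimal standalone sketch of that classification, assuming only the Errors helper used in this patch (the enum and method names here are illustrative, not client API):

    // Sketch only: the retry/reset/fatal classification used by the join loop above.
    // RetryAction and classify(..) are hypothetical names, not part of the client.
    enum RetryAction { RETRY_NOW, RETRY_AFTER_BACKOFF, FATAL }

    RetryAction classify(short errorCode) {
        if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code())
            return RetryAction.RETRY_NOW;           // reset the id, rejoin immediately
        if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
                || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code())
            return RetryAction.RETRY_AFTER_BACKOFF; // coordinator moved or is down
        return RetryAction.FATAL;                   // e.g. INVALID_SESSION_TIMEOUT
    }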
/**
@@ -217,7 +245,6 @@ public final class Coordinator {
// parse the response to get the offsets
boolean offsetsReady = true;
OffsetFetchResponse response = new OffsetFetchResponse(resp.responseBody());
Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(response.responseData().size());
for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition tp = entry.getKey();
@@ -239,7 +266,8 @@ public final class Coordinator {
// just ignore this partition
log.debug("Unknown topic or partition for " + tp);
} else {
throw new KafkaException("Unexpected error in fetch offset response: "
+ Errors.forCode(data.errorCode).exception().getMessage());
}
} else if (data.offset >= 0) {
// record the position with the offset (-1 indicates no committed offset to fetch)
@@ -471,9 +499,15 @@ public final class Coordinator {
if (response.errorCode() == Errors.NONE.code()) {
log.debug("Received successful heartbeat response.");
} else if (response.errorCode() == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
|| response.errorCode() == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
coordinatorDead();
} else if (response.errorCode() == Errors.ILLEGAL_GENERATION.code()) {
log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
subscriptions.needReassignment();
} else if (response.errorCode() == Errors.UNKNOWN_CONSUMER_ID.code()) {
log.info("Attempt to heart beat failed since consumer id is not valid, reset it and try to re-join group.");
consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
subscriptions.needReassignment();
} else {
throw new KafkaException("Unexpected error in heartbeat response: "
@@ -506,9 +540,10 @@ public final class Coordinator {
log.debug("Committed offset {} for partition {}", offset, tp);
subscriptions.committed(tp, offset);
} else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
|| errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
coordinatorDead();
} else {
// no need to throw an exception here; just log the error
log.error("Error committing partition {} at offset {}: {}",
tp,
offset,

Fetcher.java

@@ -231,13 +231,14 @@ public class Fetcher<K, V> {
log.debug("Fetched offset {} for partition {}", offset, topicPartition);
return offset;
} else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
|| errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
topicPartition);
awaitMetadataUpdate();
} else {
log.error("Attempt to fetch offsets for partition {} failed due to: {}",
topicPartition, Errors.forCode(errorCode).exception().getMessage());
awaitMetadataUpdate();
}
}
} else {

HeartbeatResponse.java

@@ -27,7 +27,10 @@ public class HeartbeatResponse extends AbstractRequestResponse {
/**
* Possible error codes:
*
* CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
* NOT_COORDINATOR_FOR_CONSUMER (16)
* ILLEGAL_GENERATION (22)
* UNKNOWN_CONSUMER_ID (25)
*/
private final short errorCode;
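
These codes round-trip through org.apache.kafka.common.protocol.Errors, the same helper the client code above uses. A short hedged example of dispatching on a received heartbeat code (the response variable is assumed):

    // Hedged example: reacting to a heartbeat error code on the client.
    short code = response.errorCode();
    if (code == Errors.ILLEGAL_GENERATION.code() || code == Errors.UNKNOWN_CONSUMER_ID.code()) {
        // non-fatal: trigger a rebalance or a consumer-id reset instead of dying
    } else {
        Errors.forCode(code).maybeThrow(); // no-op for NONE, throws for unexpected codes
    }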

JoinGroupResponse.java

@@ -30,7 +30,12 @@ public class JoinGroupResponse extends AbstractRequestResponse {
/**
* Possible error codes:
*
* CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
* NOT_COORDINATOR_FOR_CONSUMER (16)
* INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY (23)
* UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY (24)
* UNKNOWN_CONSUMER_ID (25)
* INVALID_SESSION_TIMEOUT (26)
*/
private static final String GENERATION_ID_KEY_NAME = "group_generation_id";

ConsumerCoordinator.scala

@@ -25,11 +25,6 @@ import org.apache.kafka.common.requests.JoinGroupRequest
import org.I0Itec.zkclient.ZkClient
import java.util.concurrent.atomic.AtomicBoolean
/**
* ConsumerCoordinator handles consumer group and consumer offset management.
@@ -41,7 +36,6 @@ object ConsumerCoordinator
class ConsumerCoordinator(val config: KafkaConfig,
val zkClient: ZkClient,
val offsetManager: OffsetManager) extends Logging {
this.logIdent = "[ConsumerCoordinator " + config.brokerId + "]: "
@@ -93,15 +87,18 @@ class ConsumerCoordinator(val config: KafkaConfig,
responseCallback(Set.empty, consumerId, 0, Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
} else if (!PartitionAssignor.strategies.contains(partitionAssignmentStrategy)) {
responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code)
} else if (sessionTimeoutMs < config.consumerMinSessionTimeoutMs || sessionTimeoutMs > config.consumerMaxSessionTimeoutMs) {
responseCallback(Set.empty, consumerId, 0, Errors.INVALID_SESSION_TIMEOUT.code)
} else {
// only try to create the group if the group is unknown AND
// the consumer id is UNKNOWN; if a consumer id is specified but the group
// does not exist, we should reject the request
var group = coordinatorMetadata.getGroup(groupId)
if (group == null) {
if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID) {
responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
} else {
group = coordinatorMetadata.addGroup(groupId, partitionAssignmentStrategy)
doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback)
}
} else {
@@ -118,10 +115,16 @@ class ConsumerCoordinator(val config: KafkaConfig,
responseCallback:(Set[TopicAndPartition], String, Int, Short) => Unit) {
group synchronized {
if (group.is(Dead)) {
// if the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; most likely the group has migrated to another
// coordinator or is in a transient unstable phase. Let the consumer retry
// joining without a specified consumer id.
responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
} else if (partitionAssignmentStrategy != group.partitionAssignmentStrategy) {
responseCallback(Set.empty, consumerId, 0, Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code)
} else if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID && !group.has(consumerId)) {
// if the consumer is trying to register with an unrecognized id, send a response
// telling it to reset its consumer id and retry
responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
} else if (group.has(consumerId) && group.is(Stable) && topics == group.get(consumerId).topics) {
/*
@@ -170,6 +173,10 @@ class ConsumerCoordinator(val config: KafkaConfig,
} else {
val group = coordinatorMetadata.getGroup(groupId)
if (group == null) {
// a null group means some other thread has just removed the group from the
// coordinator metadata; most likely the group has migrated to another coordinator
// or is in a transient unstable phase. Let the consumer retry joining without
// a specified consumer id.
responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
} else {
group synchronized {
@@ -304,7 +311,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
if (group.isEmpty) {
group.transitionTo(Dead)
info("Group %s generation %s is dead".format(group.groupId, group.generationId))
info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
coordinatorMetadata.removeGroup(group.groupId, group.topics)
}
}
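
Taken together, the join-group checks above form a small admission table: an unknown group is created only for a consumer presenting UNKNOWN_CONSUMER_ID, and a known id whose group is missing, dead, or unfamiliar is bounced back as UNKNOWN_CONSUMER_ID so the consumer resets and rejoins. A hedged Java condensation of that table (method and parameter names are illustrative, not broker code):

    // Illustrative condensation of the broker-side join admission rules above.
    short joinDecision(boolean groupExists, boolean groupDead, boolean idIsUnknown,
                       boolean idKnownToGroup, boolean strategyMatches) {
        if (!groupExists && !idIsUnknown)
            return Errors.UNKNOWN_CONSUMER_ID.code();       // known id, missing group
        if (groupDead)
            return Errors.UNKNOWN_CONSUMER_ID.code();       // group just removed; rejoin fresh
        if (groupExists && !strategyMatches)
            return Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code();
        if (!idIsUnknown && !idKnownToGroup)
            return Errors.UNKNOWN_CONSUMER_ID.code();       // unrecognized id; reset and retry
        return Errors.NONE.code();                          // proceed with the (re)join
    }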

ConsumerGroupMetadata.scala

@@ -62,6 +62,14 @@ private[coordinator] case object Stable extends GroupState { val state: Byte = 3
private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }
private object ConsumerGroupMetadata {
private val validPreviousStates: Map[GroupState, Set[GroupState]] =
Map(Dead -> Set(PreparingRebalance),
Stable -> Set(Rebalancing),
PreparingRebalance -> Set(Stable),
Rebalancing -> Set(PreparingRebalance))
}
/**
* Group contains the following metadata:
*
@@ -77,12 +85,6 @@ private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }
private[coordinator] class ConsumerGroupMetadata(val groupId: String,
val partitionAssignmentStrategy: String) {
private val consumers = new mutable.HashMap[String, ConsumerMetadata]
private var state: GroupState = Stable
var generationId = 0
@@ -124,8 +126,8 @@ private[coordinator] class ConsumerGroupMetadata(val groupId: String,
}
private def assertValidTransition(targetState: GroupState) {
if (!ConsumerGroupMetadata.validPreviousStates(targetState).contains(state))
throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"
.format(groupId, ConsumerGroupMetadata.validPreviousStates(targetState).mkString(","), targetState, state))
}
}
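
Hoisting validPreviousStates into the companion object turns the transition table into one shared constant instead of a per-group copy. The same guard pattern in a hedged Java sketch (Java 9+ collections for brevity; names illustrative):

    // Illustrative sketch of a shared state-transition table and guard.
    import java.util.Map;
    import java.util.Set;

    enum GroupState { STABLE, PREPARING_REBALANCE, REBALANCING, DEAD }

    static final Map<GroupState, Set<GroupState>> VALID_PREVIOUS_STATES = Map.of(
            GroupState.DEAD, Set.of(GroupState.PREPARING_REBALANCE),
            GroupState.STABLE, Set.of(GroupState.REBALANCING),
            GroupState.PREPARING_REBALANCE, Set.of(GroupState.STABLE),
            GroupState.REBALANCING, Set.of(GroupState.PREPARING_REBALANCE));

    static void assertValidTransition(GroupState current, GroupState target) {
        if (!VALID_PREVIOUS_STATES.get(target).contains(current))
            throw new IllegalStateException("Cannot move from " + current + " to " + target);
    }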

RequestChannel.scala

@@ -49,11 +49,14 @@ object RequestChannel extends Logging {
@volatile var responseCompleteTimeMs = -1L
@volatile var responseDequeueTimeMs = -1L
val requestId = buffer.getShort()
// for server-side request / response format
// TODO: this will be removed once we migrated to client-side format
val requestObj =
if ( RequestKeys.keyToNameAndDeserializerMap.contains(requestId))
RequestKeys.deserializerForKey(requestId)(buffer)
else
null
// for client-side request / response format
val header: RequestHeader =
if (requestObj == null) {
buffer.rewind
@@ -68,7 +71,7 @@ object RequestChannel extends Logging {
buffer = null
private val requestLogger = Logger.getLogger("kafka.request.logger")
trace("Processor %d received request : %s".format(processor, requestObj))
trace("Processor %d received request : %s".format(processor, if (requestObj != null) requestObj.describe(false) else header.toString + " : " + body.toString))
def updateRequestMetrics() {
val endTimeMs = SystemTime.milliseconds
@@ -101,10 +104,10 @@ object RequestChannel extends Logging {
}
if(requestLogger.isTraceEnabled)
requestLogger.trace("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
.format(if (requestObj != null) requestObj.describe(true) else header.toString + " : " + body.toString, remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
else if(requestLogger.isDebugEnabled) {
requestLogger.debug("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
.format(if (requestObj != null) requestObj.describe(false) else header.toString + " : " + body.toString, remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
}
}
}

KafkaConfig.scala

@@ -111,6 +111,10 @@ object Defaults {
val ControlledShutdownRetryBackoffMs = 5000
val ControlledShutdownEnable = true
/** ********* Consumer coordinator configuration ***********/
val ConsumerMinSessionTimeoutMs = 6000
val ConsumerMaxSessionTimeoutMs = 30000
/** ********* Offset management configuration ***********/
val OffsetMetadataMaxSize = OffsetManagerConfig.DefaultMaxMetadataSize
val OffsetsLoadBufferSize = OffsetManagerConfig.DefaultLoadBufferSize
@@ -218,6 +222,9 @@ object KafkaConfig {
val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries"
val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms"
val ControlledShutdownEnableProp = "controlled.shutdown.enable"
/** ********* Consumer coordinator configuration ***********/
val ConsumerMinSessionTimeoutMsProp = "consumer.min.session.timeout.ms"
val ConsumerMaxSessionTimeoutMsProp = "consumer.max.session.timeout.ms"
/** ********* Offset management configuration ***********/
val OffsetMetadataMaxSizeProp = "offset.metadata.max.bytes"
val OffsetsLoadBufferSizeProp = "offsets.load.buffer.size"
@@ -343,6 +350,9 @@ object KafkaConfig {
val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens"
val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying."
val ControlledShutdownEnableDoc = "Enable controlled shutdown of the server"
/** ********* Consumer coordinator configuration ***********/
val ConsumerMinSessionTimeoutMsDoc = "The minimum allowed session timeout for registered consumers"
val ConsumerMaxSessionTimeoutMsDoc = "The maximum allowed session timeout for registered consumers"
/** ********* Offset management configuration ***********/
val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry associated with an offset commit"
val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets segments when loading offsets into the cache."
@@ -461,11 +471,16 @@ object KafkaConfig {
.define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, HIGH, UncleanLeaderElectionEnableDoc)
.define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc)
.define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, MEDIUM, InterBrokerProtocolVersionDoc)
/** ********* Controlled shutdown configuration ***********/
.define(ControlledShutdownMaxRetriesProp, INT, Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc)
.define(ControlledShutdownRetryBackoffMsProp, INT, Defaults.ControlledShutdownRetryBackoffMs, MEDIUM, ControlledShutdownRetryBackoffMsDoc)
.define(ControlledShutdownEnableProp, BOOLEAN, Defaults.ControlledShutdownEnable, MEDIUM, ControlledShutdownEnableDoc)
/** ********* Consumer coordinator configuration ***********/
.define(ConsumerMinSessionTimeoutMsProp, INT, Defaults.ConsumerMinSessionTimeoutMs, MEDIUM, ConsumerMinSessionTimeoutMsDoc)
.define(ConsumerMaxSessionTimeoutMsProp, INT, Defaults.ConsumerMaxSessionTimeoutMs, MEDIUM, ConsumerMaxSessionTimeoutMsDoc)
/** ********* Offset management configuration ***********/
.define(OffsetMetadataMaxSizeProp, INT, Defaults.OffsetMetadataMaxSize, HIGH, OffsetMetadataMaxSizeDoc)
.define(OffsetsLoadBufferSizeProp, INT, Defaults.OffsetsLoadBufferSize, atLeast(1), HIGH, OffsetsLoadBufferSizeDoc)
@@ -581,11 +596,16 @@ object KafkaConfig {
uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean],
interBrokerSecurityProtocol = SecurityProtocol.valueOf(parsed.get(InterBrokerSecurityProtocolProp).asInstanceOf[String]),
interBrokerProtocolVersion = ApiVersion(parsed.get(InterBrokerProtocolVersionProp).asInstanceOf[String]),
/** ********* Controlled shutdown configuration ***********/
controlledShutdownMaxRetries = parsed.get(ControlledShutdownMaxRetriesProp).asInstanceOf[Int],
controlledShutdownRetryBackoffMs = parsed.get(ControlledShutdownRetryBackoffMsProp).asInstanceOf[Int],
controlledShutdownEnable = parsed.get(ControlledShutdownEnableProp).asInstanceOf[Boolean],
/** ********* Consumer coordinator configuration ***********/
consumerMinSessionTimeoutMs = parsed.get(ConsumerMinSessionTimeoutMsProp).asInstanceOf[Int],
consumerMaxSessionTimeoutMs = parsed.get(ConsumerMaxSessionTimeoutMsProp).asInstanceOf[Int],
/** ********* Offset management configuration ***********/
offsetMetadataMaxSize = parsed.get(OffsetMetadataMaxSizeProp).asInstanceOf[Int],
offsetsLoadBufferSize = parsed.get(OffsetsLoadBufferSizeProp).asInstanceOf[Int],
@@ -729,6 +749,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
val controlledShutdownRetryBackoffMs: Int = Defaults.ControlledShutdownRetryBackoffMs,
val controlledShutdownEnable: Boolean = Defaults.ControlledShutdownEnable,
/** ********* Consumer coordinator configuration ***********/
val consumerMinSessionTimeoutMs: Int = Defaults.ConsumerMinSessionTimeoutMs,
val consumerMaxSessionTimeoutMs: Int = Defaults.ConsumerMaxSessionTimeoutMs,
/** ********* Offset management configuration ***********/
val offsetMetadataMaxSize: Int = Defaults.OffsetMetadataMaxSize,
val offsetsLoadBufferSize: Int = Defaults.OffsetsLoadBufferSize,
@@ -951,6 +975,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
props.put(ControlledShutdownRetryBackoffMsProp, controlledShutdownRetryBackoffMs.toString)
props.put(ControlledShutdownEnableProp, controlledShutdownEnable.toString)
/** ********* Consumer coordinator configuration ***********/
props.put(ConsumerMinSessionTimeoutMsProp, consumerMinSessionTimeoutMs.toString)
props.put(ConsumerMaxSessionTimeoutMsProp, consumerMaxSessionTimeoutMs.toString)
/** ********* Offset management configuration ***********/
props.put(OffsetMetadataMaxSizeProp, offsetMetadataMaxSize.toString)
props.put(OffsetsLoadBufferSizeProp, offsetsLoadBufferSize.toString)
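
The two new broker settings are plain integers with the defaults shown in Defaults above. A hedged example of overriding them in broker properties (property keys from this patch, values illustrative):

    // Property keys come from KafkaConfig above; the values are illustrative.
    import java.util.Properties;

    Properties brokerProps = new Properties();
    brokerProps.put("consumer.min.session.timeout.ms", "6000");   // ConsumerMinSessionTimeoutMs default
    brokerProps.put("consumer.max.session.timeout.ms", "30000");  // ConsumerMaxSessionTimeoutMs default
    // a JoinGroupRequest with a session timeout outside [min, max] is
    // rejected with INVALID_SESSION_TIMEOUT (26)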

ConsumerBounceTest.scala

@@ -40,12 +40,14 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
val tp = new TopicPartition(topic, part)
// configure the servers and clients
this.serverConfig.setProperty("controlled.shutdown.enable", "false") // speed up shutdown
this.serverConfig.setProperty("offsets.topic.replication.factor", "3") // don't want to lose offset
this.serverConfig.setProperty("offsets.topic.num.partitions", "1")
this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "10") // set small enough session timeout
this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20")
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
override def generateConfigs() = {
@@ -60,7 +62,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
}
def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(20)
/*
* 1. Produce a bunch of messages

ConsumerTest.scala

@@ -24,8 +24,8 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
import kafka.utils.{TestUtils, Logging}
import kafka.server.{KafkaConfig, OffsetManager}
import java.util.ArrayList
import org.junit.Assert._
@@ -47,9 +47,10 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
val tp = new TopicPartition(topic, part)
// configure the servers and clients
this.serverConfig.setProperty("controlled.shutdown.enable", "false") // speed up shutdown
this.serverConfig.setProperty("offsets.topic.replication.factor", "3") // don't want to lose offset
this.serverConfig.setProperty("offsets.topic.num.partitions", "1")
this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "100") // set small enough session timeout
this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
@@ -146,8 +147,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
}
def testPartitionReassignmentCallback() {
val callback = new TestConsumerReassignmentCallback()
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "200"); // timeout quickly to avoid slow test
val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer())
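
The re-enabled test counts rebalance callbacks to verify the consumer survives a reassignment. A hedged sketch of such a counting callback, assuming the ConsumerRebalanceCallback interface shape of this client version (the interface itself is not shown in the diff):

    // Hedged sketch mirroring TestConsumerReassignmentCallback; interface shape assumed.
    class CountingReassignmentCallback implements ConsumerRebalanceCallback {
        int assigned = 0;
        int revoked = 0;

        @Override
        public void onPartitionsAssigned(Consumer<?, ?> consumer,
                                         Collection<TopicPartition> partitions) {
            assigned++; // a new generation has been joined
        }

        @Override
        public void onPartitionsRevoked(Consumer<?, ?> consumer,
                                        Collection<TopicPartition> partitions) {
            revoked++;  // a rebalance is about to begin
        }
    }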

KafkaConfigConfigDefTest.scala

@@ -133,6 +133,9 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
Assert.assertEquals(expectedConfig.controlledShutdownRetryBackoffMs, actualConfig.controlledShutdownRetryBackoffMs)
Assert.assertEquals(expectedConfig.controlledShutdownEnable, actualConfig.controlledShutdownEnable)
Assert.assertEquals(expectedConfig.consumerMinSessionTimeoutMs, actualConfig.consumerMinSessionTimeoutMs)
Assert.assertEquals(expectedConfig.consumerMaxSessionTimeoutMs, actualConfig.consumerMaxSessionTimeoutMs)
Assert.assertEquals(expectedConfig.offsetMetadataMaxSize, actualConfig.offsetMetadataMaxSize)
Assert.assertEquals(expectedConfig.offsetsLoadBufferSize, actualConfig.offsetsLoadBufferSize)
Assert.assertEquals(expectedConfig.offsetsTopicReplicationFactor, actualConfig.offsetsTopicReplicationFactor)
@@ -330,6 +333,8 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
case KafkaConfig.ControlledShutdownMaxRetriesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ControlledShutdownRetryBackoffMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ControlledShutdownEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
case KafkaConfig.ConsumerMinSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ConsumerMaxSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.OffsetMetadataMaxSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.OffsetsLoadBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.OffsetsTopicReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")