KAFKA-1910 Follow-up again; fix ListOffsetResponse handling for the expected error codes

This commit is contained in:
Guozhang Wang 2015-03-11 16:25:21 -07:00
parent 1eb5f53aa4
commit 01f20e029f
4 changed files with 7 additions and 3 deletions

View File

@ -129,7 +129,7 @@ public final class Coordinator {
// process the response
JoinGroupResponse response = new JoinGroupResponse(resp.responseBody());
// TODO: needs to handle disconnects and errors
// TODO: needs to handle disconnects and errors, should not just throw exceptions
Errors.forCode(response.errorCode()).maybeThrow();
this.consumerId = response.consumerId();

View File

@ -231,11 +231,12 @@ public class Fetcher<K, V> {
log.debug("Fetched offset {} for partition {}", offset, topicPartition);
return offset;
} else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
|| errorCode == Errors.LEADER_NOT_AVAILABLE.code()) {
|| errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
topicPartition);
awaitMetadataUpdate();
} else {
// TODO: we should not just throw exceptions but should handle and log it.
Errors.forCode(errorCode).maybeThrow();
}
}

View File

@ -45,7 +45,9 @@ public class ListOffsetResponse extends AbstractRequestResponse {
/**
* Possible error code:
*
* TODO
* UNKNOWN_TOPIC_OR_PARTITION (3)
* NOT_LEADER_FOR_PARTITION (6)
* UNKNOWN (-1)
*/
private static final String OFFSETS_KEY_NAME = "offsets";

View File

@ -261,6 +261,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
override def doWork(): Unit = {
killRandomBroker()
Thread.sleep(500)
restartDeadBrokers()
iter += 1