KAFKA-1910 Follow-up; Revert the no-offset-committed error code; reviewed by Joel Koshy

This commit is contained in:
Guozhang Wang 2015-04-07 15:38:36 -07:00
parent 79f7cca85e
commit 013cda2d79
6 changed files with 6 additions and 40 deletions

View File

@ -234,16 +234,14 @@ public final class Coordinator {
coordinatorDead();
offsetsReady = false;
Utils.sleep(this.retryBackoffMs);
} else if (data.errorCode == Errors.NO_OFFSETS_FETCHABLE.code()
|| data.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
} else if (data.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
// just ignore this partition
log.debug("No committed offset for partition " + tp);
log.debug("Unknown topic or partition for " + tp);
} else {
throw new IllegalStateException("Unexpected error code " + data.errorCode + " while fetching offset");
}
} else if (data.offset >= 0) {
// record the position with the offset (-1 seems to indicate no
// such offset known)
// record the position with the offset (-1 indicates no committed offset to fetch)
offsets.put(tp, data.offset);
} else {
log.debug("No committed offset for partition " + tp);

View File

@ -69,9 +69,7 @@ public enum Errors {
INVALID_REQUIRED_ACKS(21,
new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")),
ILLEGAL_GENERATION(22,
new ApiException("Specified consumer generation id is not valid.")),
NO_OFFSETS_FETCHABLE(23,
new ApiException("No offsets have been committed so far."));
new ApiException("Specified consumer generation id is not valid."));
private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();

View File

@ -324,7 +324,6 @@ public final class Record {
checksum(),
key() == null ? 0 : key().limit(),
value() == null ? 0 : value().limit());
}
public boolean equals(Object other) {

View File

@ -244,7 +244,7 @@ public class CoordinatorTest {
assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp));
// fetch with no fetchable offsets
client.prepareResponse(offsetFetchResponse(tp, Errors.NO_OFFSETS_FETCHABLE.code(), "", 100L));
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size());
// fetch with offset topic unknown

View File

@ -51,7 +51,6 @@ object ErrorMapping {
val NotEnoughReplicasAfterAppendCode: Short = 20
// 21: InvalidRequiredAcks
// 22: IllegalConsumerGeneration
val NoOffsetsCommittedCode: Short = 23
private val exceptionToCode =
Map[Class[Throwable], Short](
@ -73,8 +72,7 @@ object ErrorMapping {
classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode,
classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode,
classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode,
classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode,
classOf[NoOffsetsCommittedException].asInstanceOf[Class[Throwable]] -> NoOffsetsCommittedCode
classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode
).withDefaultValue(UnknownCode)
/* invert the mapping */

View File

@ -1,27 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.common
/**
* Number of insync replicas for the partition is lower than min.insync.replicas
* This exception is raised when the low ISR size is discovered *after* the message
* was already appended to the log. Producer retries will cause duplicates.
*/
class NoOffsetsCommittedException(message: String) extends RuntimeException(message) {
def this() = this(null)
}