From 32bf0774e9727e8f92ce168bc568f62d20968386 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Mon, 18 Nov 2019 17:32:23 -0800 Subject: [PATCH] MINOR: Remove explicit version checks in getErrorResponse methods (#7708) This patch removes the explicit version check pattern we used in `getErrorResponse`, which is a pain to maintain (as seen by KAFKA-9200). We already check that requests have a valid version range in the `AbstractRequest` constructor. Reviewers: Andrew Choi , Ismael Juma --- .../common/requests/AlterConfigsRequest.java | 18 +++------ .../requests/AlterReplicaLogDirsRequest.java | 13 +------ .../common/requests/CreateAclsRequest.java | 16 ++------ .../requests/CreatePartitionsRequest.java | 11 +----- .../common/requests/DeleteAclsRequest.java | 18 +++------ .../common/requests/DeleteRecordsRequest.java | 10 +---- .../common/requests/DescribeAclsRequest.java | 11 +----- .../requests/DescribeConfigsRequest.java | 23 ++++------- .../requests/DescribeLogDirsRequest.java | 11 +----- .../common/requests/JoinGroupRequest.java | 39 ++++--------------- .../common/requests/ListOffsetRequest.java | 13 +------ .../common/requests/MetadataRequest.java | 20 +--------- .../common/requests/OffsetCommitRequest.java | 28 ++----------- .../kafka/common/requests/ProduceRequest.java | 17 +------- .../common/requests/SyncGroupRequest.java | 25 ++---------- .../common/message/DeleteTopicsResponse.json | 2 +- .../common/message/MetadataResponse.json | 2 +- .../requests/OffsetCommitRequestTest.java | 8 +--- 18 files changed, 48 insertions(+), 237 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java index 3fbf13d765c..eecb2f90225 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java @@ -191,19 +191,11 @@ public class AlterConfigsRequest extends AbstractRequest { @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - short version = version(); - switch (version) { - case 0: - case 1: - ApiError error = ApiError.fromThrowable(e); - Map errors = new HashMap<>(configs.size()); - for (ConfigResource resource : configs.keySet()) - errors.put(resource, error); - return new AlterConfigsResponse(throttleTimeMs, errors); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - version, this.getClass().getSimpleName(), ApiKeys.ALTER_CONFIGS.latestVersion())); - } + ApiError error = ApiError.fromThrowable(e); + Map errors = new HashMap<>(configs.size()); + for (ConfigResource resource : configs.keySet()) + errors.put(resource, error); + return new AlterConfigsResponse(throttleTimeMs, errors); } public static AlterConfigsRequest parse(ByteBuffer buffer, short version) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java index b7cf1d8d40b..888470f2596 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java @@ -144,21 +144,10 @@ public class AlterReplicaLogDirsRequest extends AbstractRequest { @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { Map responseMap = new HashMap<>(); - for (Map.Entry entry : partitionDirs.entrySet()) { responseMap.put(entry.getKey(), Errors.forException(e)); } - - short versionId = version(); - switch (versionId) { - case 0: - case 1: - return new AlterReplicaLogDirsResponse(throttleTimeMs, responseMap); - default: - throw new IllegalArgumentException( - String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, - this.getClass().getSimpleName(), ApiKeys.ALTER_REPLICA_LOG_DIRS.latestVersion())); - } + return new AlterReplicaLogDirsResponse(throttleTimeMs, responseMap); } public Map partitionDirs() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java index 29ecd016f4b..992e3d4c3f1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java @@ -157,18 +157,10 @@ public class CreateAclsRequest extends AbstractRequest { @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable) { - short versionId = version(); - switch (versionId) { - case 0: - case 1: - List responses = new ArrayList<>(); - for (int i = 0; i < aclCreations.size(); i++) - responses.add(new CreateAclsResponse.AclCreationResponse(ApiError.fromThrowable(throwable))); - return new CreateAclsResponse(throttleTimeMs, responses); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ApiKeys.CREATE_ACLS.latestVersion())); - } + List responses = new ArrayList<>(); + for (int i = 0; i < aclCreations.size(); i++) + responses.add(new CreateAclsResponse.AclCreationResponse(ApiError.fromThrowable(throwable))); + return new CreateAclsResponse(throttleTimeMs, responses); } public static CreateAclsRequest parse(ByteBuffer buffer, short version) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java index 7872cf9f8d0..bf269218589 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java @@ -231,16 +231,7 @@ public class CreatePartitionsRequest extends AbstractRequest { for (String topic : newPartitions.keySet()) { topicErrors.put(topic, ApiError.fromThrowable(e)); } - - short versionId = version(); - switch (versionId) { - case 0: - case 1: - return new CreatePartitionsResponse(throttleTimeMs, topicErrors); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ApiKeys.CREATE_PARTITIONS.latestVersion())); - } + return new CreatePartitionsResponse(throttleTimeMs, topicErrors); } public static CreatePartitionsRequest parse(ByteBuffer buffer, short version) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java index 2ed7c24f68a..5a20cebf8e0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java @@ -133,20 +133,12 @@ public class DeleteAclsRequest extends AbstractRequest { @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable) { - short versionId = version(); - switch (versionId) { - case 0: - case 1: - List responses = new ArrayList<>(); - for (int i = 0; i < filters.size(); i++) { - responses.add(new DeleteAclsResponse.AclFilterResponse( - ApiError.fromThrowable(throwable), Collections.emptySet())); - } - return new DeleteAclsResponse(throttleTimeMs, responses); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ApiKeys.DELETE_ACLS.latestVersion())); + List responses = new ArrayList<>(); + for (int i = 0; i < filters.size(); i++) { + responses.add(new DeleteAclsResponse.AclFilterResponse( + ApiError.fromThrowable(throwable), Collections.emptySet())); } + return new DeleteAclsResponse(throttleTimeMs, responses); } public static DeleteAclsRequest parse(ByteBuffer buffer, short version) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java index 7ea55534f69..d52bc7d6b32 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java @@ -155,15 +155,7 @@ public class DeleteRecordsRequest extends AbstractRequest { responseMap.put(entry.getKey(), new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.forException(e))); } - short versionId = version(); - switch (versionId) { - case 0: - case 1: - return new DeleteRecordsResponse(throttleTimeMs, responseMap); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ApiKeys.DELETE_RECORDS.latestVersion())); - } + return new DeleteRecordsResponse(throttleTimeMs, responseMap); } public int timeout() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java index ed417c57730..6e3f24a1438 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java @@ -109,16 +109,7 @@ public class DescribeAclsRequest extends AbstractRequest { @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable) { - short versionId = version(); - switch (versionId) { - case 0: - case 1: - return new DescribeAclsResponse(throttleTimeMs, ApiError.fromThrowable(throwable), - Collections.emptySet()); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ApiKeys.DESCRIBE_ACLS.latestVersion())); - } + return new DescribeAclsResponse(throttleTimeMs, ApiError.fromThrowable(throwable), Collections.emptySet()); } public static DescribeAclsRequest parse(ByteBuffer buffer, short version) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java index 99bbdd772c6..4bcc380473a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java @@ -164,22 +164,13 @@ public class DescribeConfigsRequest extends AbstractRequest { @Override public DescribeConfigsResponse getErrorResponse(int throttleTimeMs, Throwable e) { - short version = version(); - switch (version) { - case 0: - case 1: - case 2: - ApiError error = ApiError.fromThrowable(e); - Map errors = new HashMap<>(resources().size()); - DescribeConfigsResponse.Config config = new DescribeConfigsResponse.Config(error, - Collections.emptyList()); - for (ConfigResource resource : resources()) - errors.put(resource, config); - return new DescribeConfigsResponse(throttleTimeMs, errors); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - version, this.getClass().getSimpleName(), ApiKeys.DESCRIBE_CONFIGS.latestVersion())); - } + ApiError error = ApiError.fromThrowable(e); + Map errors = new HashMap<>(resources().size()); + DescribeConfigsResponse.Config config = new DescribeConfigsResponse.Config(error, + Collections.emptyList()); + for (ConfigResource resource : resources()) + errors.put(resource, config); + return new DescribeConfigsResponse(throttleTimeMs, errors); } public static DescribeConfigsRequest parse(ByteBuffer buffer, short version) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java index e16cc187332..84f6c9111a3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java @@ -139,16 +139,7 @@ public class DescribeLogDirsRequest extends AbstractRequest { @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - short versionId = version(); - switch (versionId) { - case 0: - case 1: - return new DescribeLogDirsResponse(throttleTimeMs, new HashMap()); - default: - throw new IllegalArgumentException( - String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, - this.getClass().getSimpleName(), ApiKeys.DESCRIBE_LOG_DIRS.latestVersion())); - } + return new DescribeLogDirsResponse(throttleTimeMs, new HashMap()); } public boolean isAllTopicPartitions() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index 95d125d312a..1e27a63b64f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -117,37 +117,14 @@ public class JoinGroupRequest extends AbstractRequest { @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - short versionId = version(); - switch (versionId) { - case 0: - case 1: - return new JoinGroupResponse( - new JoinGroupResponseData() - .setErrorCode(Errors.forException(e).code()) - .setGenerationId(JoinGroupResponse.UNKNOWN_GENERATION_ID) - .setProtocolName(JoinGroupResponse.UNKNOWN_PROTOCOL) - .setLeader(JoinGroupResponse.UNKNOWN_MEMBER_ID) - .setMemberId(JoinGroupResponse.UNKNOWN_MEMBER_ID) - .setMembers(Collections.emptyList()) - ); - case 2: - case 3: - case 4: - case 5: - return new JoinGroupResponse( - new JoinGroupResponseData() - .setThrottleTimeMs(throttleTimeMs) - .setErrorCode(Errors.forException(e).code()) - .setGenerationId(JoinGroupResponse.UNKNOWN_GENERATION_ID) - .setProtocolName(JoinGroupResponse.UNKNOWN_PROTOCOL) - .setLeader(JoinGroupResponse.UNKNOWN_MEMBER_ID) - .setMemberId(JoinGroupResponse.UNKNOWN_MEMBER_ID) - .setMembers(Collections.emptyList()) - ); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ApiKeys.JOIN_GROUP.latestVersion())); - } + return new JoinGroupResponse(new JoinGroupResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setErrorCode(Errors.forException(e).code()) + .setGenerationId(JoinGroupResponse.UNKNOWN_GENERATION_ID) + .setProtocolName(JoinGroupResponse.UNKNOWN_PROTOCOL) + .setLeader(JoinGroupResponse.UNKNOWN_MEMBER_ID) + .setMemberId(JoinGroupResponse.UNKNOWN_MEMBER_ID) + .setMembers(Collections.emptyList())); } public static JoinGroupRequest parse(ByteBuffer buffer, short version) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java index c9464df99f7..17f921f6f32 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java @@ -269,18 +269,7 @@ public class ListOffsetRequest extends AbstractRequest { responseData.put(partition, partitionError); } - switch (versionId) { - case 0: - case 1: - case 2: - case 3: - case 4: - case 5: - return new ListOffsetResponse(throttleTimeMs, responseData); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ApiKeys.LIST_OFFSETS.latestVersion())); - } + return new ListOffsetResponse(throttleTimeMs, responseData); } public int replicaId() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index cbfca4bbdf6..c2533e4f377 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -133,24 +133,8 @@ public class MetadataRequest extends AbstractRequest { .setPartitions(Collections.emptyList())); } - short versionId = version(); - switch (versionId) { - case 0: - case 1: - case 2: - return new MetadataResponse(responseData); - case 3: - case 4: - case 5: - case 6: - case 7: - case 8: - responseData.setThrottleTimeMs(throttleTimeMs); - return new MetadataResponse(responseData); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ApiKeys.METADATA.latestVersion())); - } + responseData.setThrottleTimeMs(throttleTimeMs); + return new MetadataResponse(responseData); } public boolean isAllTopics() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 72ebef261f4..bb1b74bf411 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -124,31 +124,9 @@ public class OffsetCommitRequest extends AbstractRequest { public OffsetCommitResponse getErrorResponse(int throttleTimeMs, Throwable e) { List responseTopicData = getErrorResponseTopics(data.topics(), Errors.forException(e)); - - short versionId = version(); - switch (versionId) { - case 0: - case 1: - case 2: - return new OffsetCommitResponse( - new OffsetCommitResponseData() - .setTopics(responseTopicData) - ); - case 3: - case 4: - case 5: - case 6: - case 7: - return new OffsetCommitResponse( - new OffsetCommitResponseData() - .setTopics(responseTopicData) - .setThrottleTimeMs(throttleTimeMs) - - ); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ApiKeys.OFFSET_COMMIT.latestVersion())); - } + return new OffsetCommitResponse(new OffsetCommitResponseData() + .setTopics(responseTopicData) + .setThrottleTimeMs(throttleTimeMs)); } public static OffsetCommitRequest parse(ByteBuffer buffer, short version) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 7b3ae1cf2af..485145f3ffd 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -333,22 +333,7 @@ public class ProduceRequest extends AbstractRequest { for (TopicPartition tp : partitions()) responseMap.put(tp, partitionResponse); - short versionId = version(); - switch (versionId) { - case 0: - case 1: - case 2: - case 3: - case 4: - case 5: - case 6: - case 7: - case 8: - return new ProduceResponse(responseMap, throttleTimeMs); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ApiKeys.PRODUCE.latestVersion())); - } + return new ProduceResponse(responseMap, throttleTimeMs); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java index 48319d55cde..ad5e4468cde 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java @@ -67,27 +67,10 @@ public class SyncGroupRequest extends AbstractRequest { @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - short versionId = version(); - switch (versionId) { - case 0: - return new SyncGroupResponse( - new SyncGroupResponseData() - .setErrorCode(Errors.forException(e).code()) - .setAssignment(new byte[0]) - ); - case 1: - case 2: - case 3: - return new SyncGroupResponse( - new SyncGroupResponseData() - .setErrorCode(Errors.forException(e).code()) - .setAssignment(new byte[0]) - .setThrottleTimeMs(throttleTimeMs) - ); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ApiKeys.SYNC_GROUP.latestVersion())); - } + return new SyncGroupResponse(new SyncGroupResponseData() + .setErrorCode(Errors.forException(e).code()) + .setAssignment(new byte[0]) + .setThrottleTimeMs(throttleTimeMs)); } public Map groupAssignments() { diff --git a/clients/src/main/resources/common/message/DeleteTopicsResponse.json b/clients/src/main/resources/common/message/DeleteTopicsResponse.json index 814909a48d2..b94a2bf86ba 100644 --- a/clients/src/main/resources/common/message/DeleteTopicsResponse.json +++ b/clients/src/main/resources/common/message/DeleteTopicsResponse.json @@ -27,7 +27,7 @@ "validVersions": "0-4", "flexibleVersions": "4+", "fields": [ - { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", + { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Responses", "type": "[]DeletableTopicResult", "versions": "0+", "about": "The results for each topic we tried to delete.", "fields": [ diff --git a/clients/src/main/resources/common/message/MetadataResponse.json b/clients/src/main/resources/common/message/MetadataResponse.json index ce232b12331..19faecff80d 100644 --- a/clients/src/main/resources/common/message/MetadataResponse.json +++ b/clients/src/main/resources/common/message/MetadataResponse.json @@ -39,7 +39,7 @@ "validVersions": "0-9", "flexibleVersions": "9+", "fields": [ - { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", + { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Brokers", "type": "[]MetadataResponseBroker", "versions": "0+", "about": "Each broker in the response.", "fields": [ diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java index c6c5a7bea24..7255b866cc6 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java @@ -34,7 +34,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME; import static org.apache.kafka.common.requests.OffsetCommitRequest.getErrorResponseTopics; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; @@ -97,12 +96,7 @@ public class OffsetCommitRequestTest { OffsetCommitResponse response = request.getErrorResponse(throttleTimeMs, Errors.NOT_COORDINATOR.exception()); assertEquals(Collections.singletonMap(Errors.NOT_COORDINATOR, 2), response.errorCounts()); - - if (version >= 3) { - assertEquals(throttleTimeMs, response.throttleTimeMs()); - } else { - assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs()); - } + assertEquals(throttleTimeMs, response.throttleTimeMs()); } }