MINOR: InFlightRequests#isEmpty(node) method corrected.

- In clearAll method, get operation is removed.
- variable name `requestTimeout` changed to `requestTimeoutMs` for clarity

Author: Kamal C <kamal.chandraprakash@gmail.com>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #3467 from Kamal15/frequest
This commit is contained in:
Kamal C 2017-06-30 15:07:00 -04:00 committed by Rajini Sivaram
parent 6471822079
commit 342f34a199
2 changed files with 7 additions and 12 deletions

View File

@ -110,7 +110,7 @@ final class InFlightRequests {
*/ */
public boolean isEmpty(String node) { public boolean isEmpty(String node) {
Deque<NetworkClient.InFlightRequest> queue = requests.get(node); Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
return queue != null && !queue.isEmpty(); return queue == null || queue.isEmpty();
} }
/** /**
@ -141,22 +141,18 @@ final class InFlightRequests {
* @return All the in-flight requests for that node that have been removed * @return All the in-flight requests for that node that have been removed
*/ */
public Iterable<NetworkClient.InFlightRequest> clearAll(String node) { public Iterable<NetworkClient.InFlightRequest> clearAll(String node) {
Deque<NetworkClient.InFlightRequest> reqs = requests.get(node); Deque<NetworkClient.InFlightRequest> reqs = requests.remove(node);
if (reqs == null) { return (reqs == null) ? Collections.<NetworkClient.InFlightRequest>emptyList() : reqs;
return Collections.emptyList();
} else {
return requests.remove(node);
}
} }
/** /**
* Returns a list of nodes with pending in-flight request, that need to be timed out * Returns a list of nodes with pending in-flight request, that need to be timed out
* *
* @param now current time in milliseconds * @param now current time in milliseconds
* @param requestTimeout max time to wait for the request to be completed * @param requestTimeoutMs max time to wait for the request to be completed
* @return list of nodes * @return list of nodes
*/ */
public List<String> getNodesWithTimedOutRequests(long now, int requestTimeout) { public List<String> getNodesWithTimedOutRequests(long now, int requestTimeoutMs) {
List<String> nodeIds = new LinkedList<>(); List<String> nodeIds = new LinkedList<>();
for (Map.Entry<String, Deque<NetworkClient.InFlightRequest>> requestEntry : requests.entrySet()) { for (Map.Entry<String, Deque<NetworkClient.InFlightRequest>> requestEntry : requests.entrySet()) {
String nodeId = requestEntry.getKey(); String nodeId = requestEntry.getKey();
@ -165,11 +161,10 @@ final class InFlightRequests {
if (!deque.isEmpty()) { if (!deque.isEmpty()) {
NetworkClient.InFlightRequest request = deque.peekLast(); NetworkClient.InFlightRequest request = deque.peekLast();
long timeSinceSend = now - request.sendTimeMs; long timeSinceSend = now - request.sendTimeMs;
if (timeSinceSend > requestTimeout) if (timeSinceSend > requestTimeoutMs)
nodeIds.add(nodeId); nodeIds.add(nodeId);
} }
} }
return nodeIds; return nodeIds;
} }

View File

@ -482,7 +482,7 @@ public class NetworkClient implements KafkaClient {
@Override @Override
public boolean hasInFlightRequests(String node) { public boolean hasInFlightRequests(String node) {
return this.inFlightRequests.isEmpty(node); return !this.inFlightRequests.isEmpty(node);
} }
@Override @Override