KAFKA-5587; Remove channel only after staged receives are delivered

When idle connections are closed, ensure that channels with staged
receives are retained in `closingChannels` until all staged receives
are completed. Also ensure that only one staged receive is completed
in each poll, even when channels are closed.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3526 from rajinisivaram/KAFKA-5587
This commit is contained in:
Rajini Sivaram 2017-07-17 20:29:26 +01:00 committed by Ismael Juma
parent e2fe19d22a
commit 28c83d9667
5 changed files with 86 additions and 7 deletions

View File

@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=OFF, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.org.apache.kafka=ERROR

View File

@ -327,14 +327,16 @@ public class Selector implements Selectable, AutoCloseable {
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
}
addToCompletedReceives();
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
// we use the time at the end of select to ensure that we don't close any connections that
// have just been processed in pollSelectionKeys
maybeCloseOldestConnection(endSelect);
// Add to completedReceives after closing expired connections to avoid removing
// channels with completed receives until all staged receives are completed.
addToCompletedReceives();
}
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
@ -563,11 +565,7 @@ public class Selector implements Selectable, AutoCloseable {
// are tracked to ensure that requests are processed one-by-one by the broker to preserve ordering.
Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
if (processOutstanding && deque != null && !deque.isEmpty()) {
if (!channel.isMute()) {
addToCompletedReceives(channel, deque);
if (deque.isEmpty())
this.stagedReceives.remove(channel);
}
// stagedReceives will be moved to completedReceives later along with receives from other channels
closingChannels.put(channel.id(), channel);
} else
doClose(channel, processOutstanding);
@ -697,6 +695,12 @@ public class Selector implements Selectable, AutoCloseable {
return new HashSet<>(nioSelector.keys());
}
// only for testing
int numStagedReceives(KafkaChannel channel) {
Deque<NetworkReceive> deque = stagedReceives.get(channel);
return deque == null ? 0 : deque.size();
}
private class SelectorMetrics {
private final Metrics metrics;
private final String metricGrpPrefix;

View File

@ -17,6 +17,9 @@
package org.apache.kafka.common.network;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
@ -268,6 +271,57 @@ public class SelectorTest {
assertEquals(ChannelState.EXPIRED, selector.disconnected().get(id));
}
@Test
public void testCloseOldestConnectionWithOneStagedReceive() throws Exception {
verifyCloseOldestConnectionWithStagedReceives(1);
}
@Test
public void testCloseOldestConnectionWithMultipleStagedReceives() throws Exception {
verifyCloseOldestConnectionWithStagedReceives(5);
}
private void verifyCloseOldestConnectionWithStagedReceives(int maxStagedReceives) throws Exception {
String id = "0";
blockingConnect(id);
KafkaChannel channel = selector.channel(id);
selector.mute(id);
for (int i = 0; i <= maxStagedReceives; i++) {
selector.send(createSend(id, String.valueOf(i)));
selector.poll(1000);
}
selector.unmute(id);
do {
selector.poll(1000);
} while (selector.completedReceives().isEmpty());
int stagedReceives = selector.numStagedReceives(channel);
int completedReceives = 0;
while (selector.disconnected().isEmpty()) {
time.sleep(6000); // The max idle time is 5000ms
selector.poll(0);
completedReceives += selector.completedReceives().size();
// With SSL, more receives may be staged from buffered data
int newStaged = selector.numStagedReceives(channel) - (stagedReceives - completedReceives);
if (newStaged > 0) {
stagedReceives += newStaged;
assertNotNull("Channel should not have been expired", selector.channel(id));
assertFalse("Channel should not have been disconnected", selector.disconnected().containsKey(id));
} else if (!selector.completedReceives().isEmpty()) {
assertEquals(1, selector.completedReceives().size());
assertTrue("Channel not found", selector.closingChannel(id) != null || selector.channel(id) != null);
assertFalse("Disconnect notified too early", selector.disconnected().containsKey(id));
}
}
assertEquals(maxStagedReceives, completedReceives);
assertEquals(stagedReceives, completedReceives);
assertNull("Channel not removed", selector.channel(id));
assertNull("Channel not removed", selector.closingChannel(id));
assertTrue("Disconnect not notified", selector.disconnected().containsKey(id));
assertTrue("Unexpected receive", selector.completedReceives().isEmpty());
}
private String blockingRequest(String node, String s) throws IOException {
selector.send(createSend(node, s));