mirror of https://github.com/apache/kafka.git
KAFKA-17372 Move `ThrottledChannelExpirationTest#testThrottledChannelDelay` and `ThrottledChannel` to server module (#16935)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
a2f89f5412
commit
79f979cd52
|
@ -29,7 +29,7 @@ import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate}
|
|||
import org.apache.kafka.common.security.auth.KafkaPrincipal
|
||||
import org.apache.kafka.common.utils.{Sanitizer, Time}
|
||||
import org.apache.kafka.server.config.{ClientQuotaManagerConfig, ZooKeeperInternals}
|
||||
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType}
|
||||
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType, ThrottleCallback, ThrottledChannel}
|
||||
import org.apache.kafka.server.util.ShutdownableThread
|
||||
import org.apache.kafka.network.Session
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.kafka.common.network.Send
|
|||
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse}
|
||||
import org.apache.kafka.common.utils.Time
|
||||
import org.apache.kafka.coordinator.group.GroupCoordinator
|
||||
import org.apache.kafka.server.quota.ThrottleCallback
|
||||
|
||||
import java.util.OptionalInt
|
||||
|
||||
|
|
|
@ -1,61 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.server
|
||||
|
||||
import java.util.concurrent.{Delayed, TimeUnit}
|
||||
|
||||
import kafka.utils.Logging
|
||||
import org.apache.kafka.common.utils.Time
|
||||
|
||||
trait ThrottleCallback {
|
||||
def startThrottling(): Unit
|
||||
def endThrottling(): Unit
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a request whose response has been delayed.
|
||||
* @param time Time instance to use
|
||||
* @param throttleTimeMs Delay associated with this request
|
||||
* @param callback Callback for channel throttling
|
||||
*/
|
||||
class ThrottledChannel(
|
||||
val time: Time,
|
||||
val throttleTimeMs: Int,
|
||||
val callback: ThrottleCallback
|
||||
) extends Delayed with Logging {
|
||||
|
||||
private val endTimeNanos = time.nanoseconds() + TimeUnit.MILLISECONDS.toNanos(throttleTimeMs)
|
||||
|
||||
// Notify the socket server that throttling has started for this channel.
|
||||
callback.startThrottling()
|
||||
|
||||
// Notify the socket server that throttling has been done for this channel.
|
||||
def notifyThrottlingDone(): Unit = {
|
||||
trace(s"Channel throttled for: $throttleTimeMs ms")
|
||||
callback.endThrottling()
|
||||
}
|
||||
|
||||
override def getDelay(unit: TimeUnit): Long = {
|
||||
unit.convert(endTimeNanos - time.nanoseconds(), TimeUnit.NANOSECONDS)
|
||||
}
|
||||
|
||||
override def compareTo(d: Delayed): Int = {
|
||||
val other = d.asInstanceOf[ThrottledChannel]
|
||||
java.lang.Long.compare(this.endTimeNanos, other.endTimeNanos)
|
||||
}
|
||||
}
|
|
@ -40,6 +40,7 @@ import org.apache.kafka.security.CredentialProvider
|
|||
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
|
||||
import org.apache.kafka.server.config.QuotaConfigs
|
||||
import org.apache.kafka.server.metrics.KafkaYammerMetrics
|
||||
import org.apache.kafka.server.quota.{ThrottleCallback, ThrottledChannel}
|
||||
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
|
||||
import org.apache.log4j.Level
|
||||
import org.junit.jupiter.api.Assertions._
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, RequestC
|
|||
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
|
||||
import org.apache.kafka.common.utils.MockTime
|
||||
import org.apache.kafka.network.Session
|
||||
import org.apache.kafka.server.quota.ThrottleCallback
|
||||
import org.junit.jupiter.api.AfterEach
|
||||
import org.mockito.Mockito.mock
|
||||
|
||||
|
|
|
@ -87,6 +87,7 @@ import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, Metadata
|
|||
import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfig}
|
||||
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
|
||||
import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, FinalContext, ShareAcknowledgementBatch, ShareSession, ShareSessionContext, ShareSessionKey}
|
||||
import org.apache.kafka.server.quota.ThrottleCallback
|
||||
import org.apache.kafka.server.util.{FutureUtils, MockTime}
|
||||
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData, LogConfig}
|
||||
import org.junit.jupiter.api.Assertions._
|
||||
|
|
|
@ -19,10 +19,11 @@ package kafka.server
|
|||
|
||||
|
||||
import java.util.Collections
|
||||
import java.util.concurrent.{DelayQueue, TimeUnit}
|
||||
import java.util.concurrent.DelayQueue
|
||||
import org.apache.kafka.common.metrics.MetricConfig
|
||||
import org.apache.kafka.common.utils.MockTime
|
||||
import org.apache.kafka.server.config.ClientQuotaManagerConfig
|
||||
import org.apache.kafka.server.quota.{ThrottleCallback, ThrottledChannel}
|
||||
import org.junit.jupiter.api.Assertions._
|
||||
import org.junit.jupiter.api.{BeforeEach, Test}
|
||||
|
||||
|
@ -81,22 +82,4 @@ class ThrottledChannelExpirationTest {
|
|||
clientMetrics.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def testThrottledChannelDelay(): Unit = {
|
||||
val t1: ThrottledChannel = new ThrottledChannel(time, 10, callback)
|
||||
val t2: ThrottledChannel = new ThrottledChannel(time, 20, callback)
|
||||
val t3: ThrottledChannel = new ThrottledChannel(time, 20, callback)
|
||||
assertEquals(10, t1.throttleTimeMs)
|
||||
assertEquals(20, t2.throttleTimeMs)
|
||||
assertEquals(20, t3.throttleTimeMs)
|
||||
|
||||
for (itr <- 0 to 2) {
|
||||
assertEquals(10 - 10*itr, t1.getDelay(TimeUnit.MILLISECONDS))
|
||||
assertEquals(20 - 10*itr, t2.getDelay(TimeUnit.MILLISECONDS))
|
||||
assertEquals(20 - 10*itr, t3.getDelay(TimeUnit.MILLISECONDS))
|
||||
time.sleep(10)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.server.quota;
|
||||
|
||||
public interface ThrottleCallback {
|
||||
void startThrottling();
|
||||
void endThrottling();
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.server.quota;
|
||||
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class ThrottledChannel implements Delayed {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ThrottledChannel.class);
|
||||
private final Time time;
|
||||
private final int throttleTimeMs;
|
||||
private final ThrottleCallback callback;
|
||||
private final long endTimeNanos;
|
||||
|
||||
/**
|
||||
* Represents a request whose response has been delayed.
|
||||
* @param time Time instance to use
|
||||
* @param throttleTimeMs Delay associated with this request
|
||||
* @param callback Callback for channel throttling
|
||||
*/
|
||||
public ThrottledChannel(Time time, int throttleTimeMs, ThrottleCallback callback) {
|
||||
this.time = time;
|
||||
this.throttleTimeMs = throttleTimeMs;
|
||||
this.callback = callback;
|
||||
this.endTimeNanos = time.nanoseconds() + TimeUnit.MILLISECONDS.toNanos(throttleTimeMs);
|
||||
|
||||
// Notify the socket server that throttling has started for this channel.
|
||||
callback.startThrottling();
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify the socket server that throttling has been done for this channel.
|
||||
*/
|
||||
public void notifyThrottlingDone() {
|
||||
LOGGER.trace("Channel throttled for: {} ms", throttleTimeMs);
|
||||
callback.endThrottling();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDelay(TimeUnit unit) {
|
||||
return unit.convert(endTimeNanos - time.nanoseconds(), TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Delayed other) {
|
||||
ThrottledChannel otherChannel = (ThrottledChannel) other;
|
||||
return Long.compare(this.endTimeNanos, otherChannel.endTimeNanos);
|
||||
}
|
||||
|
||||
public int throttleTimeMs() {
|
||||
return throttleTimeMs;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,56 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.server.quota;
|
||||
|
||||
import org.apache.kafka.common.utils.MockTime;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
public class ThrottledChannelTest {
|
||||
|
||||
private final MockTime time = new MockTime();
|
||||
private final ThrottleCallback callback = new ThrottleCallback() {
|
||||
@Override
|
||||
public void startThrottling() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void endThrottling() {
|
||||
}
|
||||
};
|
||||
|
||||
@Test
|
||||
public void testThrottledChannelDelay() {
|
||||
ThrottledChannel channel1 = new ThrottledChannel(time, 10, callback);
|
||||
ThrottledChannel channel2 = new ThrottledChannel(time, 20, callback);
|
||||
ThrottledChannel channel3 = new ThrottledChannel(time, 20, callback);
|
||||
assertEquals(10, channel1.throttleTimeMs());
|
||||
assertEquals(20, channel2.throttleTimeMs());
|
||||
assertEquals(20, channel3.throttleTimeMs());
|
||||
|
||||
for (int i = 0; i <= 2; i++) {
|
||||
assertEquals(10 - 10 * i, channel1.getDelay(TimeUnit.MILLISECONDS));
|
||||
assertEquals(20 - 10 * i, channel2.getDelay(TimeUnit.MILLISECONDS));
|
||||
assertEquals(20 - 10 * i, channel3.getDelay(TimeUnit.MILLISECONDS));
|
||||
time.sleep(10);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue