Reduce the main thread blocking in clients cron (#13900)
CI / test-ubuntu-latest (push) Waiting to run Details
CI / test-sanitizer-address (push) Waiting to run Details
CI / build-debian-old (push) Waiting to run Details
CI / build-macos-latest (push) Waiting to run Details
CI / build-32bit (push) Waiting to run Details
CI / build-libc-malloc (push) Waiting to run Details
CI / build-centos-jemalloc (push) Waiting to run Details
CI / build-old-chain-jemalloc (push) Waiting to run Details
Codecov / code-coverage (push) Waiting to run Details
External Server Tests / test-external-standalone (push) Waiting to run Details
External Server Tests / test-external-cluster (push) Waiting to run Details
External Server Tests / test-external-nodebug (push) Waiting to run Details
Spellcheck / Spellcheck (push) Waiting to run Details

The main thread needs to check clients in every cron iteration. During
this check, the corresponding I/O threads must not operate on these
clients to avoid data-race. As a result, the main thread is blocked
until the I/O threads finish processing and are suspended, allowing the
main thread to proceed with client checks.

Since the main thread's resources are more valuable than those of I/O
threads in Redis, this blocking behavior should be avoided. To address
this, the I/O threads check during their cron whether any of their
maintained clients need to be inspected by the main thread. If so, the
I/O threads send those clients to the main thread for processing, then
the main thread runs cron jobs for these clients.

In addition, an always-active client might not be in thread->clients, so
before processing the client’s command, we also check whether the client
has skipped running its cron job for over 1 second. If it has, we run
the cron job for the client.

The main thread does not need to actively pause the IO threads, thus
avoiding potential blocking behavior, fixes
https://github.com/redis/redis/issues/13885

Besides, this approach also can let all clients run cron task in a
second, but before, we pause IO threads in multiple batches when there
are more than 8 IO threads, that may cause some clients are not be
processed in a second.

---------

Co-authored-by: Yuan Wang <yuan.wang@redis.com>
This commit is contained in:
wclmxxs 2025-06-30 09:37:17 +08:00 committed by GitHub
parent 531b82df22
commit ca6145b18c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 183 additions and 76 deletions

View File

@ -452,8 +452,17 @@ int processClientsFromIOThread(IOThread *t) {
continue; continue;
} }
/* Update the client in the mem usage */ /* Run cron task for the client per second or it is marked as pending cron. */
updateClientMemUsageAndBucket(c); if (c->last_cron_check_time + 1000 <= server.mstime ||
c->io_flags & CLIENT_IO_PENDING_CRON)
{
c->last_cron_check_time = server.mstime;
if (clientsCronRunClient(c)) continue;
} else {
/* Update the client in the mem usage if clientsCronRunClient is not
* being called, since that function already performs the update. */
updateClientMemUsageAndBucket(c);
}
/* Process the pending command and input buffer. */ /* Process the pending command and input buffer. */
if (!c->read_error && c->io_flags & CLIENT_IO_PENDING_COMMAND) { if (!c->read_error && c->io_flags & CLIENT_IO_PENDING_COMMAND) {
@ -594,7 +603,7 @@ int processClientsFromMainThread(IOThread *t) {
/* Enable read and write and reset some flags. */ /* Enable read and write and reset some flags. */
c->io_flags |= CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED; c->io_flags |= CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED;
c->io_flags &= ~CLIENT_IO_PENDING_COMMAND; c->io_flags &= ~(CLIENT_IO_PENDING_COMMAND | CLIENT_IO_PENDING_CRON);
/* Only bind once, we never remove read handler unless freeing client. */ /* Only bind once, we never remove read handler unless freeing client. */
if (!connHasEventLoop(c->conn)) { if (!connHasEventLoop(c->conn)) {
@ -658,6 +667,41 @@ void IOThreadAfterSleep(struct aeEventLoop *el) {
atomicSetWithSync(t->running, 1); atomicSetWithSync(t->running, 1);
} }
/* Periodically transfer part of clients to the main thread for processing. */
void IOThreadClientsCron(IOThread *t) {
/* Process at least a few clients while we are at it, even if we need
* to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
* of processing each client once per second. */
int iterations = listLength(t->clients) / CONFIG_DEFAULT_HZ;
if (iterations < CLIENTS_CRON_MIN_ITERATIONS) {
iterations = CLIENTS_CRON_MIN_ITERATIONS;
}
listIter li;
listNode *ln;
listRewind(t->clients, &li);
while ((ln = listNext(&li)) && iterations--) {
client *c = listNodeValue(ln);
/* Mark the client as pending cron, main thread will process it. */
c->io_flags |= CLIENT_IO_PENDING_CRON;
enqueuePendingClientsToMainThread(c, 0);
}
}
/* This is the IO thread timer interrupt, CONFIG_DEFAULT_HZ times per second.
* The current responsibility is to detect clients that have been stuck in the
* IO thread for too long and hand them over to the main thread for handling. */
int IOThreadCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
UNUSED(eventLoop);
UNUSED(id);
IOThread *t = clientData;
/* Run cron tasks for the clients in the IO thread. */
IOThreadClientsCron(t);
return 1000/CONFIG_DEFAULT_HZ;
}
/* The main function of IO thread, it will run an event loop. The mian thread /* The main function of IO thread, it will run an event loop. The mian thread
* and IO thread will communicate through event notifier. */ * and IO thread will communicate through event notifier. */
void *IOThreadMain(void *ptr) { void *IOThreadMain(void *ptr) {
@ -716,6 +760,13 @@ void initThreadedIO(void) {
exit(1); exit(1);
} }
/* This is the timer callback of the IO thread, used to gradually handle
* some background operations, such as clients cron. */
if (aeCreateTimeEvent(t->el, 1, IOThreadCron, t, NULL) == AE_ERR) {
serverLog(LL_WARNING, "Fatal: Can't create event loop timers in IO thread.");
exit(1);
}
/* Create IO thread */ /* Create IO thread */
if (pthread_create(&t->tid, NULL, IOThreadMain, (void*)t) != 0) { if (pthread_create(&t->tid, NULL, IOThreadMain, (void*)t) != 0) {
serverLog(LL_WARNING, "Fatal: Can't initialize IO thread."); serverLog(LL_WARNING, "Fatal: Can't initialize IO thread.");

View File

@ -212,6 +212,7 @@ client *createClient(connection *conn) {
c->postponed_list_node = NULL; c->postponed_list_node = NULL;
c->client_tracking_redirection = 0; c->client_tracking_redirection = 0;
c->client_tracking_prefixes = NULL; c->client_tracking_prefixes = NULL;
c->last_cron_check_time = 0;
c->last_memory_usage = 0; c->last_memory_usage = 0;
c->last_memory_type = CLIENT_TYPE_NORMAL; c->last_memory_type = CLIENT_TYPE_NORMAL;
c->module_blocked_client = NULL; c->module_blocked_client = NULL;

View File

@ -952,16 +952,19 @@ int clientsCronResizeOutputBuffer(client *c, mstime_t now_ms) {
#define CLIENTS_PEAK_MEM_USAGE_SLOTS 8 #define CLIENTS_PEAK_MEM_USAGE_SLOTS 8
size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0}; size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};
size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0}; size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};
int CurrentPeakMemUsageSlot = 0;
int clientsCronTrackExpansiveClients(client *c, int time_idx) { int clientsCronTrackExpansiveClients(client *c) {
size_t qb_size = c->querybuf ? sdsZmallocSize(c->querybuf) : 0; size_t qb_size = c->querybuf ? sdsZmallocSize(c->querybuf) : 0;
size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0; size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0;
size_t in_usage = qb_size + c->argv_len_sum + argv_size; size_t in_usage = qb_size + c->argv_len_sum + argv_size;
size_t out_usage = getClientOutputBufferMemoryUsage(c); size_t out_usage = getClientOutputBufferMemoryUsage(c);
/* Track the biggest values observed so far in this slot. */ /* Track the biggest values observed so far in this slot. */
if (in_usage > ClientsPeakMemInput[time_idx]) ClientsPeakMemInput[time_idx] = in_usage; if (in_usage > ClientsPeakMemInput[CurrentPeakMemUsageSlot])
if (out_usage > ClientsPeakMemOutput[time_idx]) ClientsPeakMemOutput[time_idx] = out_usage; ClientsPeakMemInput[CurrentPeakMemUsageSlot] = in_usage;
if (out_usage > ClientsPeakMemOutput[CurrentPeakMemUsageSlot])
ClientsPeakMemOutput[CurrentPeakMemUsageSlot] = out_usage;
return 0; /* This function never terminates the client. */ return 0; /* This function never terminates the client. */
} }
@ -1088,6 +1091,33 @@ void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) {
*out_usage = o; *out_usage = o;
} }
/* Run cron tasks for a single client. Return 1 if the client should
* be terminated, 0 otherwise. */
int clientsCronRunClient(client *c) {
mstime_t now = server.mstime;
/* The following functions do different service checks on the client.
* The protocol is that they return non-zero if the client was
* terminated. */
if (clientsCronHandleTimeout(c,now)) return 1;
if (clientsCronResizeQueryBuffer(c)) return 1;
if (clientsCronFreeArgvIfIdle(c)) return 1;
if (clientsCronResizeOutputBuffer(c,now)) return 1;
if (clientsCronTrackExpansiveClients(c)) return 1;
/* Iterating all the clients in getMemoryOverheadData() is too slow and
* in turn would make the INFO command too slow. So we perform this
* computation incrementally and track the (not instantaneous but updated
* to the second) total memory used by clients using clientsCron() in
* a more incremental way (depending on server.hz).
* If client eviction is enabled, update the bucket as well. */
if (!updateClientMemUsageAndBucket(c))
updateClientMemoryUsage(c);
if (closeClientOnOutputBufferLimitReached(c, 0)) return 1;
return 0;
}
/* This function is called by serverCron() and is used in order to perform /* This function is called by serverCron() and is used in order to perform
* operations on clients that are important to perform constantly. For instance * operations on clients that are important to perform constantly. For instance
* we use this function in order to disconnect clients after a timeout, including * we use this function in order to disconnect clients after a timeout, including
@ -1103,8 +1133,6 @@ void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) {
* default server.hz value is 10, so sometimes here we need to process thousands * default server.hz value is 10, so sometimes here we need to process thousands
* of clients per second, turning this function into a source of latency. * of clients per second, turning this function into a source of latency.
*/ */
#define CLIENTS_CRON_PAUSE_IOTHREAD 8
#define CLIENTS_CRON_MIN_ITERATIONS 5
void clientsCron(void) { void clientsCron(void) {
/* Try to process at least numclients/server.hz of clients /* Try to process at least numclients/server.hz of clients
* per call. Since normally (if there are no big latency events) this * per call. Since normally (if there are no big latency events) this
@ -1112,7 +1140,6 @@ void clientsCron(void) {
* process all the clients in 1 second. */ * process all the clients in 1 second. */
int numclients = listLength(server.clients); int numclients = listLength(server.clients);
int iterations = numclients/server.hz; int iterations = numclients/server.hz;
mstime_t now = mstime();
/* Process at least a few clients while we are at it, even if we need /* Process at least a few clients while we are at it, even if we need
* to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
@ -1122,7 +1149,7 @@ void clientsCron(void) {
numclients : CLIENTS_CRON_MIN_ITERATIONS; numclients : CLIENTS_CRON_MIN_ITERATIONS;
int curr_peak_mem_usage_slot = server.unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS; CurrentPeakMemUsageSlot = server.unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS;
/* Always zero the next sample, so that when we switch to that second, we'll /* Always zero the next sample, so that when we switch to that second, we'll
* only register samples that are greater in that second without considering * only register samples that are greater in that second without considering
* the history of such slot. * the history of such slot.
@ -1134,20 +1161,10 @@ void clientsCron(void) {
* than CLIENTS_PEAK_MEM_USAGE_SLOTS seconds: however this is not a problem * than CLIENTS_PEAK_MEM_USAGE_SLOTS seconds: however this is not a problem
* since here we want just to track if "recently" there were very expansive * since here we want just to track if "recently" there were very expansive
* clients from the POV of memory usage. */ * clients from the POV of memory usage. */
int zeroidx = (curr_peak_mem_usage_slot+1) % CLIENTS_PEAK_MEM_USAGE_SLOTS; int zeroidx = (CurrentPeakMemUsageSlot+1) % CLIENTS_PEAK_MEM_USAGE_SLOTS;
ClientsPeakMemInput[zeroidx] = 0; ClientsPeakMemInput[zeroidx] = 0;
ClientsPeakMemOutput[zeroidx] = 0; ClientsPeakMemOutput[zeroidx] = 0;
/* Pause the IO threads that are processing clients, to let us access clients
* safely. In order to avoid increasing CPU usage by pausing all threads when
* there are too many io threads, we pause io threads in multiple batches. */
static int start = 1, end = 0;
if (server.io_threads_num >= 1 && listLength(server.clients) > 0) {
end = start + CLIENTS_CRON_PAUSE_IOTHREAD - 1;
if (end >= server.io_threads_num) end = server.io_threads_num - 1;
pauseIOThreadsRange(start, end);
}
while(listLength(server.clients) && iterations--) { while(listLength(server.clients) && iterations--) {
client *c; client *c;
listNode *head; listNode *head;
@ -1158,42 +1175,10 @@ void clientsCron(void) {
c = listNodeValue(head); c = listNodeValue(head);
listRotateHeadToTail(server.clients); listRotateHeadToTail(server.clients);
if (c->running_tid != IOTHREAD_MAIN_THREAD_ID && /* Clients handled by IO threads will be processed by IOThreadClientsCron. */
!(c->running_tid >= start && c->running_tid <= end)) if (c->tid != IOTHREAD_MAIN_THREAD_ID) continue;
{
/* Skip clients that are being processed by the IO threads that
* are not paused. */
continue;
}
/* The following functions do different service checks on the client. clientsCronRunClient(c);
* The protocol is that they return non-zero if the client was
* terminated. */
if (clientsCronHandleTimeout(c,now)) continue;
if (clientsCronResizeQueryBuffer(c)) continue;
if (clientsCronFreeArgvIfIdle(c)) continue;
if (clientsCronResizeOutputBuffer(c,now)) continue;
if (clientsCronTrackExpansiveClients(c, curr_peak_mem_usage_slot)) continue;
/* Iterating all the clients in getMemoryOverheadData() is too slow and
* in turn would make the INFO command too slow. So we perform this
* computation incrementally and track the (not instantaneous but updated
* to the second) total memory used by clients using clientsCron() in
* a more incremental way (depending on server.hz).
* If client eviction is enabled, update the bucket as well. */
if (!updateClientMemUsageAndBucket(c))
updateClientMemoryUsage(c);
if (closeClientOnOutputBufferLimitReached(c, 0)) continue;
}
/* Resume the IO threads that were paused */
if (end) {
resumeIOThreadsRange(start, end);
start = end + 1;
if (start >= server.io_threads_num) start = 1;
end = 0;
} }
} }

View File

@ -149,6 +149,7 @@ struct hdr_histogram;
#define CONFIG_MIN_RESERVED_FDS 32 #define CONFIG_MIN_RESERVED_FDS 32
#define CONFIG_DEFAULT_PROC_TITLE_TEMPLATE "{title} {listen-addr} {server-mode}" #define CONFIG_DEFAULT_PROC_TITLE_TEMPLATE "{title} {listen-addr} {server-mode}"
#define INCREMENTAL_REHASHING_THRESHOLD_US 1000 #define INCREMENTAL_REHASHING_THRESHOLD_US 1000
#define CLIENTS_CRON_MIN_ITERATIONS 5
/* Bucket sizes for client eviction pools. Each bucket stores clients with /* Bucket sizes for client eviction pools. Each bucket stores clients with
* memory usage of up to twice the size of the bucket below it. */ * memory usage of up to twice the size of the bucket below it. */
@ -441,6 +442,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_IO_PENDING_COMMAND (1ULL<<2) /* Similar to CLIENT_PENDING_COMMAND. */ #define CLIENT_IO_PENDING_COMMAND (1ULL<<2) /* Similar to CLIENT_PENDING_COMMAND. */
#define CLIENT_IO_REUSABLE_QUERYBUFFER (1ULL<<3) /* The client is using the reusable query buffer. */ #define CLIENT_IO_REUSABLE_QUERYBUFFER (1ULL<<3) /* The client is using the reusable query buffer. */
#define CLIENT_IO_CLOSE_ASAP (1ULL<<4) /* Close this client ASAP in IO thread. */ #define CLIENT_IO_CLOSE_ASAP (1ULL<<4) /* Close this client ASAP in IO thread. */
#define CLIENT_IO_PENDING_CRON (1ULL<<5) /* The client is pending cron job, to be processed in main thread. */
/* Definitions for client read errors. These error codes are used to indicate /* Definitions for client read errors. These error codes are used to indicate
* various issues that can occur while reading or parsing data from a client. */ * various issues that can occur while reading or parsing data from a client. */
@ -1368,6 +1370,7 @@ typedef struct client {
dictEntry *cur_script; /* Cached pointer to the dictEntry of the script being executed. */ dictEntry *cur_script; /* Cached pointer to the dictEntry of the script being executed. */
time_t lastinteraction; /* Time of the last interaction, used for timeout */ time_t lastinteraction; /* Time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time; time_t obuf_soft_limit_reached_time;
mstime_t last_cron_check_time; /* The last client check time in cron */
int authenticated; /* Needed when the default user requires auth. */ int authenticated; /* Needed when the default user requires auth. */
int replstate; /* Replication state if this is a slave. */ int replstate; /* Replication state if this is a slave. */
int repl_start_cmd_stream_on_ack; /* Install slave write handler on first ACK. */ int repl_start_cmd_stream_on_ack; /* Install slave write handler on first ACK. */
@ -3395,6 +3398,7 @@ robj *activeDefragStringOb(robj* ob);
void dismissSds(sds s); void dismissSds(sds s);
void dismissMemory(void* ptr, size_t size_hint); void dismissMemory(void* ptr, size_t size_hint);
void dismissMemoryInChild(void); void dismissMemoryInChild(void);
int clientsCronRunClient(client *c);
#define RESTART_SERVER_NONE 0 #define RESTART_SERVER_NONE 0
#define RESTART_SERVER_GRACEFULLY (1<<0) /* Do proper shutdown. */ #define RESTART_SERVER_GRACEFULLY (1<<0) /* Do proper shutdown. */

View File

@ -335,6 +335,15 @@ start_server {tags {"info" "external:skip"}} {
test {stats: instantaneous metrics} { test {stats: instantaneous metrics} {
r config resetstat r config resetstat
set multiplier 1
if {[r config get io-threads] > 1} {
# the IO threads also have clients cron job now, and default hz is 10,
# so the IO thread that have the current client will trigger the main
# thread to run clients cron job, that will also count as a cron tick
set multiplier 2
}
set retries 0 set retries 0
for {set retries 1} {$retries < 4} {incr retries} { for {set retries 1} {$retries < 4} {incr retries} {
after 1600 ;# hz is 10, wait for 16 cron tick so that sample array is fulfilled after 1600 ;# hz is 10, wait for 16 cron tick so that sample array is fulfilled
@ -345,7 +354,7 @@ start_server {tags {"info" "external:skip"}} {
assert_lessthan $retries 4 assert_lessthan $retries 4
if {$::verbose} { puts "instantaneous metrics instantaneous_eventloop_cycles_per_sec: $value" } if {$::verbose} { puts "instantaneous metrics instantaneous_eventloop_cycles_per_sec: $value" }
assert_morethan $value 0 assert_morethan $value 0
assert_lessthan $value [expr $retries*15] ;# default hz is 10 assert_lessthan $value [expr $retries*15*$multiplier] ;# default hz is 10
set value [s instantaneous_eventloop_duration_usec] set value [s instantaneous_eventloop_duration_usec]
if {$::verbose} { puts "instantaneous metrics instantaneous_eventloop_duration_usec: $value" } if {$::verbose} { puts "instantaneous metrics instantaneous_eventloop_duration_usec: $value" }
assert_morethan $value 0 assert_morethan $value 0

View File

@ -212,7 +212,7 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb
# Send set commands for all clients except the first # Send set commands for all clients except the first
for {set i 1} {$i < 16} {incr i} { for {set i 1} {$i < 16} {incr i} {
[set rd$i] set a $i [set rd$i] set $i $i
[set rd$i] flush [set rd$i] flush
} }
@ -221,7 +221,13 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb
# Read the results # Read the results
assert_equal {1} [$rd0 read] assert_equal {1} [$rd0 read]
catch {$rd4 read} err catch {$rd4 read} res
if {$res eq "OK"} {
# maybe OK then err, we can not control the order of execution
catch {$rd4 read} err
} else {
set err $res
}
assert_match {I/O error reading reply} $err assert_match {I/O error reading reply} $err
# verify the prefetch stats are as expected # verify the prefetch stats are as expected
@ -231,10 +237,14 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb
set prefetch_batches [getInfoProperty $info io_threaded_total_prefetch_batches] set prefetch_batches [getInfoProperty $info io_threaded_total_prefetch_batches]
assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher
# Verify the final state # verify other clients are working as expected
$rd15 get a for {set i 1} {$i < 16} {incr i} {
assert_equal {OK} [$rd15 read] if {$i != 4} { ;# 4th client was killed
assert_equal {15} [$rd15 read] [set rd$i] get $i
assert_equal {OK} [[set rd$i] read]
assert_equal $i [[set rd$i] read]
}
}
} }
test {prefetch works as expected when changing the batch size while executing the commands batch} { test {prefetch works as expected when changing the batch size while executing the commands batch} {
@ -324,3 +334,44 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb
} }
} }
} }
start_server {tags {"timeout external:skip"}} {
test {Multiple clients idle timeout test} {
# set client timeout to 1 second
r config set timeout 1
# create multiple client connections
set clients {}
set num_clients 10
for {set i 0} {$i < $num_clients} {incr i} {
set client [redis_deferring_client]
$client ping
assert_equal "PONG" [$client read]
lappend clients $client
}
assert_equal [llength $clients] $num_clients
# wait for 2.5 seconds
after 2500
# try to send commands to all clients - they should all fail due to timeout
set disconnected_count 0
foreach client $clients {
$client ping
if {[catch {$client read} err]} {
incr disconnected_count
# expected error patterns for connection timeout
assert_match {*I/O error*} $err
}
catch {$client close}
}
# all clients should have been disconnected due to timeout
assert_equal $disconnected_count $num_clients
# redis server still works well
reconnect
assert_equal "PONG" [r ping]
}
}

View File

@ -151,15 +151,13 @@ start_server {tags {"obuf-limits external:skip logreqres:skip"}} {
r config set client-output-buffer-limit {normal 100000 0 0} r config set client-output-buffer-limit {normal 100000 0 0}
set value [string repeat "x" 10000] set value [string repeat "x" 10000]
r set bigkey $value r set bigkey $value
set rd1 [redis_deferring_client] set rd [redis_deferring_client]
set rd2 [redis_deferring_client] $rd client setname multicommands
$rd2 client setname multicommands assert_equal "OK" [$rd read]
assert_equal "OK" [$rd2 read]
# Let redis sleep 1s firstly set server_pid [s process_id]
$rd1 debug sleep 1 # Pause the server, so that the client's write will be buffered
$rd1 flush pause_process $server_pid
after 100
# Create a pipeline of commands that will be processed in one socket read. # Create a pipeline of commands that will be processed in one socket read.
# It is important to use one write, in TLS mode independent writes seem # It is important to use one write, in TLS mode independent writes seem
@ -174,15 +172,23 @@ start_server {tags {"obuf-limits external:skip logreqres:skip"}} {
# One bigkey is 10k, total response size must be more than 100k # One bigkey is 10k, total response size must be more than 100k
append buf "get bigkey\r\n" append buf "get bigkey\r\n"
} }
$rd2 write $buf $rd write $buf
$rd2 flush $rd flush
after 100
# Reds must wake up if it can send reply # Resume the server to process the pipeline in one go
resume_process $server_pid
# Make sure the pipeline of commands is processed
wait_for_condition 100 10 {
[expr {[regexp {calls=(\d+)} [cmdrstat get r] -> calls] ? $calls : 0}] >= 5
} else {
fail "the pipeline of commands commands is not processed"
}
# Redis must wake up if it can send reply
assert_equal "PONG" [r ping] assert_equal "PONG" [r ping]
set clients [r client list] set clients [r client list]
assert_no_match "*name=multicommands*" $clients assert_no_match "*name=multicommands*" $clients
assert_equal {} [$rd2 rawread] assert_equal {} [$rd rawread]
} }
test {Execute transactions completely even if client output buffer limit is enforced} { test {Execute transactions completely even if client output buffer limit is enforced} {