From ca6145b18c35f405557197342f30fe24505b0e18 Mon Sep 17 00:00:00 2001
From: wclmxxs <49094604+wclmxxs@users.noreply.github.com>
Date: Mon, 30 Jun 2025 09:37:17 +0800
Subject: [PATCH] Reduce the main thread blocking in clients cron (#13900)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The main thread needs to check clients in every cron iteration. During
this check, the corresponding I/O threads must not operate on these
clients, to avoid data races. As a result, the main thread is blocked
until the I/O threads finish processing and are suspended, and only
then can it proceed with the client checks.

Since the main thread's resources are more valuable than those of the
I/O threads in Redis, this blocking behavior should be avoided.

To address this, the I/O threads check during their cron whether any of
their maintained clients need to be inspected by the main thread. If
so, the I/O threads send those clients to the main thread for
processing, and the main thread then runs the cron jobs for these
clients.

In addition, an always-active client might not be in thread->clients,
so before processing the client's command, we also check whether the
client has skipped running its cron job for over 1 second. If it has,
we run the cron job for the client.

The main thread no longer needs to actively pause the I/O threads,
which avoids the potential blocking behavior. Fixes
https://github.com/redis/redis/issues/13885

Besides, this approach also lets all clients run their cron tasks
within one second. Previously, when there were more than 8 I/O threads,
we paused the I/O threads in multiple batches, which could leave some
clients unprocessed within a second.

---------

Co-authored-by: Yuan Wang
---
 src/iothread.c             | 57 ++++++++++++++++++++++--
 src/networking.c           |  1 +
 src/server.c               | 91 ++++++++++++++++----------------------
 src/server.h               |  4 ++
 tests/unit/info.tcl        | 11 ++++-
 tests/unit/networking.tcl  | 63 +++++++++++++++++++++++---
 tests/unit/obuf-limits.tcl | 32 ++++++++------
 7 files changed, 183 insertions(+), 76 deletions(-)

diff --git a/src/iothread.c b/src/iothread.c
index 89e218546..adf475735 100644
--- a/src/iothread.c
+++ b/src/iothread.c
@@ -452,8 +452,17 @@ int processClientsFromIOThread(IOThread *t) {
             continue;
         }

-        /* Update the client in the mem usage */
-        updateClientMemUsageAndBucket(c);
+        /* Run the cron task for the client once per second, or when it is marked as pending cron. */
+        if (c->last_cron_check_time + 1000 <= server.mstime ||
+            c->io_flags & CLIENT_IO_PENDING_CRON)
+        {
+            c->last_cron_check_time = server.mstime;
+            if (clientsCronRunClient(c)) continue;
+        } else {
+            /* Update the client's mem usage if clientsCronRunClient is not
+             * being called, since that function already performs the update. */
+            updateClientMemUsageAndBucket(c);
+        }

         /* Process the pending command and input buffer. */
         if (!c->read_error && c->io_flags & CLIENT_IO_PENDING_COMMAND) {
@@ -594,7 +603,7 @@ int processClientsFromMainThread(IOThread *t) {

         /* Enable read and write and reset some flags. */
         c->io_flags |= CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED;
-        c->io_flags &= ~CLIENT_IO_PENDING_COMMAND;
+        c->io_flags &= ~(CLIENT_IO_PENDING_COMMAND | CLIENT_IO_PENDING_CRON);

         /* Only bind once, we never remove read handler unless freeing client. */
         if (!connHasEventLoop(c->conn)) {
@@ -658,6 +667,41 @@ void IOThreadAfterSleep(struct aeEventLoop *el) {
     atomicSetWithSync(t->running, 1);
 }

+/* Periodically transfer part of the clients to the main thread for processing. */
+void IOThreadClientsCron(IOThread *t) {
+    /* Process at least a few clients while we are at it, even if we need
+     * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
+     * of processing each client once per second. */
+    int iterations = listLength(t->clients) / CONFIG_DEFAULT_HZ;
+    if (iterations < CLIENTS_CRON_MIN_ITERATIONS) {
+        iterations = CLIENTS_CRON_MIN_ITERATIONS;
+    }
+
+    listIter li;
+    listNode *ln;
+    listRewind(t->clients, &li);
+    while ((ln = listNext(&li)) && iterations--) {
+        client *c = listNodeValue(ln);
+        /* Mark the client as pending cron, main thread will process it. */
+        c->io_flags |= CLIENT_IO_PENDING_CRON;
+        enqueuePendingClientsToMainThread(c, 0);
+    }
+}
+
+/* This is the IO thread timer interrupt, CONFIG_DEFAULT_HZ times per second.
+ * The current responsibility is to detect clients that have been stuck in the
+ * IO thread for too long and hand them over to the main thread for handling. */
+int IOThreadCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
+    UNUSED(eventLoop);
+    UNUSED(id);
+    IOThread *t = clientData;
+
+    /* Run cron tasks for the clients in the IO thread. */
+    IOThreadClientsCron(t);
+
+    return 1000/CONFIG_DEFAULT_HZ;
+}
+
 /* The main function of IO thread, it will run an event loop. The mian thread
  * and IO thread will communicate through event notifier. */
 void *IOThreadMain(void *ptr) {
@@ -716,6 +760,13 @@ void initThreadedIO(void) {
             exit(1);
         }

+        /* This is the timer callback of the IO thread, used to gradually handle
+         * some background operations, such as clients cron. */
+        if (aeCreateTimeEvent(t->el, 1, IOThreadCron, t, NULL) == AE_ERR) {
+            serverLog(LL_WARNING, "Fatal: Can't create event loop timers in IO thread.");
+            exit(1);
+        }
+
         /* Create IO thread */
         if (pthread_create(&t->tid, NULL, IOThreadMain, (void*)t) != 0) {
             serverLog(LL_WARNING, "Fatal: Can't initialize IO thread.");
diff --git a/src/networking.c b/src/networking.c
index 92a1086f8..8b72a4d80 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -212,6 +212,7 @@ client *createClient(connection *conn) {
     c->postponed_list_node = NULL;
     c->client_tracking_redirection = 0;
     c->client_tracking_prefixes = NULL;
+    c->last_cron_check_time = 0;
     c->last_memory_usage = 0;
     c->last_memory_type = CLIENT_TYPE_NORMAL;
     c->module_blocked_client = NULL;
diff --git a/src/server.c b/src/server.c
index b7d883ec1..4051cc5f1 100644
--- a/src/server.c
+++ b/src/server.c
@@ -952,16 +952,19 @@ int clientsCronResizeOutputBuffer(client *c, mstime_t now_ms) {
 #define CLIENTS_PEAK_MEM_USAGE_SLOTS 8
 size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};
 size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};
+int CurrentPeakMemUsageSlot = 0;

-int clientsCronTrackExpansiveClients(client *c, int time_idx) {
+int clientsCronTrackExpansiveClients(client *c) {
     size_t qb_size = c->querybuf ? sdsZmallocSize(c->querybuf) : 0;
     size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0;
     size_t in_usage = qb_size + c->argv_len_sum + argv_size;
     size_t out_usage = getClientOutputBufferMemoryUsage(c);

     /* Track the biggest values observed so far in this slot. */
-    if (in_usage > ClientsPeakMemInput[time_idx]) ClientsPeakMemInput[time_idx] = in_usage;
-    if (out_usage > ClientsPeakMemOutput[time_idx]) ClientsPeakMemOutput[time_idx] = out_usage;
+    if (in_usage > ClientsPeakMemInput[CurrentPeakMemUsageSlot])
+        ClientsPeakMemInput[CurrentPeakMemUsageSlot] = in_usage;
+    if (out_usage > ClientsPeakMemOutput[CurrentPeakMemUsageSlot])
+        ClientsPeakMemOutput[CurrentPeakMemUsageSlot] = out_usage;

     return 0; /* This function never terminates the client. */
 }

@@ -1088,6 +1091,33 @@ void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) {
     *out_usage = o;
 }

+/* Run cron tasks for a single client. Return 1 if the client should
+ * be terminated, 0 otherwise. */
+int clientsCronRunClient(client *c) {
+    mstime_t now = server.mstime;
+    /* The following functions do different service checks on the client.
+     * The protocol is that they return non-zero if the client was
+     * terminated. */
+    if (clientsCronHandleTimeout(c,now)) return 1;
+    if (clientsCronResizeQueryBuffer(c)) return 1;
+    if (clientsCronFreeArgvIfIdle(c)) return 1;
+    if (clientsCronResizeOutputBuffer(c,now)) return 1;
+
+    if (clientsCronTrackExpansiveClients(c)) return 1;
+
+    /* Iterating all the clients in getMemoryOverheadData() is too slow and
+     * in turn would make the INFO command too slow. So we perform this
+     * computation incrementally and track the (not instantaneous but updated
+     * to the second) total memory used by clients using clientsCron() in
+     * a more incremental way (depending on server.hz).
+     * If client eviction is enabled, update the bucket as well. */
+    if (!updateClientMemUsageAndBucket(c))
+        updateClientMemoryUsage(c);
+
+    if (closeClientOnOutputBufferLimitReached(c, 0)) return 1;
+    return 0;
+}
+
 /* This function is called by serverCron() and is used in order to perform
  * operations on clients that are important to perform constantly. For instance
  * we use this function in order to disconnect clients after a timeout, including
@@ -1103,8 +1133,6 @@ void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) {
  * default server.hz value is 10, so sometimes here we need to process thousands
  * of clients per second, turning this function into a source of latency.
 */
-#define CLIENTS_CRON_PAUSE_IOTHREAD 8
-#define CLIENTS_CRON_MIN_ITERATIONS 5
 void clientsCron(void) {
     /* Try to process at least numclients/server.hz of clients
      * per call. Since normally (if there are no big latency events) this
@@ -1112,7 +1140,6 @@ void clientsCron(void) {
      * process all the clients in 1 second. */
     int numclients = listLength(server.clients);
     int iterations = numclients/server.hz;
-    mstime_t now = mstime();

     /* Process at least a few clients while we are at it, even if we need
      * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
@@ -1122,7 +1149,7 @@ void clientsCron(void) {
                      numclients : CLIENTS_CRON_MIN_ITERATIONS;


-    int curr_peak_mem_usage_slot = server.unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS;
+    CurrentPeakMemUsageSlot = server.unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS;
     /* Always zero the next sample, so that when we switch to that second, we'll
      * only register samples that are greater in that second without considering
      * the history of such slot.
@@ -1134,20 +1161,10 @@ void clientsCron(void) {
      * than CLIENTS_PEAK_MEM_USAGE_SLOTS seconds: however this is not a problem
      * since here we want just to track if "recently" there were very expansive
      * clients from the POV of memory usage. */
-    int zeroidx = (curr_peak_mem_usage_slot+1) % CLIENTS_PEAK_MEM_USAGE_SLOTS;
+    int zeroidx = (CurrentPeakMemUsageSlot+1) % CLIENTS_PEAK_MEM_USAGE_SLOTS;
     ClientsPeakMemInput[zeroidx] = 0;
     ClientsPeakMemOutput[zeroidx] = 0;

-    /* Pause the IO threads that are processing clients, to let us access clients
-     * safely. In order to avoid increasing CPU usage by pausing all threads when
-     * there are too many io threads, we pause io threads in multiple batches. */
-    static int start = 1, end = 0;
-    if (server.io_threads_num >= 1 && listLength(server.clients) > 0) {
-        end = start + CLIENTS_CRON_PAUSE_IOTHREAD - 1;
-        if (end >= server.io_threads_num) end = server.io_threads_num - 1;
-        pauseIOThreadsRange(start, end);
-    }
-
     while(listLength(server.clients) && iterations--) {
         client *c;
         listNode *head;
@@ -1158,42 +1175,10 @@ void clientsCron(void) {
         c = listNodeValue(head);
         listRotateHeadToTail(server.clients);

-        if (c->running_tid != IOTHREAD_MAIN_THREAD_ID &&
-            !(c->running_tid >= start && c->running_tid <= end))
-        {
-            /* Skip clients that are being processed by the IO threads that
-             * are not paused. */
-            continue;
-        }
+        /* Clients handled by IO threads will be processed by IOThreadClientsCron. */
+        if (c->tid != IOTHREAD_MAIN_THREAD_ID) continue;

-        /* The following functions do different service checks on the client.
-         * The protocol is that they return non-zero if the client was
-         * terminated. */
-        if (clientsCronHandleTimeout(c,now)) continue;
-        if (clientsCronResizeQueryBuffer(c)) continue;
-        if (clientsCronFreeArgvIfIdle(c)) continue;
-        if (clientsCronResizeOutputBuffer(c,now)) continue;
-
-        if (clientsCronTrackExpansiveClients(c, curr_peak_mem_usage_slot)) continue;
-
-        /* Iterating all the clients in getMemoryOverheadData() is too slow and
-         * in turn would make the INFO command too slow. So we perform this
-         * computation incrementally and track the (not instantaneous but updated
-         * to the second) total memory used by clients using clientsCron() in
-         * a more incremental way (depending on server.hz).
-         * If client eviction is enabled, update the bucket as well. */
-        if (!updateClientMemUsageAndBucket(c))
-            updateClientMemoryUsage(c);
-
-        if (closeClientOnOutputBufferLimitReached(c, 0)) continue;
-    }
-
-    /* Resume the IO threads that were paused */
-    if (end) {
-        resumeIOThreadsRange(start, end);
-        start = end + 1;
-        if (start >= server.io_threads_num) start = 1;
-        end = 0;
+        clientsCronRunClient(c);
     }
 }

diff --git a/src/server.h b/src/server.h
index 9173c5abf..19e2166cf 100644
--- a/src/server.h
+++ b/src/server.h
@@ -149,6 +149,7 @@ struct hdr_histogram;
 #define CONFIG_MIN_RESERVED_FDS 32
 #define CONFIG_DEFAULT_PROC_TITLE_TEMPLATE "{title} {listen-addr} {server-mode}"
 #define INCREMENTAL_REHASHING_THRESHOLD_US 1000
+#define CLIENTS_CRON_MIN_ITERATIONS 5

 /* Bucket sizes for client eviction pools. Each bucket stores clients with
  * memory usage of up to twice the size of the bucket below it. */
@@ -441,6 +442,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
 #define CLIENT_IO_PENDING_COMMAND (1ULL<<2) /* Similar to CLIENT_PENDING_COMMAND. */
 #define CLIENT_IO_REUSABLE_QUERYBUFFER (1ULL<<3) /* The client is using the reusable query buffer. */
 #define CLIENT_IO_CLOSE_ASAP (1ULL<<4) /* Close this client ASAP in IO thread. */
+#define CLIENT_IO_PENDING_CRON (1ULL<<5) /* The client is pending cron job, to be processed in main thread. */

 /* Definitions for client read errors. These error codes are used to indicate
  * various issues that can occur while reading or parsing data from a client. */
@@ -1368,6 +1370,7 @@ typedef struct client {
     dictEntry *cur_script;  /* Cached pointer to the dictEntry of the script being executed. */
     time_t lastinteraction; /* Time of the last interaction, used for timeout */
     time_t obuf_soft_limit_reached_time;
+    mstime_t last_cron_check_time; /* The last cron check time for this client */
     int authenticated;      /* Needed when the default user requires auth. */
     int replstate;          /* Replication state if this is a slave. */
     int repl_start_cmd_stream_on_ack; /* Install slave write handler on first ACK. */
@@ -3395,6 +3398,7 @@ robj *activeDefragStringOb(robj* ob);
 void dismissSds(sds s);
 void dismissMemory(void* ptr, size_t size_hint);
 void dismissMemoryInChild(void);
+int clientsCronRunClient(client *c);

 #define RESTART_SERVER_NONE 0
 #define RESTART_SERVER_GRACEFULLY (1<<0) /* Do proper shutdown. */
diff --git a/tests/unit/info.tcl b/tests/unit/info.tcl
index 9400e29d8..4d766f095 100644
--- a/tests/unit/info.tcl
+++ b/tests/unit/info.tcl
@@ -335,6 +335,15 @@ start_server {tags {"info" "external:skip"}} {

     test {stats: instantaneous metrics} {
         r config resetstat
+
+        set multiplier 1
+        if {[r config get io-threads] > 1} {
+            # the IO threads also run the clients cron job now, and the default hz is 10,
+            # so the IO thread that owns the current client will trigger the main
+            # thread to run the clients cron job, which also counts as a cron tick
+            set multiplier 2
+        }
+
         set retries 0
         for {set retries 1} {$retries < 4} {incr retries} {
             after 1600 ;# hz is 10, wait for 16 cron tick so that sample array is fulfilled
@@ -345,7 +354,7 @@ start_server {tags {"info" "external:skip"}} {
         assert_lessthan $retries 4
         if {$::verbose} { puts "instantaneous metrics instantaneous_eventloop_cycles_per_sec: $value" }
         assert_morethan $value 0
-        assert_lessthan $value [expr $retries*15] ;# default hz is 10
+        assert_lessthan $value [expr $retries*15*$multiplier] ;# default hz is 10
         set value [s instantaneous_eventloop_duration_usec]
         if {$::verbose} { puts "instantaneous metrics instantaneous_eventloop_duration_usec: $value" }
         assert_morethan $value 0
diff --git a/tests/unit/networking.tcl b/tests/unit/networking.tcl
index 1b8e50291..4f63f4e01 100644
--- a/tests/unit/networking.tcl
+++ b/tests/unit/networking.tcl
@@ -212,7 +212,7 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb

         # Send set commands for all clients except the first
         for {set i 1} {$i < 16} {incr i} {
-            [set rd$i] set a $i
+            [set rd$i] set $i $i
             [set rd$i] flush
         }

@@ -221,7 +221,13 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb

         # Read the results
         assert_equal {1} [$rd0 read]
-        catch {$rd4 read} err
+        catch {$rd4 read} res
+        if {$res eq "OK"} {
+            # maybe OK then err, we cannot control the order of execution
+            catch {$rd4 read} err
+        } else {
+            set err $res
+        }
         assert_match {I/O error reading reply} $err

         # verify the prefetch stats are as expected
@@ -231,10 +237,14 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb
         set prefetch_batches [getInfoProperty $info io_threaded_total_prefetch_batches]
         assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher

-        # Verify the final state
-        $rd15 get a
-        assert_equal {OK} [$rd15 read]
-        assert_equal {15} [$rd15 read]
+        # verify other clients are working as expected
+        for {set i 1} {$i < 16} {incr i} {
+            if {$i != 4} { ;# 4th client was killed
+                [set rd$i] get $i
+                assert_equal {OK} [[set rd$i] read]
+                assert_equal $i [[set rd$i] read]
+            }
+        }
     }

     test {prefetch works as expected when changing the batch size while executing the commands batch} {
@@ -324,3 +334,44 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb
         }
     }
 }
+
+start_server {tags {"timeout external:skip"}} {
+    test {Multiple clients idle timeout test} {
+        # set client timeout to 1 second
+        r config set timeout 1
+
+        # create multiple client connections
+        set clients {}
+        set num_clients 10
+
+        for {set i 0} {$i < $num_clients} {incr i} {
+            set client [redis_deferring_client]
+            $client ping
+            assert_equal "PONG" [$client read]
+            lappend clients $client
+        }
+        assert_equal [llength $clients] $num_clients
+
+        # wait for 2.5 seconds
+        after 2500
+
+        # try to send commands to all clients - they should all fail due to timeout
+        set disconnected_count 0
+        foreach client $clients {
+            $client ping
+            if {[catch {$client read} err]} {
+                incr disconnected_count
+                # expected error patterns for connection timeout
+                assert_match {*I/O error*} $err
+            }
+            catch {$client close}
+        }
+
+        # all clients should have been disconnected due to timeout
+        assert_equal $disconnected_count $num_clients
+
+        # redis server still works well
+        reconnect
+        assert_equal "PONG" [r ping]
+    }
+}
diff --git a/tests/unit/obuf-limits.tcl b/tests/unit/obuf-limits.tcl
index 527623337..7fb1e7f14 100644
--- a/tests/unit/obuf-limits.tcl
+++ b/tests/unit/obuf-limits.tcl
@@ -151,15 +151,13 @@ start_server {tags {"obuf-limits external:skip logreqres:skip"}} {
         r config set client-output-buffer-limit {normal 100000 0 0}
         set value [string repeat "x" 10000]
         r set bigkey $value
-        set rd1 [redis_deferring_client]
-        set rd2 [redis_deferring_client]
-        $rd2 client setname multicommands
-        assert_equal "OK" [$rd2 read]
+        set rd [redis_deferring_client]
+        $rd client setname multicommands
+        assert_equal "OK" [$rd read]

-        # Let redis sleep 1s firstly
-        $rd1 debug sleep 1
-        $rd1 flush
-        after 100
+        set server_pid [s process_id]
+        # Pause the server, so that the client's write will be buffered
+        pause_process $server_pid

         # Create a pipeline of commands that will be processed in one socket read.
         # It is important to use one write, in TLS mode independent writes seem
@@ -174,15 +172,23 @@ start_server {tags {"obuf-limits external:skip logreqres:skip"}} {
             # One bigkey is 10k, total response size must be more than 100k
             append buf "get bigkey\r\n"
         }
-        $rd2 write $buf
-        $rd2 flush
-        after 100
+        $rd write $buf
+        $rd flush

-        # Reds must wake up if it can send reply
+        # Resume the server to process the pipeline in one go
+        resume_process $server_pid
+        # Make sure the pipeline of commands is processed
+        wait_for_condition 100 10 {
+            [expr {[regexp {calls=(\d+)} [cmdrstat get r] -> calls] ? $calls : 0}] >= 5
+        } else {
+            fail "the pipeline of commands is not processed"
+        }
+
+        # Redis must wake up if it can send reply
         assert_equal "PONG" [r ping]
         set clients [r client list]
         assert_no_match "*name=multicommands*" $clients
-        assert_equal {} [$rd2 rawread]
+        assert_equal {} [$rd rawread]
     }

     test {Execute transactions completely even if client output buffer limit is enforced} {