mirror of https://github.com/redis/redis.git
Make IO thread and main thread process in parallel and reduce notifications (#13969)
CI / test-ubuntu-latest (push) Waiting to run
Details
CI / test-sanitizer-address (push) Waiting to run
Details
CI / build-debian-old (push) Waiting to run
Details
CI / build-macos-latest (push) Waiting to run
Details
CI / build-32bit (push) Waiting to run
Details
CI / build-libc-malloc (push) Waiting to run
Details
CI / build-centos-jemalloc (push) Waiting to run
Details
CI / build-old-chain-jemalloc (push) Waiting to run
Details
Codecov / code-coverage (push) Waiting to run
Details
External Server Tests / test-external-standalone (push) Waiting to run
Details
External Server Tests / test-external-cluster (push) Waiting to run
Details
External Server Tests / test-external-nodebug (push) Waiting to run
Details
Spellcheck / Spellcheck (push) Waiting to run
Details
CI / test-ubuntu-latest (push) Waiting to run
Details
CI / test-sanitizer-address (push) Waiting to run
Details
CI / build-debian-old (push) Waiting to run
Details
CI / build-macos-latest (push) Waiting to run
Details
CI / build-32bit (push) Waiting to run
Details
CI / build-libc-malloc (push) Waiting to run
Details
CI / build-centos-jemalloc (push) Waiting to run
Details
CI / build-old-chain-jemalloc (push) Waiting to run
Details
Codecov / code-coverage (push) Waiting to run
Details
External Server Tests / test-external-standalone (push) Waiting to run
Details
External Server Tests / test-external-cluster (push) Waiting to run
Details
External Server Tests / test-external-nodebug (push) Waiting to run
Details
Spellcheck / Spellcheck (push) Waiting to run
Details
In pipeline mode, especially with TLS, two IO threads may have worse performance than single thread, one reason is the io thread and the main thread cannot process in parallel, now, the IO threads will deliver clients if pending client list is more than 16 instead of finishing processing all clients, this approach can make IO threads and main thread process in parallel as much as possible. IO threads may do some unnecessary notification with the main thread, the notification is based on eventfd, read(2) and write(2) eventfd are system calls that are costly. When they are running, they can check the pending client list to process in `beforeSleep`, so in this commit, if both the main thread and the IO thread are running, they can pass the client without notification, and these transferred clients will be processed in `beforeSleep`.
This commit is contained in:
parent
65e164caff
commit
d5f7672b77
174
src/iothread.c
174
src/iothread.c
|
@ -20,6 +20,28 @@ static list *mainThreadPendingClients[IO_THREADS_MAX_NUM]; /* Pending clients fr
|
|||
static pthread_mutex_t mainThreadPendingClientsMutexes[IO_THREADS_MAX_NUM]; /* Mutex for pending clients */
|
||||
static eventNotifier* mainThreadPendingClientsNotifiers[IO_THREADS_MAX_NUM]; /* Notifier for pending clients */
|
||||
|
||||
/* Send the clients to the main thread for processing when the number of clients
|
||||
* in pending list reaches IO_THREAD_MAX_PENDING_CLIENTS, or check_size is 0. */
|
||||
static inline void sendPendingClientsToMainThreadIfNeeded(IOThread *t, int check_size) {
|
||||
size_t len = listLength(t->pending_clients_to_main_thread);
|
||||
if (len == 0 || (check_size && len < IO_THREAD_MAX_PENDING_CLIENTS)) return;
|
||||
|
||||
int running = 0, pending = 0;
|
||||
pthread_mutex_lock(&mainThreadPendingClientsMutexes[t->id]);
|
||||
pending = listLength(mainThreadPendingClients[t->id]);
|
||||
listJoin(mainThreadPendingClients[t->id], t->pending_clients_to_main_thread);
|
||||
pthread_mutex_unlock(&mainThreadPendingClientsMutexes[t->id]);
|
||||
if (!pending) atomicGetWithSync(server.running, running);
|
||||
|
||||
/* Only notify main thread if it is not running and no pending clients to process,
|
||||
* to avoid unnecessary notify/wakeup. If the main thread is running, it will
|
||||
* process the clients in beforeSleep. If there are pending clients, we may
|
||||
* already notify the main thread if needed. */
|
||||
if (!running && !pending) {
|
||||
triggerEventNotifier(mainThreadPendingClientsNotifiers[t->id]);
|
||||
}
|
||||
}
|
||||
|
||||
/* When IO threads read a complete query of clients or want to free clients, it
|
||||
* should remove it from its clients list and put the client in the list to main
|
||||
* thread, we will send these clients to main thread in IOThreadBeforeSleep. */
|
||||
|
@ -29,6 +51,12 @@ void enqueuePendingClientsToMainThread(client *c, int unbind) {
|
|||
if (unbind) connUnbindEventLoop(c->conn);
|
||||
/* Just skip if it already is transferred. */
|
||||
if (c->io_thread_client_list_node) {
|
||||
/* If there are several clients to process, let the main thread handle them ASAP.
|
||||
* Since the client being added to the queue may still need to be processed by
|
||||
* the IO thread, we must call this before adding it to the queue to avoid
|
||||
* races with the main thread. */
|
||||
sendPendingClientsToMainThreadIfNeeded(&IOThreads[c->tid], 1);
|
||||
/* Remove the client from clients list of IO thread. */
|
||||
listDelNode(IOThreads[c->tid].clients, c->io_thread_client_list_node);
|
||||
c->io_thread_client_list_node = NULL;
|
||||
/* Disable read and write to avoid race when main thread processes. */
|
||||
|
@ -311,6 +339,35 @@ int sendPendingClientsToIOThreads(void) {
|
|||
|
||||
extern int ProcessingEventsWhileBlocked;
|
||||
|
||||
/* Send the pending clients to the IO thread if the number of pending clients
|
||||
* is greater than IO_THREAD_MAX_PENDING_CLIENTS, or if size_check is 0. */
|
||||
static inline void sendPendingClientsToIOThreadIfNeeded(IOThread *t, int size_check) {
|
||||
size_t len = listLength(mainThreadPendingClientsToIOThreads[t->id]);
|
||||
if (len == 0 || (size_check && len < IO_THREAD_MAX_PENDING_CLIENTS)) return;
|
||||
|
||||
/* If AOF fsync policy is always, we should not let io thread handle these
|
||||
* clients now since we don't flush AOF buffer to file and sync yet.
|
||||
* So these clients will be delayed to send io threads in beforeSleep after
|
||||
* flushAppendOnlyFile.
|
||||
*
|
||||
* If we are in processEventsWhileBlocked, we don't send clients to io threads
|
||||
* now, we want to update server.events_processed_while_blocked accurately. */
|
||||
if (server.aof_fsync != AOF_FSYNC_ALWAYS && !ProcessingEventsWhileBlocked) {
|
||||
int running = 0, pending = 0;
|
||||
pthread_mutex_lock(&(t->pending_clients_mutex));
|
||||
pending = listLength(t->pending_clients);
|
||||
listJoin(t->pending_clients, mainThreadPendingClientsToIOThreads[t->id]);
|
||||
pthread_mutex_unlock(&(t->pending_clients_mutex));
|
||||
if (!pending) atomicGetWithSync(t->running, running);
|
||||
|
||||
/* Only notify io thread if it is not running and no pending clients to
|
||||
* process, to avoid unnecessary notify/wakeup. If the io thread is running,
|
||||
* it will process the clients in beforeSleep. If there are pending clients,
|
||||
* we may already notify the io thread if needed. */
|
||||
if(!running && !pending) triggerEventNotifier(t->pending_clients_notifier);
|
||||
}
|
||||
}
|
||||
|
||||
/* The main thread processes the clients from IO threads, these clients may have
|
||||
* a complete command to execute or need to be freed. Note that IO threads never
|
||||
* free client since this operation access much server data.
|
||||
|
@ -320,9 +377,15 @@ extern int ProcessingEventsWhileBlocked;
|
|||
* when processing script command, it may call processEventsWhileBlocked to
|
||||
* process new events, if the clients with fired events from the same io thread,
|
||||
* it may call this function reentrantly. */
|
||||
void processClientsFromIOThread(IOThread *t) {
|
||||
listNode *node = NULL;
|
||||
int processClientsFromIOThread(IOThread *t) {
|
||||
/* Get the list of clients to process. */
|
||||
pthread_mutex_lock(&mainThreadPendingClientsMutexes[t->id]);
|
||||
listJoin(mainThreadProcessingClients[t->id], mainThreadPendingClients[t->id]);
|
||||
pthread_mutex_unlock(&mainThreadPendingClientsMutexes[t->id]);
|
||||
size_t processed = listLength(mainThreadProcessingClients[t->id]);
|
||||
if (processed == 0) return 0;
|
||||
|
||||
listNode *node = NULL;
|
||||
while (listLength(mainThreadProcessingClients[t->id])) {
|
||||
/* Each time we pop up only the first client to process to guarantee
|
||||
* reentrancy safety. */
|
||||
|
@ -384,28 +447,18 @@ void processClientsFromIOThread(IOThread *t) {
|
|||
c->running_tid = c->tid;
|
||||
listLinkNodeHead(mainThreadPendingClientsToIOThreads[c->tid], node);
|
||||
node = NULL;
|
||||
|
||||
/* If there are several clients to process, let io thread handle them ASAP. */
|
||||
sendPendingClientsToIOThreadIfNeeded(t, 1);
|
||||
}
|
||||
if (node) zfree(node);
|
||||
|
||||
/* Trigger the io thread to handle these clients ASAP to make them processed
|
||||
* in parallel.
|
||||
*
|
||||
* If AOF fsync policy is always, we should not let io thread handle these
|
||||
* clients now since we don't flush AOF buffer to file and sync yet.
|
||||
* So these clients will be delayed to send io threads in beforeSleep after
|
||||
* flushAppendOnlyFile.
|
||||
*
|
||||
* If we are in processEventsWhileBlocked, we don't send clients to io threads
|
||||
* now, we want to update server.events_processed_while_blocked accurately. */
|
||||
if (listLength(mainThreadPendingClientsToIOThreads[t->id]) &&
|
||||
server.aof_fsync != AOF_FSYNC_ALWAYS &&
|
||||
!ProcessingEventsWhileBlocked)
|
||||
{
|
||||
pthread_mutex_lock(&(t->pending_clients_mutex));
|
||||
listJoin(t->pending_clients, mainThreadPendingClientsToIOThreads[t->id]);
|
||||
pthread_mutex_unlock(&(t->pending_clients_mutex));
|
||||
triggerEventNotifier(t->pending_clients_notifier);
|
||||
}
|
||||
/* Send the clients to io thread without pending size check, since main thread
|
||||
* may process clients from other io threads, so we need to send them to the
|
||||
* io thread to process in prallel. */
|
||||
sendPendingClientsToIOThreadIfNeeded(t, 0);
|
||||
|
||||
return processed;
|
||||
}
|
||||
|
||||
/* When the io thread finishes processing the client with the read event, it will
|
||||
|
@ -421,12 +474,6 @@ void handleClientsFromIOThread(struct aeEventLoop *el, int fd, void *ptr, int ma
|
|||
serverAssert(fd == getReadEventFd(mainThreadPendingClientsNotifiers[t->id]));
|
||||
handleEventNotifier(mainThreadPendingClientsNotifiers[t->id]);
|
||||
|
||||
/* Get the list of clients to process. */
|
||||
pthread_mutex_lock(&mainThreadPendingClientsMutexes[t->id]);
|
||||
listJoin(mainThreadProcessingClients[t->id], mainThreadPendingClients[t->id]);
|
||||
pthread_mutex_unlock(&mainThreadPendingClientsMutexes[t->id]);
|
||||
if (listLength(mainThreadProcessingClients[t->id]) == 0) return;
|
||||
|
||||
/* Process the clients from IO threads. */
|
||||
processClientsFromIOThread(t);
|
||||
}
|
||||
|
@ -439,19 +486,21 @@ void handleClientsFromIOThread(struct aeEventLoop *el, int fd, void *ptr, int ma
|
|||
* processed, so we need to handle this scenario in beforeSleep. The function is to
|
||||
* process the commands of subsequent clients from io threads. And another function
|
||||
* sendPendingClientsToIOThreads make sure clients from io thread can get replies.
|
||||
* See also beforeSleep. */
|
||||
void processClientsOfAllIOThreads(void) {
|
||||
* See also beforeSleep.
|
||||
*
|
||||
* In beforeSleep, we also call this function to handle the clients that are
|
||||
* transferred from io threads without notification. */
|
||||
int processClientsOfAllIOThreads(void) {
|
||||
int processed = 0;
|
||||
for (int i = 1; i < server.io_threads_num; i++) {
|
||||
processClientsFromIOThread(&IOThreads[i]);
|
||||
processed += processClientsFromIOThread(&IOThreads[i]);
|
||||
}
|
||||
return processed;
|
||||
}
|
||||
|
||||
/* After the main thread processes the clients, it will send the clients back to
|
||||
* io threads to handle, and fire an event, the io thread handles the event by
|
||||
* this function. If the client is not binded to the event loop, we should bind
|
||||
* it first and install read handler, and we don't uninstall client read handler
|
||||
* unless freeing client. If the client has pending reply, we just reply to client
|
||||
* first, and then install write handler if needed. */
|
||||
* this function. */
|
||||
void handleClientsFromMainThread(struct aeEventLoop *ae, int fd, void *ptr, int mask) {
|
||||
UNUSED(ae);
|
||||
UNUSED(mask);
|
||||
|
@ -462,10 +511,21 @@ void handleClientsFromMainThread(struct aeEventLoop *ae, int fd, void *ptr, int
|
|||
serverAssert(fd == getReadEventFd(t->pending_clients_notifier));
|
||||
handleEventNotifier(t->pending_clients_notifier);
|
||||
|
||||
/* Process the clients from main thread. */
|
||||
processClientsFromMainThread(t);
|
||||
}
|
||||
|
||||
/* Processing clients that have finished executing commands from the main thread.
|
||||
* If the client is not binded to the event loop, we should bind it first and
|
||||
* install read handler. If the client still has query buffer, we should process
|
||||
* the input buffer. If the client has pending reply, we just reply to client,
|
||||
* and then install write handler if needed. */
|
||||
int processClientsFromMainThread(IOThread *t) {
|
||||
pthread_mutex_lock(&t->pending_clients_mutex);
|
||||
listJoin(t->processing_clients, t->pending_clients);
|
||||
pthread_mutex_unlock(&t->pending_clients_mutex);
|
||||
if (listLength(t->processing_clients) == 0) return;
|
||||
size_t processed = listLength(t->processing_clients);
|
||||
if (processed == 0) return 0;
|
||||
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
|
@ -508,6 +568,7 @@ void handleClientsFromMainThread(struct aeEventLoop *ae, int fd, void *ptr, int
|
|||
}
|
||||
}
|
||||
listEmpty(t->processing_clients);
|
||||
return processed;
|
||||
}
|
||||
|
||||
void IOThreadBeforeSleep(struct aeEventLoop *el) {
|
||||
|
@ -517,22 +578,39 @@ void IOThreadBeforeSleep(struct aeEventLoop *el) {
|
|||
connTypeProcessPendingData(el);
|
||||
|
||||
/* If any connection type(typical TLS) still has pending unread data don't sleep at all. */
|
||||
aeSetDontWait(el, connTypeHasPendingData(el));
|
||||
int dont_sleep = connTypeHasPendingData(el);
|
||||
|
||||
/* Process clients from main thread, since the main thread may deliver clients
|
||||
* without notification during IO thread processing events. */
|
||||
if (processClientsFromMainThread(t) > 0) {
|
||||
/* If there are clients that are processed, we should not sleep since main
|
||||
* thread may want to continue deliverring clients without notification, so
|
||||
* IO thread can process them ASAP, and the main thread can avoid unnecessary
|
||||
* notification (write fd and wake up) is costly. */
|
||||
dont_sleep = 1;
|
||||
}
|
||||
if (!dont_sleep) {
|
||||
atomicSetWithSync(t->running, 0); /* Not running if going to sleep. */
|
||||
/* Try to process clients from main thread again, since before we set
|
||||
* running to 0, the main thread may deliver clients to this io thread. */
|
||||
processClientsFromMainThread(t);
|
||||
}
|
||||
aeSetDontWait(t->el, dont_sleep);
|
||||
|
||||
/* Check if i am being paused, pause myself and resume. */
|
||||
handlePauseAndResume(t);
|
||||
|
||||
/* Check if there are clients to be processed in main thread, and then join
|
||||
* them to the list of main thread. */
|
||||
if (listLength(t->pending_clients_to_main_thread) > 0) {
|
||||
pthread_mutex_lock(&mainThreadPendingClientsMutexes[t->id]);
|
||||
listJoin(mainThreadPendingClients[t->id], t->pending_clients_to_main_thread);
|
||||
pthread_mutex_unlock(&mainThreadPendingClientsMutexes[t->id]);
|
||||
/* Trigger an event, maybe an error is returned when buffer is full
|
||||
* if using pipe, but no worry, main thread will handle all clients
|
||||
* in list when receiving a notification. */
|
||||
triggerEventNotifier(mainThreadPendingClientsNotifiers[t->id]);
|
||||
}
|
||||
/* Send clients to main thread to process, we don't check size here since
|
||||
* we want to send all clients to main thread before going to sleeping. */
|
||||
sendPendingClientsToMainThreadIfNeeded(t, 0);
|
||||
}
|
||||
|
||||
void IOThreadAfterSleep(struct aeEventLoop *el) {
|
||||
IOThread *t = el->privdata[0];
|
||||
|
||||
/* Set the IO thread to running state, so the main thread can deliver
|
||||
* clients to it without extra notifications. */
|
||||
atomicSetWithSync(t->running, 1);
|
||||
}
|
||||
|
||||
/* The main function of IO thread, it will run an event loop. The mian thread
|
||||
|
@ -545,6 +623,7 @@ void *IOThreadMain(void *ptr) {
|
|||
redisSetCpuAffinity(server.server_cpulist);
|
||||
makeThreadKillable();
|
||||
aeSetBeforeSleepProc(t->el, IOThreadBeforeSleep);
|
||||
aeSetAfterSleepProc(t->el, IOThreadAfterSleep);
|
||||
aeMain(t->el);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -572,6 +651,7 @@ void initThreadedIO(void) {
|
|||
t->pending_clients_to_main_thread = listCreate();
|
||||
t->clients = listCreate();
|
||||
atomicSetWithSync(t->paused, IO_THREAD_UNPAUSED);
|
||||
atomicSetWithSync(t->running, 0);
|
||||
|
||||
pthread_mutexattr_t *attr = NULL;
|
||||
#if defined(__linux__) && defined(__GLIBC__)
|
||||
|
|
20
src/server.c
20
src/server.c
|
@ -1871,6 +1871,22 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||
dont_sleep = 1;
|
||||
}
|
||||
|
||||
if (server.io_threads_num > 1) {
|
||||
/* Corresponding to IOThreadBeforeSleep, process the clients from IO threads
|
||||
* without notification. */
|
||||
if (processClientsOfAllIOThreads() > 0) {
|
||||
/* If there are clients that are processed, it means IO thread is busy to
|
||||
* trafer clients to main thread, so the main thread does not sleep. */
|
||||
dont_sleep = 1;
|
||||
}
|
||||
if (!dont_sleep) {
|
||||
atomicSetWithSync(server.running, 0); /* Not running if going to sleep. */
|
||||
/* Try to process the clients from IO threads again, since before setting running
|
||||
* to 0, some clients may be transferred without notification. */
|
||||
processClientsOfAllIOThreads();
|
||||
}
|
||||
}
|
||||
|
||||
/* Handle writes with pending output buffers. */
|
||||
handleClientsWithPendingWrites();
|
||||
|
||||
|
@ -1952,6 +1968,9 @@ void afterSleep(struct aeEventLoop *eventLoop) {
|
|||
server.el_cmd_cnt_start = server.stat_numcommands;
|
||||
}
|
||||
|
||||
/* Set running after waking up */
|
||||
if (server.io_threads_num > 1) atomicSetWithSync(server.running, 1);
|
||||
|
||||
/* Update the time cache. */
|
||||
updateCachedTime(1);
|
||||
|
||||
|
@ -2873,6 +2892,7 @@ void initServer(void) {
|
|||
server.repl_good_slaves_count = 0;
|
||||
server.last_sig_received = 0;
|
||||
memset(server.io_threads_clients_num, 0, sizeof(server.io_threads_clients_num));
|
||||
atomicSetWithSync(server.running, 0);
|
||||
|
||||
/* Initiate acl info struct */
|
||||
server.acl_info.invalid_cmd_accesses = 0;
|
||||
|
|
|
@ -213,6 +213,10 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
|
|||
/* Max number of IO threads */
|
||||
#define IO_THREADS_MAX_NUM 128
|
||||
|
||||
/* To make IO threads and main thread run in parallel, we will transfer clients
|
||||
* between them if the number of clients in the pending list reaches this value. */
|
||||
#define IO_THREAD_MAX_PENDING_CLIENTS 16
|
||||
|
||||
/* Main thread id for doing IO work, whatever we enable or disable io thread
|
||||
* the main thread always does IO work, so we can consider that the main thread
|
||||
* is the io thread 0. */
|
||||
|
@ -1453,6 +1457,7 @@ typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) {
|
|||
* than 256, we should also promote the data type. */
|
||||
pthread_t tid; /* Pthread ID */
|
||||
redisAtomic int paused; /* Paused status for the io thread. */
|
||||
redisAtomic int running; /* Running if true, main thread can send clients directly. */
|
||||
aeEventLoop *el; /* Main event loop of io thread. */
|
||||
list *pending_clients; /* List of clients with pending writes. */
|
||||
list *processing_clients; /* List of clients being processed. */
|
||||
|
@ -1773,6 +1778,7 @@ struct redisServer {
|
|||
int client_pause_in_transaction; /* Was a client pause executed during this Exec? */
|
||||
int thp_enabled; /* If true, THP is enabled. */
|
||||
size_t page_size; /* The page size of OS. */
|
||||
redisAtomic int running; /* Running if true, IO threads can send clients without notification */
|
||||
/* Modules */
|
||||
dict *moduleapi; /* Exported core APIs dictionary for modules. */
|
||||
dict *sharedapi; /* Like moduleapi but containing the APIs that
|
||||
|
@ -2907,7 +2913,8 @@ void enqueuePendingClientsToMainThread(client *c, int unbind);
|
|||
void putInPendingClienstForIOThreads(client *c);
|
||||
void handleClientReadError(client *c);
|
||||
void unbindClientFromIOThreadEventLoop(client *c);
|
||||
void processClientsOfAllIOThreads(void);
|
||||
int processClientsOfAllIOThreads(void);
|
||||
int processClientsFromMainThread(IOThread *t);
|
||||
void assignClientToIOThread(client *c);
|
||||
void fetchClientFromIOThread(client *c);
|
||||
int isClientMustHandledByMainThread(client *c);
|
||||
|
|
Loading…
Reference in New Issue