diff --git a/src/iothread.c b/src/iothread.c index cf28f6db2f..a170317911 100644 --- a/src/iothread.c +++ b/src/iothread.c @@ -51,17 +51,18 @@ void enqueuePendingClientsToMainThread(client *c, int unbind) { if (unbind) connUnbindEventLoop(c->conn); /* Just skip if it already is transferred. */ if (c->io_thread_client_list_node) { + IOThread *t = &IOThreads[c->tid]; /* If there are several clients to process, let the main thread handle them ASAP. * Since the client being added to the queue may still need to be processed by * the IO thread, we must call this before adding it to the queue to avoid * races with the main thread. */ - sendPendingClientsToMainThreadIfNeeded(&IOThreads[c->tid], 1); - /* Remove the client from clients list of IO thread. */ - listDelNode(IOThreads[c->tid].clients, c->io_thread_client_list_node); - c->io_thread_client_list_node = NULL; + sendPendingClientsToMainThreadIfNeeded(t, 1); /* Disable read and write to avoid race when main thread processes. */ c->io_flags &= ~(CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED); - listAddNodeTail(IOThreads[c->tid].pending_clients_to_main_thread, c); + /* Remove the client from IO thread, add it to main thread's pending list. */ + listUnlinkNode(t->clients, c->io_thread_client_list_node); + listLinkNodeTail(t->pending_clients_to_main_thread, c->io_thread_client_list_node); + c->io_thread_client_list_node = NULL; } } @@ -544,7 +545,8 @@ int processClientsFromMainThread(IOThread *t) { /* Link client in IO thread clients list first. */ serverAssert(c->io_thread_client_list_node == NULL); - listAddNodeTail(t->clients, c); + listUnlinkNode(t->processing_clients, ln); + listLinkNodeTail(t->clients, ln); c->io_thread_client_list_node = listLast(t->clients); /* The client now is in the IO thread, let's free deferred objects. */ @@ -575,7 +577,8 @@ int processClientsFromMainThread(IOThread *t) { } } } - listEmpty(t->processing_clients); + /* All clients must are processed. */ + serverAssert(listLength(t->processing_clients) == 0); return processed; }