mirror of https://github.com/redis/redis.git
CLIENT UNBLOCK should't be able to unpause paused clients (#14164)
CI / test-ubuntu-latest (push) Has been cancelled
Details
CI / test-sanitizer-address (push) Has been cancelled
Details
CI / build-debian-old (push) Has been cancelled
Details
CI / build-macos-latest (push) Has been cancelled
Details
CI / build-32bit (push) Has been cancelled
Details
CI / build-libc-malloc (push) Has been cancelled
Details
CI / build-centos-jemalloc (push) Has been cancelled
Details
CI / build-old-chain-jemalloc (push) Has been cancelled
Details
Codecov / code-coverage (push) Has been cancelled
Details
External Server Tests / test-external-standalone (push) Has been cancelled
Details
External Server Tests / test-external-cluster (push) Has been cancelled
Details
External Server Tests / test-external-nodebug (push) Has been cancelled
Details
Spellcheck / Spellcheck (push) Has been cancelled
Details
CI / test-ubuntu-latest (push) Has been cancelled
Details
CI / test-sanitizer-address (push) Has been cancelled
Details
CI / build-debian-old (push) Has been cancelled
Details
CI / build-macos-latest (push) Has been cancelled
Details
CI / build-32bit (push) Has been cancelled
Details
CI / build-libc-malloc (push) Has been cancelled
Details
CI / build-centos-jemalloc (push) Has been cancelled
Details
CI / build-old-chain-jemalloc (push) Has been cancelled
Details
Codecov / code-coverage (push) Has been cancelled
Details
External Server Tests / test-external-standalone (push) Has been cancelled
Details
External Server Tests / test-external-cluster (push) Has been cancelled
Details
External Server Tests / test-external-nodebug (push) Has been cancelled
Details
Spellcheck / Spellcheck (push) Has been cancelled
Details
This PR is based on https://github.com/valkey-io/valkey/pull/2117 When a client is blocked by something like `CLIENT PAUSE`, we should not allow `CLIENT UNBLOCK timeout` to unblock it, since some blocking types does not has the timeout callback, it will trigger a panic in the core, people should use `CLIENT UNPAUSE` to unblock it. Also using `CLIENT UNBLOCK error` is not right, it will return a UNBLOCKED error to the command, people don't expect a `SET` command to get an error. So in this commit, in these cases, we will return 0 to `CLIENT UNBLOCK` to indicate the unblock is fail. The reason is that we assume that if a command doesn't expect to be timedout, it also doesn't expect to be unblocked by `CLIENT UNBLOCK`. The old behavior of the following command will trigger panic in timeout and get UNBLOCKED error in error. Under the new behavior, client unblock will get the result of 0. ``` client 1> client pause 100000 write client 2> set x x client 1> client unblock 2 timeout or client 1> client unblock 2 error ``` --------- Signed-off-by: Binbin <binloveplay1314@qq.com> Co-authored-by: Binbin <binloveplay1314@qq.com>
This commit is contained in:
parent
7df34143c2
commit
ecd5e639ed
|
@ -214,6 +214,24 @@ void unblockClient(client *c, int queue_for_reprocessing) {
|
||||||
if (queue_for_reprocessing) queueClientForReprocessing(c);
|
if (queue_for_reprocessing) queueClientForReprocessing(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Check if the specified client can be safely timed out using
|
||||||
|
* unblockClientOnTimeout(). */
|
||||||
|
int blockedClientMayTimeout(client *c) {
|
||||||
|
if (c->bstate.btype == BLOCKED_MODULE) {
|
||||||
|
return moduleBlockedClientMayTimeout(c);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (c->bstate.btype == BLOCKED_LIST ||
|
||||||
|
c->bstate.btype == BLOCKED_ZSET ||
|
||||||
|
c->bstate.btype == BLOCKED_STREAM ||
|
||||||
|
c->bstate.btype == BLOCKED_WAIT ||
|
||||||
|
c->bstate.btype == BLOCKED_WAITAOF)
|
||||||
|
{
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/* This function gets called when a blocked client timed out in order to
|
/* This function gets called when a blocked client timed out in order to
|
||||||
* send it a reply of some kind. After this function is called,
|
* send it a reply of some kind. After this function is called,
|
||||||
* unblockClient() will be called with the same client as argument. */
|
* unblockClient() will be called with the same client as argument. */
|
||||||
|
|
|
@ -3712,11 +3712,12 @@ NULL
|
||||||
if (getLongLongFromObjectOrReply(c,c->argv[2],&id,NULL)
|
if (getLongLongFromObjectOrReply(c,c->argv[2],&id,NULL)
|
||||||
!= C_OK) return;
|
!= C_OK) return;
|
||||||
struct client *target = lookupClientByID(id);
|
struct client *target = lookupClientByID(id);
|
||||||
/* Note that we never try to unblock a client blocked on a module command, which
|
/* Note that we never try to unblock a client blocked on a module command,
|
||||||
|
* or a client blocked by CLIENT PAUSE or some other blocking type which
|
||||||
* doesn't have a timeout callback (even in the case of UNBLOCK ERROR).
|
* doesn't have a timeout callback (even in the case of UNBLOCK ERROR).
|
||||||
* The reason is that we assume that if a command doesn't expect to be timedout,
|
* The reason is that we assume that if a command doesn't expect to be timedout,
|
||||||
* it also doesn't expect to be unblocked by CLIENT UNBLOCK */
|
* it also doesn't expect to be unblocked by CLIENT UNBLOCK */
|
||||||
if (target && target->flags & CLIENT_BLOCKED && moduleBlockedClientMayTimeout(target)) {
|
if (target && target->flags & CLIENT_BLOCKED && blockedClientMayTimeout(target)) {
|
||||||
if (unblock_error)
|
if (unblock_error)
|
||||||
unblockClientOnError(target,
|
unblockClientOnError(target,
|
||||||
"-UNBLOCKED client unblocked via CLIENT UNBLOCK");
|
"-UNBLOCKED client unblocked via CLIENT UNBLOCK");
|
||||||
|
|
|
@ -3816,6 +3816,7 @@ void unblockClient(client *c, int queue_for_reprocessing);
|
||||||
void unblockClientOnTimeout(client *c);
|
void unblockClientOnTimeout(client *c);
|
||||||
void unblockClientOnError(client *c, const char *err_str);
|
void unblockClientOnError(client *c, const char *err_str);
|
||||||
void queueClientForReprocessing(client *c);
|
void queueClientForReprocessing(client *c);
|
||||||
|
int blockedClientMayTimeout(client *c);
|
||||||
void replyToBlockedClientTimedOut(client *c);
|
void replyToBlockedClientTimedOut(client *c);
|
||||||
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit);
|
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit);
|
||||||
void disconnectAllBlockedClients(void);
|
void disconnectAllBlockedClients(void);
|
||||||
|
|
|
@ -1,3 +1,17 @@
|
||||||
|
#
|
||||||
|
# Copyright (c) 2009-Present, Redis Ltd.
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# Copyright (c) 2024-present, Valkey contributors.
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# Licensed under your choice of (a) the Redis Source Available License 2.0
|
||||||
|
# (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
|
||||||
|
# GNU Affero General Public License v3 (AGPLv3).
|
||||||
|
#
|
||||||
|
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
|
||||||
|
#
|
||||||
|
|
||||||
start_server {tags {"pause network"}} {
|
start_server {tags {"pause network"}} {
|
||||||
test "Test read commands are not blocked by client pause" {
|
test "Test read commands are not blocked by client pause" {
|
||||||
r client PAUSE 100000 WRITE
|
r client PAUSE 100000 WRITE
|
||||||
|
@ -379,6 +393,36 @@ start_server {tags {"pause network"}} {
|
||||||
assert_equal [r randomkey] {}
|
assert_equal [r randomkey] {}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test "CLIENT UNBLOCK is not allow to unblock client blocked by CLIENT PAUSE" {
|
||||||
|
set rd1 [redis_deferring_client]
|
||||||
|
set rd2 [redis_deferring_client]
|
||||||
|
$rd1 client id
|
||||||
|
$rd2 client id
|
||||||
|
set client_id1 [$rd1 read]
|
||||||
|
set client_id2 [$rd2 read]
|
||||||
|
|
||||||
|
r del mylist
|
||||||
|
r client pause 100000 write
|
||||||
|
$rd1 blpop mylist 0
|
||||||
|
$rd2 blpop mylist 0
|
||||||
|
wait_for_blocked_clients_count 2 50 100
|
||||||
|
|
||||||
|
# This used to trigger a panic.
|
||||||
|
assert_equal 0 [r client unblock $client_id1 timeout]
|
||||||
|
# This used to return a UNBLOCKED error.
|
||||||
|
assert_equal 0 [r client unblock $client_id2 error]
|
||||||
|
|
||||||
|
# After the unpause, it must be able to unblock the client.
|
||||||
|
r client unpause
|
||||||
|
assert_equal 1 [r client unblock $client_id1 timeout]
|
||||||
|
assert_equal 1 [r client unblock $client_id2 error]
|
||||||
|
assert_equal {} [$rd1 read]
|
||||||
|
assert_error "UNBLOCKED*" {$rd2 read}
|
||||||
|
|
||||||
|
$rd1 close
|
||||||
|
$rd2 close
|
||||||
|
}
|
||||||
|
|
||||||
# Make sure we unpause at the end
|
# Make sure we unpause at the end
|
||||||
r client unpause
|
r client unpause
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,6 +140,29 @@ tags {"wait aof network external:skip"} {
|
||||||
assert_error {ERR WAITAOF cannot be used when numlocal is set but appendonly is disabled.} {$master waitaof 1 0 0}
|
assert_error {ERR WAITAOF cannot be used when numlocal is set but appendonly is disabled.} {$master waitaof 1 0 0}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test {WAITAOF local client unblock with timeout and error} {
|
||||||
|
r config set appendonly yes
|
||||||
|
r config set appendfsync no
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
$rd client id
|
||||||
|
set client_id [$rd read]
|
||||||
|
|
||||||
|
# Test unblock with timeout
|
||||||
|
$rd incr foo
|
||||||
|
$rd read
|
||||||
|
$rd waitaof 1 0 0
|
||||||
|
wait_for_blocked_client
|
||||||
|
assert_equal 1 [r client unblock $client_id timeout]
|
||||||
|
|
||||||
|
# Test unblock with error
|
||||||
|
$rd incr foo
|
||||||
|
$rd read
|
||||||
|
$rd waitaof 1 0 0
|
||||||
|
wait_for_blocked_client
|
||||||
|
assert_equal 1 [r client unblock $client_id error]
|
||||||
|
$rd close
|
||||||
|
}
|
||||||
|
|
||||||
test {WAITAOF local if AOFRW was postponed} {
|
test {WAITAOF local if AOFRW was postponed} {
|
||||||
r config set appendfsync everysec
|
r config set appendfsync everysec
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue