mirror of https://github.com/redis/redis.git
Fix HINCRBYFLOAT removes field expiration on replica (#14227)
CI / test-ubuntu-latest (push) Has been cancelled
Details
CI / test-sanitizer-address (push) Has been cancelled
Details
CI / build-debian-old (push) Has been cancelled
Details
CI / build-macos-latest (push) Has been cancelled
Details
CI / build-32bit (push) Has been cancelled
Details
CI / build-libc-malloc (push) Has been cancelled
Details
CI / build-centos-jemalloc (push) Has been cancelled
Details
CI / build-old-chain-jemalloc (push) Has been cancelled
Details
External Server Tests / test-external-standalone (push) Has been cancelled
Details
External Server Tests / test-external-cluster (push) Has been cancelled
Details
External Server Tests / test-external-nodebug (push) Has been cancelled
Details
Spellcheck / Spellcheck (push) Has been cancelled
Details
CI / test-ubuntu-latest (push) Has been cancelled
Details
CI / test-sanitizer-address (push) Has been cancelled
Details
CI / build-debian-old (push) Has been cancelled
Details
CI / build-macos-latest (push) Has been cancelled
Details
CI / build-32bit (push) Has been cancelled
Details
CI / build-libc-malloc (push) Has been cancelled
Details
CI / build-centos-jemalloc (push) Has been cancelled
Details
CI / build-old-chain-jemalloc (push) Has been cancelled
Details
External Server Tests / test-external-standalone (push) Has been cancelled
Details
External Server Tests / test-external-cluster (push) Has been cancelled
Details
External Server Tests / test-external-nodebug (push) Has been cancelled
Details
Spellcheck / Spellcheck (push) Has been cancelled
Details
Fixes https://github.com/redis/redis/issues/14218 Before, we replicate HINCRBYFLOAT as an HSET command with the final value in order to make sure that differences in float precision or formatting will not create differences in replicas or after an AOF restart. However, on the replica side, if the field has an expiration time, HSET will remove it, even though the master retains it. This leads to inconsistencies between the master and the replica. For Redis 8.0 and above, [PR #14224](https://github.com/redis/redis/pull/14224) has fixed this issue. However, Redis 7.4 does not support the HSETEX command. To address this, HINCRBYFLOAT will be replicated as a simple HSET if no expiration is set. If an expiration is specified, it will be replicated as a MULTI/EXEC containing HSET and HPEXPIREAT commands.
This commit is contained in:
parent
7e0f533932
commit
1d96a95d75
|
@ -1987,6 +1987,7 @@ void createSharedObjects(void) {
|
|||
shared.special_asterick = createStringObject("*",1);
|
||||
shared.special_equals = createStringObject("=",1);
|
||||
shared.redacted = makeObjectShared(createStringObject("(redacted)",10));
|
||||
shared.fields = createStringObject("FIELDS",6);
|
||||
|
||||
for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
|
||||
shared.integers[j] =
|
||||
|
|
|
@ -1325,7 +1325,7 @@ struct sharedObjectsStruct {
|
|||
*script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire,
|
||||
*hdel, *hpexpireat,
|
||||
*time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread,
|
||||
*lastid, *ping, *setid, *keepttl, *load, *createconsumer,
|
||||
*lastid, *ping, *setid, *keepttl, *load, *createconsumer, *fields,
|
||||
*getack, *special_asterick, *special_equals, *default_username, *redacted,
|
||||
*ssubscribebulk,*sunsubscribebulk, *smessagebulk,
|
||||
*select[PROTO_SHARED_SELECT_CMDS],
|
||||
|
|
50
src/t_hash.c
50
src/t_hash.c
|
@ -706,24 +706,29 @@ GetFieldRes hashTypeGetFromHashTable(robj *o, sds field, sds *value, uint64_t *e
|
|||
* If *vll is populated *vstr is set to NULL, so the caller can
|
||||
* always check the function return by checking the return value
|
||||
* for GETF_OK and checking if vll (or vstr) is NULL.
|
||||
* expiredAt - if the field has an expiration time, it will be set to the expiration
|
||||
* time of the field. Otherwise, will be set to EB_EXPIRE_TIME_INVALID.
|
||||
*
|
||||
*/
|
||||
GetFieldRes hashTypeGetValue(redisDb *db, robj *o, sds field, unsigned char **vstr,
|
||||
unsigned int *vlen, long long *vll, int hfeFlags) {
|
||||
uint64_t expiredAt;
|
||||
unsigned int *vlen, long long *vll, int hfeFlags, uint64_t *expiredAt)
|
||||
{
|
||||
sds key;
|
||||
GetFieldRes res;
|
||||
uint64_t dummy;
|
||||
if (expiredAt == NULL) expiredAt = &dummy;
|
||||
|
||||
if (o->encoding == OBJ_ENCODING_LISTPACK ||
|
||||
o->encoding == OBJ_ENCODING_LISTPACK_EX) {
|
||||
*vstr = NULL;
|
||||
res = hashTypeGetFromListpack(o, field, vstr, vlen, vll, &expiredAt);
|
||||
res = hashTypeGetFromListpack(o, field, vstr, vlen, vll, expiredAt);
|
||||
|
||||
if (res == GETF_NOT_FOUND)
|
||||
return GETF_NOT_FOUND;
|
||||
|
||||
} else if (o->encoding == OBJ_ENCODING_HT) {
|
||||
sds value = NULL;
|
||||
res = hashTypeGetFromHashTable(o, field, &value, &expiredAt);
|
||||
res = hashTypeGetFromHashTable(o, field, &value, expiredAt);
|
||||
|
||||
if (res == GETF_NOT_FOUND)
|
||||
return GETF_NOT_FOUND;
|
||||
|
@ -734,7 +739,7 @@ GetFieldRes hashTypeGetValue(redisDb *db, robj *o, sds field, unsigned char **vs
|
|||
serverPanic("Unknown hash encoding");
|
||||
}
|
||||
|
||||
if (expiredAt >= (uint64_t) commandTimeSnapshot())
|
||||
if (*expiredAt >= (uint64_t) commandTimeSnapshot())
|
||||
return GETF_OK;
|
||||
|
||||
if (server.masterhost) {
|
||||
|
@ -794,7 +799,7 @@ robj *hashTypeGetValueObject(redisDb *db, robj *o, sds field, int hfeFlags, int
|
|||
long long vll;
|
||||
|
||||
if (isHashDeleted) *isHashDeleted = 0;
|
||||
GetFieldRes res = hashTypeGetValue(db,o,field,&vstr,&vlen,&vll, hfeFlags);
|
||||
GetFieldRes res = hashTypeGetValue(db,o,field,&vstr,&vlen,&vll, hfeFlags, NULL);
|
||||
|
||||
if (res == GETF_OK) {
|
||||
if (vstr) return createStringObject((char*)vstr,vlen);
|
||||
|
@ -823,7 +828,7 @@ int hashTypeExists(redisDb *db, robj *o, sds field, int hfeFlags, int *isHashDel
|
|||
unsigned int vlen = UINT_MAX;
|
||||
long long vll = LLONG_MAX;
|
||||
|
||||
GetFieldRes res = hashTypeGetValue(db, o, field, &vstr, &vlen, &vll, hfeFlags);
|
||||
GetFieldRes res = hashTypeGetValue(db, o, field, &vstr, &vlen, &vll, hfeFlags, NULL);
|
||||
if (isHashDeleted)
|
||||
*isHashDeleted = (res == GETF_EXPIRED_HASH) ? 1 : 0;
|
||||
return (res == GETF_OK) ? 1 : 0;
|
||||
|
@ -2195,7 +2200,7 @@ void hincrbyCommand(client *c) {
|
|||
if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
|
||||
|
||||
GetFieldRes res = hashTypeGetValue(c->db,o,c->argv[2]->ptr,&vstr,&vlen,&value,
|
||||
HFE_LAZY_EXPIRE);
|
||||
HFE_LAZY_EXPIRE, NULL);
|
||||
if (res == GETF_OK) {
|
||||
if (vstr) {
|
||||
if (string2ll((char*)vstr,vlen,&value) == 0) {
|
||||
|
@ -2234,6 +2239,8 @@ void hincrbyfloatCommand(client *c) {
|
|||
sds new;
|
||||
unsigned char *vstr;
|
||||
unsigned int vlen;
|
||||
int has_expiration = 0;
|
||||
uint64_t expireat = EB_EXPIRE_TIME_INVALID;
|
||||
|
||||
if (getLongDoubleFromObjectOrReply(c,c->argv[3],&incr,NULL) != C_OK) return;
|
||||
if (isnan(incr) || isinf(incr)) {
|
||||
|
@ -2242,7 +2249,7 @@ void hincrbyfloatCommand(client *c) {
|
|||
}
|
||||
if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
|
||||
GetFieldRes res = hashTypeGetValue(c->db, o,c->argv[2]->ptr,&vstr,&vlen,&ll,
|
||||
HFE_LAZY_EXPIRE);
|
||||
HFE_LAZY_EXPIRE, &expireat);
|
||||
if (res == GETF_OK) {
|
||||
if (vstr) {
|
||||
if (string2ld((char*)vstr,vlen,&value) == 0) {
|
||||
|
@ -2252,6 +2259,8 @@ void hincrbyfloatCommand(client *c) {
|
|||
} else {
|
||||
value = (long double)ll;
|
||||
}
|
||||
/* Field has expiration time. */
|
||||
if (expireat != EB_EXPIRE_TIME_INVALID) has_expiration = 1;
|
||||
} else if ((res == GETF_NOT_FOUND) || (res == GETF_EXPIRED)) {
|
||||
value = 0;
|
||||
} else {
|
||||
|
@ -2284,6 +2293,25 @@ void hincrbyfloatCommand(client *c) {
|
|||
rewriteClientCommandArgument(c,0,shared.hset);
|
||||
rewriteClientCommandArgument(c,3,newobj);
|
||||
decrRefCount(newobj);
|
||||
|
||||
if (has_expiration) {
|
||||
/* To make sure that the HSET command is propagated before the HPEXPIREAT,
|
||||
* we need to prevent the HSET command from being propagated, and then
|
||||
* propagate both commands manually in the correct order. */
|
||||
preventCommandPropagation(c);
|
||||
/* Propagate HSET */
|
||||
alsoPropagate(c->db->id, c->argv, c->argc, PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
/* Propagate HPEXPIREAT */
|
||||
robj *argv[6];
|
||||
argv[0] = shared.hpexpireat;
|
||||
argv[1] = c->argv[1];
|
||||
argv[2] = createStringObjectFromLongLong(expireat);
|
||||
argv[3] = shared.fields;
|
||||
argv[4] = shared.integers[1];
|
||||
argv[5] = c->argv[2];
|
||||
alsoPropagate(c->db->id, argv, 6, PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
decrRefCount(argv[2]);
|
||||
}
|
||||
}
|
||||
|
||||
static GetFieldRes addHashFieldToReply(client *c, robj *o, sds field, int hfeFlags) {
|
||||
|
@ -2296,7 +2324,7 @@ static GetFieldRes addHashFieldToReply(client *c, robj *o, sds field, int hfeFla
|
|||
unsigned int vlen = UINT_MAX;
|
||||
long long vll = LLONG_MAX;
|
||||
|
||||
GetFieldRes res = hashTypeGetValue(c->db, o, field, &vstr, &vlen, &vll, hfeFlags);
|
||||
GetFieldRes res = hashTypeGetValue(c->db, o, field, &vstr, &vlen, &vll, hfeFlags, NULL);
|
||||
if (res == GETF_OK) {
|
||||
if (vstr) {
|
||||
addReplyBulkCBuffer(c, vstr, vlen);
|
||||
|
@ -2408,7 +2436,7 @@ void hstrlenCommand(client *c) {
|
|||
checkType(c,o,OBJ_HASH)) return;
|
||||
|
||||
GetFieldRes res = hashTypeGetValue(c->db, o, c->argv[2]->ptr, &vstr, &vlen, &vll,
|
||||
HFE_LAZY_EXPIRE);
|
||||
HFE_LAZY_EXPIRE, NULL);
|
||||
|
||||
if (res == GETF_NOT_FOUND || res == GETF_EXPIRED || res == GETF_EXPIRED_HASH) {
|
||||
addReply(c, shared.czero);
|
||||
|
|
|
@ -1266,5 +1266,57 @@ start_server {tags {"external:skip needs:debug"}} {
|
|||
assert_equal [dumpAllHashes $primary] [dumpAllHashes $replica]
|
||||
}
|
||||
}
|
||||
|
||||
test "HINCRBYFLOAT command won't remove field expiration on replica ($type)" {
|
||||
r flushall
|
||||
set repl [attach_to_replication_stream]
|
||||
|
||||
r hset h1 f1 1
|
||||
r hset h1 f2 1
|
||||
r hexpire h1 100 FIELDS 1 f1
|
||||
r hincrbyfloat h1 f1 1.1
|
||||
r hincrbyfloat h1 f2 1.1
|
||||
|
||||
# HINCRBYFLOAT will be replicated as HSET if no expiration time is set.
|
||||
# Otherwise it will be replicated as HSET+HPEXPIREAT multi command.
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{hset h1 f1 1}
|
||||
{hset h1 f2 1}
|
||||
{hpexpireat h1 * FIELDS 1 f1}
|
||||
{multi}
|
||||
{hset h1 f1 *}
|
||||
{hpexpireat h1 * FIELDS 1 f1}
|
||||
{exec}
|
||||
{hset h1 f2 *}
|
||||
}
|
||||
close_replication_stream $repl
|
||||
|
||||
start_server {tags {external:skip}} {
|
||||
r -1 flushall
|
||||
r slaveof [srv -1 host] [srv -1 port]
|
||||
wait_for_sync r
|
||||
|
||||
r -1 hset h1 f1 1
|
||||
r -1 hset h1 f2 1
|
||||
r -1 hexpire h1 100 FIELDS 1 f1
|
||||
wait_for_ofs_sync [srv -1 client] [srv 0 client]
|
||||
assert_range [r httl h1 FIELDS 1 f1] 90 100
|
||||
assert_equal {-1} [r httl h1 FIELDS 1 f2]
|
||||
|
||||
r -1 hincrbyfloat h1 f1 1.1
|
||||
r -1 hincrbyfloat h1 f2 1.1
|
||||
|
||||
# Expiration time should not be removed on replica and the value
|
||||
# should be equal to the master.
|
||||
wait_for_ofs_sync [srv -1 client] [srv 0 client]
|
||||
assert_range [r httl h1 FIELDS 1 f1] 90 100
|
||||
assert_equal [r -1 hget h1 f1] [r hget h1 f1]
|
||||
|
||||
# The field f2 should not have any expiration time on replica either.
|
||||
assert_equal {-1} [r httl h1 FIELDS 1 f2]
|
||||
assert_equal [r -1 hget h1 f2] [r hget h1 f2]
|
||||
}
|
||||
} {} {needs:repl external:skip}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue