diff --git a/src/server.c b/src/server.c index 1a7bf2700..9fd820151 100644 --- a/src/server.c +++ b/src/server.c @@ -1987,6 +1987,7 @@ void createSharedObjects(void) { shared.special_asterick = createStringObject("*",1); shared.special_equals = createStringObject("=",1); shared.redacted = makeObjectShared(createStringObject("(redacted)",10)); + shared.fields = createStringObject("FIELDS",6); for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { shared.integers[j] = diff --git a/src/server.h b/src/server.h index ec68fa213..5bbf4f877 100644 --- a/src/server.h +++ b/src/server.h @@ -1325,7 +1325,7 @@ struct sharedObjectsStruct { *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, *hdel, *hpexpireat, *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread, - *lastid, *ping, *setid, *keepttl, *load, *createconsumer, + *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *fields, *getack, *special_asterick, *special_equals, *default_username, *redacted, *ssubscribebulk,*sunsubscribebulk, *smessagebulk, *select[PROTO_SHARED_SELECT_CMDS], diff --git a/src/t_hash.c b/src/t_hash.c index 3b9a6e332..45e0dc2b5 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -706,24 +706,29 @@ GetFieldRes hashTypeGetFromHashTable(robj *o, sds field, sds *value, uint64_t *e * If *vll is populated *vstr is set to NULL, so the caller can * always check the function return by checking the return value * for GETF_OK and checking if vll (or vstr) is NULL. + * expiredAt - if the field has an expiration time, it will be set to the expiration + * time of the field. Otherwise, will be set to EB_EXPIRE_TIME_INVALID. * */ GetFieldRes hashTypeGetValue(redisDb *db, robj *o, sds field, unsigned char **vstr, - unsigned int *vlen, long long *vll, int hfeFlags) { - uint64_t expiredAt; + unsigned int *vlen, long long *vll, int hfeFlags, uint64_t *expiredAt) +{ sds key; GetFieldRes res; + uint64_t dummy; + if (expiredAt == NULL) expiredAt = &dummy; + if (o->encoding == OBJ_ENCODING_LISTPACK || o->encoding == OBJ_ENCODING_LISTPACK_EX) { *vstr = NULL; - res = hashTypeGetFromListpack(o, field, vstr, vlen, vll, &expiredAt); + res = hashTypeGetFromListpack(o, field, vstr, vlen, vll, expiredAt); if (res == GETF_NOT_FOUND) return GETF_NOT_FOUND; } else if (o->encoding == OBJ_ENCODING_HT) { sds value = NULL; - res = hashTypeGetFromHashTable(o, field, &value, &expiredAt); + res = hashTypeGetFromHashTable(o, field, &value, expiredAt); if (res == GETF_NOT_FOUND) return GETF_NOT_FOUND; @@ -734,7 +739,7 @@ GetFieldRes hashTypeGetValue(redisDb *db, robj *o, sds field, unsigned char **vs serverPanic("Unknown hash encoding"); } - if (expiredAt >= (uint64_t) commandTimeSnapshot()) + if (*expiredAt >= (uint64_t) commandTimeSnapshot()) return GETF_OK; if (server.masterhost) { @@ -794,7 +799,7 @@ robj *hashTypeGetValueObject(redisDb *db, robj *o, sds field, int hfeFlags, int long long vll; if (isHashDeleted) *isHashDeleted = 0; - GetFieldRes res = hashTypeGetValue(db,o,field,&vstr,&vlen,&vll, hfeFlags); + GetFieldRes res = hashTypeGetValue(db,o,field,&vstr,&vlen,&vll, hfeFlags, NULL); if (res == GETF_OK) { if (vstr) return createStringObject((char*)vstr,vlen); @@ -823,7 +828,7 @@ int hashTypeExists(redisDb *db, robj *o, sds field, int hfeFlags, int *isHashDel unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; - GetFieldRes res = hashTypeGetValue(db, o, field, &vstr, &vlen, &vll, hfeFlags); + GetFieldRes res = hashTypeGetValue(db, o, field, &vstr, &vlen, &vll, hfeFlags, NULL); if (isHashDeleted) *isHashDeleted = (res == GETF_EXPIRED_HASH) ? 1 : 0; return (res == GETF_OK) ? 1 : 0; @@ -2195,7 +2200,7 @@ void hincrbyCommand(client *c) { if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; GetFieldRes res = hashTypeGetValue(c->db,o,c->argv[2]->ptr,&vstr,&vlen,&value, - HFE_LAZY_EXPIRE); + HFE_LAZY_EXPIRE, NULL); if (res == GETF_OK) { if (vstr) { if (string2ll((char*)vstr,vlen,&value) == 0) { @@ -2234,6 +2239,8 @@ void hincrbyfloatCommand(client *c) { sds new; unsigned char *vstr; unsigned int vlen; + int has_expiration = 0; + uint64_t expireat = EB_EXPIRE_TIME_INVALID; if (getLongDoubleFromObjectOrReply(c,c->argv[3],&incr,NULL) != C_OK) return; if (isnan(incr) || isinf(incr)) { @@ -2242,7 +2249,7 @@ void hincrbyfloatCommand(client *c) { } if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; GetFieldRes res = hashTypeGetValue(c->db, o,c->argv[2]->ptr,&vstr,&vlen,&ll, - HFE_LAZY_EXPIRE); + HFE_LAZY_EXPIRE, &expireat); if (res == GETF_OK) { if (vstr) { if (string2ld((char*)vstr,vlen,&value) == 0) { @@ -2252,6 +2259,8 @@ void hincrbyfloatCommand(client *c) { } else { value = (long double)ll; } + /* Field has expiration time. */ + if (expireat != EB_EXPIRE_TIME_INVALID) has_expiration = 1; } else if ((res == GETF_NOT_FOUND) || (res == GETF_EXPIRED)) { value = 0; } else { @@ -2284,6 +2293,25 @@ void hincrbyfloatCommand(client *c) { rewriteClientCommandArgument(c,0,shared.hset); rewriteClientCommandArgument(c,3,newobj); decrRefCount(newobj); + + if (has_expiration) { + /* To make sure that the HSET command is propagated before the HPEXPIREAT, + * we need to prevent the HSET command from being propagated, and then + * propagate both commands manually in the correct order. */ + preventCommandPropagation(c); + /* Propagate HSET */ + alsoPropagate(c->db->id, c->argv, c->argc, PROPAGATE_AOF|PROPAGATE_REPL); + /* Propagate HPEXPIREAT */ + robj *argv[6]; + argv[0] = shared.hpexpireat; + argv[1] = c->argv[1]; + argv[2] = createStringObjectFromLongLong(expireat); + argv[3] = shared.fields; + argv[4] = shared.integers[1]; + argv[5] = c->argv[2]; + alsoPropagate(c->db->id, argv, 6, PROPAGATE_AOF|PROPAGATE_REPL); + decrRefCount(argv[2]); + } } static GetFieldRes addHashFieldToReply(client *c, robj *o, sds field, int hfeFlags) { @@ -2296,7 +2324,7 @@ static GetFieldRes addHashFieldToReply(client *c, robj *o, sds field, int hfeFla unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; - GetFieldRes res = hashTypeGetValue(c->db, o, field, &vstr, &vlen, &vll, hfeFlags); + GetFieldRes res = hashTypeGetValue(c->db, o, field, &vstr, &vlen, &vll, hfeFlags, NULL); if (res == GETF_OK) { if (vstr) { addReplyBulkCBuffer(c, vstr, vlen); @@ -2408,7 +2436,7 @@ void hstrlenCommand(client *c) { checkType(c,o,OBJ_HASH)) return; GetFieldRes res = hashTypeGetValue(c->db, o, c->argv[2]->ptr, &vstr, &vlen, &vll, - HFE_LAZY_EXPIRE); + HFE_LAZY_EXPIRE, NULL); if (res == GETF_NOT_FOUND || res == GETF_EXPIRED || res == GETF_EXPIRED_HASH) { addReply(c, shared.czero); diff --git a/tests/unit/type/hash-field-expire.tcl b/tests/unit/type/hash-field-expire.tcl index 630d141e5..bcfabdb9e 100644 --- a/tests/unit/type/hash-field-expire.tcl +++ b/tests/unit/type/hash-field-expire.tcl @@ -1266,5 +1266,57 @@ start_server {tags {"external:skip needs:debug"}} { assert_equal [dumpAllHashes $primary] [dumpAllHashes $replica] } } + + test "HINCRBYFLOAT command won't remove field expiration on replica ($type)" { + r flushall + set repl [attach_to_replication_stream] + + r hset h1 f1 1 + r hset h1 f2 1 + r hexpire h1 100 FIELDS 1 f1 + r hincrbyfloat h1 f1 1.1 + r hincrbyfloat h1 f2 1.1 + + # HINCRBYFLOAT will be replicated as HSET if no expiration time is set. + # Otherwise it will be replicated as HSET+HPEXPIREAT multi command. + assert_replication_stream $repl { + {select *} + {hset h1 f1 1} + {hset h1 f2 1} + {hpexpireat h1 * FIELDS 1 f1} + {multi} + {hset h1 f1 *} + {hpexpireat h1 * FIELDS 1 f1} + {exec} + {hset h1 f2 *} + } + close_replication_stream $repl + + start_server {tags {external:skip}} { + r -1 flushall + r slaveof [srv -1 host] [srv -1 port] + wait_for_sync r + + r -1 hset h1 f1 1 + r -1 hset h1 f2 1 + r -1 hexpire h1 100 FIELDS 1 f1 + wait_for_ofs_sync [srv -1 client] [srv 0 client] + assert_range [r httl h1 FIELDS 1 f1] 90 100 + assert_equal {-1} [r httl h1 FIELDS 1 f2] + + r -1 hincrbyfloat h1 f1 1.1 + r -1 hincrbyfloat h1 f2 1.1 + + # Expiration time should not be removed on replica and the value + # should be equal to the master. + wait_for_ofs_sync [srv -1 client] [srv 0 client] + assert_range [r httl h1 FIELDS 1 f1] 90 100 + assert_equal [r -1 hget h1 f1] [r hget h1 f1] + + # The field f2 should not have any expiration time on replica either. + assert_equal {-1} [r httl h1 FIELDS 1 f2] + assert_equal [r -1 hget h1 f2] [r hget h1 f2] + } + } {} {needs:repl external:skip} } }