This commit is contained in:
sggeorgiev 2025-10-08 11:16:25 +00:00 committed by GitHub
commit d9b5e6a7a2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 1998 additions and 28 deletions

View File

@ -753,6 +753,9 @@ void blockedBeforeSleep(void) {
/* Handle precise timeouts of blocked clients. */
handleBlockedClientsTimeout();
/* Handle for expired pending entries. */
handleClaimableStreamEntries();
/* Unblock all the clients blocked for synchronous replication
* in WAIT or WAITAOF. */
if (listLength(server.clients_waiting_acks))

View File

@ -10434,6 +10434,7 @@ struct COMMAND_ARG XREADGROUP_Args[] = {
{MAKE_ARG("group-block",ARG_TYPE_BLOCK,-1,"GROUP",NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=XREADGROUP_group_block_Subargs},
{MAKE_ARG("count",ARG_TYPE_INTEGER,-1,"COUNT",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)},
{MAKE_ARG("milliseconds",ARG_TYPE_INTEGER,-1,"BLOCK",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)},
{MAKE_ARG("min-idle-time",ARG_TYPE_INTEGER,-1,"CLAIM",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)},
{MAKE_ARG("noack",ARG_TYPE_PURE_TOKEN,-1,"NOACK",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)},
{MAKE_ARG("streams",ARG_TYPE_BLOCK,-1,"STREAMS",NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=XREADGROUP_streams_Subargs},
};
@ -11488,7 +11489,7 @@ struct COMMAND_STRUCT redisCommandTable[] = {
{MAKE_CMD("xpending","Returns the information and entries from a stream consumer group's pending entries list.","O(N) with N being the number of elements returned, so asking for a small fixed number of entries per call is O(1). O(M), where M is the total number of entries scanned when used with the IDLE filter. When the command returns just the summary and the list of consumers is small, it runs in O(1) time; otherwise, an additional O(N) time for iterating every consumer.","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XPENDING_History,1,XPENDING_Tips,1,xpendingCommand,-3,CMD_READONLY,ACL_CATEGORY_STREAM,XPENDING_Keyspecs,1,NULL,3),.args=XPENDING_Args},
{MAKE_CMD("xrange","Returns the messages from a stream within a range of IDs.","O(N) with N being the number of elements being returned. If N is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1).","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XRANGE_History,1,XRANGE_Tips,0,xrangeCommand,-4,CMD_READONLY,ACL_CATEGORY_STREAM,XRANGE_Keyspecs,1,NULL,4),.args=XRANGE_Args},
{MAKE_CMD("xread","Returns messages from multiple streams with IDs greater than the ones requested. Blocks until a message is available otherwise.",NULL,"5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XREAD_History,0,XREAD_Tips,0,xreadCommand,-4,CMD_BLOCKING|CMD_READONLY,ACL_CATEGORY_STREAM,XREAD_Keyspecs,1,xreadGetKeys,3),.args=XREAD_Args},
{MAKE_CMD("xreadgroup","Returns new or historical messages from a stream for a consumer in a group. Blocks until a message is available otherwise.","For each stream mentioned: O(M) with M being the number of elements returned. If M is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1). On the other side when XREADGROUP blocks, XADD will pay the O(N) time in order to serve the N clients blocked on the stream getting new data.","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XREADGROUP_History,0,XREADGROUP_Tips,0,xreadCommand,-7,CMD_BLOCKING|CMD_WRITE,ACL_CATEGORY_STREAM,XREADGROUP_Keyspecs,1,xreadGetKeys,5),.args=XREADGROUP_Args},
{MAKE_CMD("xreadgroup","Returns new or historical messages from a stream for a consumer in a group. Blocks until a message is available otherwise.","For each stream mentioned: O(M) with M being the number of elements returned. If M is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1). On the other side when XREADGROUP blocks, XADD will pay the O(N) time in order to serve the N clients blocked on the stream getting new data.","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XREADGROUP_History,0,XREADGROUP_Tips,0,xreadCommand,-7,CMD_BLOCKING|CMD_WRITE,ACL_CATEGORY_STREAM,XREADGROUP_Keyspecs,1,xreadGetKeys,6),.args=XREADGROUP_Args},
{MAKE_CMD("xrevrange","Returns the messages from a stream within a range of IDs in reverse order.","O(N) with N being the number of elements returned. If N is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1).","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XREVRANGE_History,1,XREVRANGE_Tips,0,xrevrangeCommand,-4,CMD_READONLY,ACL_CATEGORY_STREAM,XREVRANGE_Keyspecs,1,NULL,4),.args=XREVRANGE_Args},
{MAKE_CMD("xsetid","An internal command for replicating stream values.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XSETID_History,1,XSETID_Tips,0,xsetidCommand,-3,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_STREAM,XSETID_Keyspecs,1,NULL,4),.args=XSETID_Args},
{MAKE_CMD("xtrim","Deletes messages from the beginning of a stream.","O(N), with N being the number of evicted entries. Constant times are very small however, since entries are organized in macro nodes containing multiple entries that can be released with a single deallocation.","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XTRIM_History,2,XTRIM_Tips,1,xtrimCommand,-4,CMD_WRITE,ACL_CATEGORY_STREAM,XTRIM_Keyspecs,1,NULL,2),.args=XTRIM_Args},

View File

@ -63,6 +63,12 @@
"type": "integer",
"optional": true
},
{
"token": "CLAIM",
"name": "min-idle-time",
"type": "integer",
"optional": true
},
{
"name": "noack",
"token": "NOACK",

View File

@ -870,6 +870,8 @@ typedef struct {
} PendingEntryContext;
void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) {
uint64_t keyBuf[3];
pelTimeKey timeKey;
PendingEntryContext *ctx = privdata;
streamNACK *nack = ri->data, *newnack;
nack->consumer = ctx->c; /* update nack pointer to consumer */
@ -877,8 +879,22 @@ void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) {
newnack = activeDefragAlloc(nack);
if (newnack) {
/* update consumer group pointer to the nack */
streamID id;
streamDecodeID(ri->key, &id);
void *prev;
raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev);
if (prev) {
streamNACK *prevNack = prev;
timeKey.delivery_time = prevNack->delivery_time;
timeKey.id = id;
encodePelTimeKey(&keyBuf, &timeKey);
raxRemove(ctx->cg->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL);
}
timeKey.delivery_time = newnack->delivery_time;
timeKey.id = id;
encodePelTimeKey(&keyBuf, &timeKey);
raxInsert(ctx->cg->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL, NULL);
serverAssert(prev==nack);
}
return newnack;

View File

@ -2988,6 +2988,16 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error)
streamFreeNACK(nack);
return NULL;
}
streamID id;
streamDecodeID(rawid, &id);
uint64_t keyBuf[3];
pelTimeKey timeKey;
timeKey.delivery_time = nack->delivery_time;
timeKey.id = id;
encodePelTimeKey(&keyBuf, &timeKey);
raxInsert(cgroup->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL, NULL);
}
/* Now that we loaded our global PEL, we need to load the

View File

@ -2857,6 +2857,7 @@ void initServer(void) {
server.db[j].expires_cursor = 0;
server.db[j].blocking_keys = dictCreate(&keylistDictType);
server.db[j].blocking_keys_unblock_on_nokey = dictCreate(&objectKeyPointerValueDictType);
server.db[j].stream_claim_pending_keys = dictCreate(&objectKeyPointerValueDictType);
server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType);
server.db[j].watched_keys = dictCreate(&keylistDictType);
server.db[j].id = j;

View File

@ -1118,6 +1118,7 @@ typedef struct redisDb {
dict *blocking_keys_unblock_on_nokey; /* Keys with clients waiting for
* data, and should be unblocked if key is deleted (XREADEDGROUP).
* This is a subset of blocking_keys*/
dict *stream_claim_pending_keys; /* Keys with clients waiting to claim pending entries */
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
@ -3842,6 +3843,9 @@ void removeClientFromTimeoutTable(client *c);
void handleBlockedClientsTimeout(void);
int clientsCronHandleTimeout(client *c, mstime_t now_ms);
/* t_stream.c -- Handling of stream data structures */
void handleClaimableStreamEntries(void);
/* expire.c -- Handling of expired keys */
void activeExpireCycle(int type);
void expireSlaveKeys(void);

View File

@ -70,6 +70,12 @@ typedef struct streamCG {
as processed. The key of the radix tree is the
ID as a 64 bit big endian number, while the
associated value is a streamNACK structure.*/
rax *pel_by_time; /* A radix tree mapping delivery time to pending
entries, so that we can query faster PEL entries
by time. The key is the delivery time in milliseconds
as a 64 bit big endian number. The value is pointer to
the streamNACK structure associated with the pending
entry. */
rax *consumers; /* A radix tree representing the consumers by name
and their associated representation in the form
of streamConsumer structures. */
@ -107,6 +113,12 @@ typedef struct streamPropInfo {
robj *groupname;
} streamPropInfo;
/* Pending entry in the consumer group's PEL, indexed by delivery time. */
typedef struct pelTimeKey {
uint64_t delivery_time;
streamID id;
} pelTimeKey;
/* Prototypes of exported APIs. */
struct client;
@ -120,7 +132,7 @@ struct client;
stream *streamNew(void);
void freeStream(stream *s);
unsigned long streamLength(const robj *subject);
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, unsigned long *propCount);
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, long long min_idle_time, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, unsigned long *propCount);
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
@ -150,4 +162,7 @@ int64_t streamTrimByID(stream *s, streamID minid, int approx);
listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, unsigned char *key);
void encodePelTimeKey(void* buf, pelTimeKey *timeKey);
void decodePelTimeKey(void *buf, pelTimeKey *timeKey);
#endif

View File

@ -35,13 +35,14 @@
void streamFreeCGGeneric(void *cg);
void streamFreeNACK(streamNACK *na);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer);
int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given);
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
int streamEntryIsReferenced(stream *s, streamID *id);
void streamCleanupEntryCGroupRefs(stream *s, streamID *id);
void streamUpdateCGroupLastId(stream *s, streamCG *cg, streamID *id);
void trackStreamClaimTimeouts(client *c, robj **keys, int numkeys, uint64_t expire_time);
/* -----------------------------------------------------------------------
* Low level stream encoding: a radix tree of listpacks.
@ -210,6 +211,16 @@ robj *streamDup(robj *o) {
new_nack->delivery_count = nack->delivery_count;
new_nack->cgroup_ref_node = streamLinkCGroupToEntry(new_s, new_cg, ri_cg_pel.key);
raxInsert(new_cg->pel, ri_cg_pel.key, sizeof(streamID), new_nack, NULL);
streamID id;
streamDecodeID(ri_cg_pel.key, &id);
uint64_t keyBuf[3];
pelTimeKey timeKey;
timeKey.delivery_time = new_nack->delivery_time;
timeKey.id = id;
encodePelTimeKey(keyBuf, &timeKey);
raxInsert(new_cg->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL, NULL);
}
raxStop(&ri_cg_pel);
@ -1747,6 +1758,13 @@ void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds
*
* The function returns the number of entries emitted.
*
* If 'min_idle_time' is not -1 and a group is specified, the function first
* processes pending entries (from the group's PEL) that have been idle for at
* least 'min_idle_time' milliseconds, claiming them for the specified consumer.
* Each claimed entry is returned as a four-element array: ID, field-value pairs,
* idle time, and delivery count. The NACK is transferred from the previous
* consumer to the new consumer with updated delivery metadata.
*
* If group and consumer are not NULL, the function performs additional work:
* 1. It updates the last delivered ID in the group in case we are
* sending IDs greater than the current last ID.
@ -1767,6 +1785,9 @@ void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds
* and return the number of entries emitted as usually.
* This is used when the function is just used in order
* to emit data and there is some higher level logic.
* STREAM_RWR_HISTORY: Return entries from the consumer's own PEL history only.
* STREAM_RWR_CLAIMED: Return only claimable entries from the PEL. New entries
* from the stream are not returned.
*
* The final argument 'spi' (stream propagation info pointer) is a structure
* filled with information needed to propagate the command execution to AOF
@ -1783,23 +1804,148 @@ void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds
* consumer pending entries list. However such a function will then call
* streamReplyWithRange() in order to emit single entries (found in the
* PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES
* flag.
*/
* flag. */
#define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */
#define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array
boundaries, just the entries. */
#define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, unsigned long *propCount) {
#define STREAM_RWR_CLAIMED (1<<3) /* Only serve claimed entries from PEL. */
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, long long min_idle_time, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, unsigned long *propCount) {
void *arraylen_ptr = NULL;
size_t arraylen = 0;
streamIterator si;
int64_t numfields;
streamID id;
uint64_t keyBuf[3];
pelTimeKey timeKey;
int propagate_last_id = 0;
int noack = flags & STREAM_RWR_NOACK;
if (propCount) *propCount = 0;
if (group && min_idle_time != -1) {
arraylen_ptr = addReplyDeferredLen(c);
/* Scan the group's pending entries list (PEL) to find messages that have been
* idle for at least min_idle_time milliseconds. The pel_by_time radix tree
* stores entries ordered by their last delivery timestamp, allowing us to
* efficiently iterate from oldest to newest.
*
* We collect eligible entries into a temporary list rather than processing
* them inline because:
* 1. We cannot safely modify a radix tree while iterating over it
* 2. The claiming process requires removing and re-inserting entries in
* both pel_by_time and the consumer PELs
*
* The iteration can terminate early in two cases:
* 1. We find an entry that hasn't been idle long enough - due to time-based
* ordering, all subsequent entries will be even newer
* 2. We've collected enough entries to satisfy the requested count limit */
list *eligible_pels = listCreate();
raxIterator ri;
raxStart(&ri, group->pel_by_time);
raxSeek(&ri, "^", NULL, 0);
while (raxNext(&ri)) {
pelTimeKey pelKey;
decodePelTimeKey(ri.key, &pelKey);
uint64_t idle = commandTimeSnapshot() - pelKey.delivery_time;
if (idle < (uint64_t)min_idle_time)
break;
/* Store a copy of the key for later processing */
pelTimeKey *keyCopy = zmalloc(sizeof(pelTimeKey));
memcpy(keyCopy, &pelKey, sizeof(pelTimeKey));
listAddNodeTail(eligible_pels, keyCopy);
if (count && listLength(eligible_pels) >= count) break;
}
raxStop(&ri);
/* Process each eligible pending entry, claiming it for the current consumer.
* For each entry we:
* 1. Fetch the actual message data from the stream
* 2. Send the message to the client with metadata (idle time, delivery count)
* 3. Transfer ownership from the previous consumer to the current consumer
* 4. Update all relevant data structures and propagate the claim operation */
listIter li;
listNode *ln;
listRewind(eligible_pels, &li);
while ((ln = listNext(&li))) {
pelTimeKey *pelKey = (pelTimeKey*)listNodeValue(ln);
unsigned char buf[sizeof(streamID)];
streamEncodeID(buf, &pelKey->id);
void *result;
streamNACK *nack = NULL;
uint64_t delivery_count = 0;
/* Must exist, we got the ID from pel_by_time */
serverAssert(raxFind(group->pel,buf,sizeof(buf),&result));
nack = (streamNACK*)result;
delivery_count = nack->delivery_count;
streamID pel_id;
streamIteratorStart(&si,s,&pelKey->id,&pelKey->id,rev);
if (streamIteratorGetID(&si,&pel_id,&numfields)) {
/* Emit a four elements array: ID, array of field-value pairs,
* idle time and delivery count. */
addReplyArrayLen(c,4);
addReplyStreamID(c,&pel_id);
addReplyArrayLen(c,numfields*2);
/* Emit the field-value pairs. */
while (numfields--) {
unsigned char *key, *value;
int64_t key_len, value_len;
streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
addReplyBulkCBuffer(c,key,key_len);
addReplyBulkCBuffer(c,value,value_len);
}
uint64_t idle = commandTimeSnapshot() - pelKey->delivery_time;
addReplyBulkLongLong(c, idle);
addReplyBulkLongLong(c, delivery_count);
/* Remove the NACK from old consumer.*/
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
/* Remove the NACK from the PEL by time. */
timeKey.delivery_time = nack->delivery_time;
timeKey.id = pel_id;
encodePelTimeKey(&keyBuf, &timeKey);
raxRemove(group->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL);
/* Update the consumer and NACK metadata. */
nack->consumer = consumer;
nack->delivery_time = commandTimeSnapshot();
nack->delivery_count++;
/* Add the entry in the new consumer local PEL. */
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
/* Add updated NACK in PEL by time. */
timeKey.delivery_time = nack->delivery_time;
timeKey.id = pel_id;
encodePelTimeKey(&keyBuf, &timeKey);
raxInsert(group->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL, NULL);
consumer->active_time = commandTimeSnapshot();
/* Propagate as XCLAIM. */
if (spi) {
robj *idarg = createObjectFromStreamID(&pel_id);
streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
decrRefCount(idarg);
if (propCount) (*propCount)++;
}
arraylen++;
if (count && count == arraylen) break;
}
streamIteratorStop(&si);
}
listRewind(eligible_pels, &li);
while ((ln = listNext(&li))) {
zfree(listNodeValue(ln));
}
listRelease(eligible_pels);
}
/* If the client is asking for some history, we serve it using a
* different function, so that we return entries *solely* from its
* own PEL. This ensures each consumer will always and only see
@ -1807,13 +1953,19 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
* as delivered. */
if (group && (flags & STREAM_RWR_HISTORY)) {
return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count,
consumer);
group, consumer);
}
if (!(flags & STREAM_RWR_RAWENTRIES))
/* Stop here if client only wants claimed entries or count is satisfied. */
if ((group && (flags & STREAM_RWR_CLAIMED)) || (count && count == arraylen)) {
if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);
return arraylen;
}
if (!(flags & STREAM_RWR_RAWENTRIES) && !arraylen_ptr)
arraylen_ptr = addReplyDeferredLen(c);
streamIteratorStart(&si,s,start,end,rev);
while(streamIteratorGetID(&si,&id,&numfields)) {
while (streamIteratorGetID(&si,&id,&numfields)) {
/* Update the group last_id if needed. */
if (group && streamCompareID(&id,&group->last_id) > 0) {
if (group->entries_read != SCG_INVALID_ENTRIES_READ &&
@ -1836,15 +1988,20 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
propagate_last_id = 1;
}
/* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */
addReplyArrayLen(c,2);
if (min_idle_time != -1) {
/* If min-idle-time is specified, we emit a four elements
* array: ID, array of field-value pairs, idle time and delivery count. */
addReplyArrayLen(c,4);
} else {
/* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */
addReplyArrayLen(c,2);
}
addReplyStreamID(c,&id);
addReplyArrayLen(c,numfields*2);
/* Emit the field-value pairs. */
while(numfields--) {
while (numfields--) {
unsigned char *key, *value;
int64_t key_len, value_len;
streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
@ -1852,6 +2009,13 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
addReplyBulkCBuffer(c,value,value_len);
}
if (min_idle_time != -1) {
/* For new entries idle time and delivery count is 0. */
addReplyBulkLongLong(c, 0);
addReplyBulkLongLong(c, 0);
}
/* If a group is passed, we need to create an entry in the
* PEL (pending entries list) of this group *and* this consumer.
*
@ -1884,6 +2048,11 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
serverAssert(found);
nack = result;
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
/* Remove old entry from the PEL by time. */
timeKey.delivery_time = nack->delivery_time;
timeKey.id = id;
encodePelTimeKey(&keyBuf, &timeKey);
raxRemove(group->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL);
/* Update the consumer and NACK metadata. */
nack->consumer = consumer;
nack->delivery_time = commandTimeSnapshot();
@ -1896,6 +2065,12 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
serverPanic("NACK half-created. Should not be possible.");
}
/* We have new NACK or updated existing one. */
timeKey.delivery_time = nack->delivery_time;
timeKey.id = id;
encodePelTimeKey(&keyBuf, &timeKey);
raxInsert(group->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL, NULL);
consumer->active_time = commandTimeSnapshot();
/* Propagate as XCLAIM. */
@ -1934,7 +2109,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
* seek into the radix tree of the messages in order to emit the full message
* to the client. However clients only reach this code path when they are
* fetching the history of already retrieved messages, which is rare. */
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) {
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer) {
raxIterator ri;
unsigned char startkey[sizeof(streamID)];
unsigned char endkey[sizeof(streamID)];
@ -1949,7 +2124,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
if (end && memcmp(ri.key,end,ri.key_len) > 0) break;
streamID thisid;
streamDecodeID(ri.key,&thisid);
if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL,
if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,-1,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL,NULL) == 0)
{
/* Note that we may have a not acknowledged entry in the PEL
@ -1961,8 +2136,19 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
addReplyNullArray(c);
} else {
streamNACK *nack = ri.data;
uint64_t keyBuf[3];
pelTimeKey timeKey;
timeKey.delivery_time = nack->delivery_time;
timeKey.id = thisid;
encodePelTimeKey(&keyBuf, &timeKey);
raxRemove(group->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL);
nack->delivery_time = commandTimeSnapshot();
nack->delivery_count++;
timeKey.delivery_time = nack->delivery_time;
encodePelTimeKey(&keyBuf, &timeKey);
raxInsert(group->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL, NULL);
}
arraylen++;
}
@ -2267,7 +2453,7 @@ void xrangeGenericCommand(client *c, int rev) {
addReplyNullArray(c);
} else {
if (count == -1) count = 0;
streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL,NULL);
streamReplyWithRange(c,s,&startid,&endid,count,rev,-1,NULL,NULL,0,NULL,NULL);
}
}
@ -2299,6 +2485,7 @@ void xlenCommand(client *c) {
* on slaves, XREADGROUP is not. */
#define XREAD_BLOCKED_DEFAULT_COUNT 1000
void xreadCommand(client *c) {
long long min_idle_time = -1; /* -1 means, no IDLE argument given. */
long long timeout = -1; /* -1 means, no BLOCK argument given. */
long long count = 0;
int streams_count = 0;
@ -2315,7 +2502,22 @@ void xreadCommand(client *c) {
for (int i = 1; i < c->argc; i++) {
int moreargs = c->argc-i-1;
char *o = c->argv[i]->ptr;
if (!strcasecmp(o,"BLOCK") && moreargs) {
if (!strcasecmp(o,"CLAIM") && moreargs) {
if (!xreadgroup) {
addReplyError(c,"The CLAIM option is only supported by "
"XREADGROUP. You called XREAD instead.");
return;
}
i++;
min_idle_time = -1;
if (getLongLongFromObjectOrReply(c, c->argv[i], &min_idle_time,
"min-idle-time is not an integer or out of range") != C_OK)
return;
if (min_idle_time < 0) {
addReplyError(c,"min-idle-time must be a positive integer");
return;
}
} else if (!strcasecmp(o,"BLOCK") && moreargs) {
i++;
if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout,
UNIT_MILLISECONDS) != C_OK) return;
@ -2460,11 +2662,13 @@ void xreadCommand(client *c) {
/* Try to serve the client synchronously. */
size_t arraylen = 0;
void *arraylen_ptr = NULL;
uint64_t min_pel_delivery_time = UINT64_MAX;
for (int i = 0; i < streams_count; i++) {
kvobj *o = lookupKeyRead(c->db, c->argv[streams_arg + i]);
if (o == NULL) continue;
stream *s = o->ptr;
streamID *gt = ids+i; /* ID must be greater than this. */
int serve_claimed = 0;
int serve_synchronously = 0;
int serve_history = 0; /* True for XREADGROUP with ID != ">". */
streamConsumer *consumer = NULL; /* Unused if XREAD */
@ -2473,6 +2677,33 @@ void xreadCommand(client *c) {
/* Check if there are the conditions to serve the client
* synchronously. */
if (groups) {
/* If min_idle_time is set we need to check is there any pending
* message in the PEL idle enough to be claimed. Also we need to
* get the minimum delivery time in the PEL, in order to use it
* later if block option is set. */
if (min_idle_time != -1) {
raxIterator ri;
raxStart(&ri, groups[i]->pel_by_time);
raxSeek(&ri, "^", NULL, 0);
while(raxNext(&ri)) {
pelTimeKey timeKey;
decodePelTimeKey(ri.key, &timeKey);
if (!streamEntryExists(s, &timeKey.id))
continue;
if (timeKey.delivery_time < min_pel_delivery_time) {
min_pel_delivery_time = timeKey.delivery_time;
}
uint64_t idle = commandTimeSnapshot() - timeKey.delivery_time;
if (idle >= (uint64_t)min_idle_time) {
serve_claimed = 1;
}
break;
}
raxStop(&ri);
}
/* If the consumer is blocked on a group, we always serve it
* synchronously (serving its local history) if the ID specified
* was not the special ">" ID. */
@ -2513,6 +2744,18 @@ void xreadCommand(client *c) {
}
}
int flags = 0;
if (serve_history) {
/* CLAIM option is ignored when we server from consumer history.*/
min_idle_time = -1;
} else if (!serve_synchronously && serve_claimed) {
/* We serve the client synchronously if the CLAIM option was
* specified and there are messages in the PEL that are idle
* enough. */
serve_synchronously = 1;
flags |= STREAM_RWR_CLAIMED;
}
if (serve_synchronously) {
arraylen++;
if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c);
@ -2527,11 +2770,10 @@ void xreadCommand(client *c) {
if (c->resp == 2) addReplyArrayLen(c,2);
addReplyBulk(c,c->argv[streams_arg+i]);
int flags = 0;
unsigned long propCount = 0;
if (noack) flags |= STREAM_RWR_NOACK;
if (serve_history) flags |= STREAM_RWR_HISTORY;
streamReplyWithRange(c,s,&start,NULL,count,0,
streamReplyWithRange(c,s,&start,NULL,count,0, min_idle_time,
groups ? groups[i] : NULL,
consumer, flags, &spi, &propCount);
if (propCount) server.dirty++;
@ -2567,6 +2809,19 @@ void xreadCommand(client *c) {
decrRefCount(argv_streamid);
}
}
/* If min_idle_time is set we need to unblock client if PEL entry became claimable
* before new messages arrive. min_pel_delivery_time is the minimum delivery time of all
* entries in the PELs of different streams specified in the command. We add it to
* min_idle_time to get the earliest time when an entry will be eligible for claiming.
* If there are no entries in the PELs we will unblock the client after min_idle_time. */
if (min_idle_time != -1) {
uint64_t pel_expire_time = min_idle_time;
if (min_pel_delivery_time != UINT64_MAX)
pel_expire_time += min_pel_delivery_time;
else
pel_expire_time += commandTimeSnapshot();
trackStreamClaimTimeouts(c, c->argv+streams_arg, streams_count, pel_expire_time);
}
blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, timeout, xreadgroup);
goto cleanup;
}
@ -2663,10 +2918,16 @@ void streamCleanupEntryCGroupRefs(stream *s, streamID *id) {
/* Remove from group and consumer PELs */
raxRemove(group->pel, buf, sizeof(buf), NULL);
uint64_t keyBuf[3];
pelTimeKey timeKey;
timeKey.delivery_time = nack->delivery_time;
timeKey.id = *id;
encodePelTimeKey(keyBuf, &timeKey);
raxRemove(group->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL);
raxRemove(nack->consumer->pel, buf, sizeof(buf), NULL);
/* Since we're removing all references from the cgroups_ref, we can directly
* free the NACK without unlinking it from the cgroups_ref. */
streamFreeNACK(nack);
streamFreeNACK(nack);
}
raxRemove(s->cgroups_ref, buf, sizeof(streamID), NULL);
@ -2768,6 +3029,7 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, lo
streamCG *cg = zmalloc(sizeof(*cg));
cg->pel = raxNew();
cg->pel_by_time = raxNew();
cg->consumers = raxNew();
cg->last_id.ms = 0;
cg->last_id.seq = 0;
@ -2780,6 +3042,7 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, lo
/* Free a consumer group and all its associated data. */
static void streamFreeCG(streamCG *cg) {
raxFreeWithCallback(cg->pel, streamFreeNACKGeneric);
raxFree(cg->pel_by_time);
raxFreeWithCallback(cg->consumers, streamFreeConsumerGeneric);
zfree(cg);
}
@ -2854,8 +3117,19 @@ void streamDelConsumer(stream *s, streamCG *cg, streamConsumer *consumer) {
while(raxNext(&ri)) {
streamNACK *nack = ri.data;
streamUnlinkEntryFromCGroupRef(s, nack, ri.key);
streamFreeNACK(nack);
streamID id;
streamDecodeID(ri.key, &id);
uint64_t keyBuf[3];
pelTimeKey timeKey;
timeKey.delivery_time = nack->delivery_time;
timeKey.id = id;
encodePelTimeKey(keyBuf, &timeKey);
raxRemove(cg->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL);
raxRemove(cg->pel,ri.key,ri.key_len,NULL);
streamFreeNACK(nack);
}
raxStop(&ri);
@ -3167,6 +3441,12 @@ void xackCommand(client *c) {
void *result;
if (raxFind(group->pel,buf,sizeof(buf),&result)) {
streamNACK *nack = result;
uint64_t keyBuf[3];
pelTimeKey timeKey;
timeKey.delivery_time = nack->delivery_time;
timeKey.id = ids[j-3];
encodePelTimeKey(keyBuf, &timeKey);
raxRemove(group->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL);
raxRemove(group->pel,buf,sizeof(buf),NULL);
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
streamDestroyNACK(kv->ptr, nack, buf);
@ -3236,6 +3516,12 @@ void xackdelCommand(client *c) {
void *result;
if (raxFind(group->pel,buf,sizeof(buf),&result)) {
streamNACK *nack = result;
uint64_t keyBuf[3];
pelTimeKey timeKey;
timeKey.delivery_time = nack->delivery_time;
timeKey.id = *id;
encodePelTimeKey(keyBuf, &timeKey);
raxRemove(group->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL);
raxRemove(group->pel,buf,sizeof(buf),NULL);
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
streamDestroyNACK(s, nack, buf);
@ -3539,6 +3825,8 @@ void xpendingCommand(client *c) {
* successfully claimed, so that the caller is able to understand
* what messages it is now in charge of. */
void xclaimCommand(client *c) {
uint64_t keyBuf[3];
pelTimeKey timeKey;
streamCG *group = NULL;
kvobj *o = lookupKeyRead(c->db,c->argv[1]);
long long minidle; /* Minimum idle time argument. */
@ -3667,7 +3955,11 @@ void xclaimCommand(client *c) {
propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */
server.dirty++;
/* Release the NACK */
raxRemove(group->pel,buf,sizeof(buf),NULL);
timeKey.delivery_time = nack->delivery_time;
timeKey.id = id;
encodePelTimeKey(keyBuf, &timeKey);
raxRemove(group->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL);
raxRemove(group->pel, buf,sizeof(buf),NULL);
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
streamDestroyNACK(o->ptr, nack, buf);
}
@ -3683,6 +3975,10 @@ void xclaimCommand(client *c) {
/* Create the NACK. */
nack = streamCreateNACK(NULL);
raxInsert(group->pel,buf,sizeof(buf),nack,NULL);
timeKey.delivery_time = nack->delivery_time;
timeKey.id = id;
encodePelTimeKey(keyBuf, &timeKey);
raxInsert(group->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL, NULL);
nack->cgroup_ref_node = streamLinkCGroupToEntry(o->ptr, group, buf);
}
@ -3705,7 +4001,18 @@ void xclaimCommand(client *c) {
if (nack->consumer)
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
}
timeKey.delivery_time = nack->delivery_time;
timeKey.id = id;
encodePelTimeKey(keyBuf, &timeKey);
raxRemove(group->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL);
nack->delivery_time = deliverytime;
timeKey.delivery_time = nack->delivery_time;
encodePelTimeKey(keyBuf, &timeKey);
raxInsert(group->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL, NULL);
/* Set the delivery attempts counter if given, otherwise
* autoincrement unless JUSTID option provided */
if (retrycount >= 0) {
@ -3722,7 +4029,7 @@ void xclaimCommand(client *c) {
if (justid) {
addReplyStreamID(c,&id);
} else {
serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL,NULL) == 1);
serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,-1,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL,NULL) == 1);
}
arraylen++;
@ -3767,6 +4074,8 @@ void xautoclaimCommand(client *c) {
long count = 100; /* Maximum entries to claim. */
const unsigned attempts_factor = 10;
streamID startid;
uint64_t keyBuf[3];
pelTimeKey timeKey;
int startex;
int justid = 0;
@ -3857,6 +4166,10 @@ void xautoclaimCommand(client *c) {
decrRefCount(idstr);
server.dirty++;
/* Clear this entry from the PEL, it no longer exists */
timeKey.delivery_time = nack->delivery_time;
timeKey.id = id;
encodePelTimeKey(keyBuf, &timeKey);
raxRemove(group->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL);
raxRemove(group->pel,ri.key,ri.key_len,NULL);
raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL);
streamDestroyNACK(o->ptr, nack, ri.key);
@ -3882,7 +4195,17 @@ void xautoclaimCommand(client *c) {
}
/* Update the consumer and idle time. */
timeKey.delivery_time = nack->delivery_time;
timeKey.id = id;
encodePelTimeKey(keyBuf, &timeKey);
raxRemove(group->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL);
nack->delivery_time = now;
timeKey.delivery_time = nack->delivery_time;
encodePelTimeKey(keyBuf, &timeKey);
raxInsert(group->pel_by_time, (unsigned char*)&keyBuf, sizeof(keyBuf), NULL, NULL);
/* Increment the delivery attempts counter unless JUSTID option provided */
if (!justid)
nack->delivery_count++;
@ -3897,7 +4220,7 @@ void xautoclaimCommand(client *c) {
if (justid) {
addReplyStreamID(c,&id);
} else {
serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL,NULL) == 1);
serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,-1,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL,NULL) == 1);
}
arraylen++;
count--;
@ -4223,11 +4546,11 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
start.ms = start.seq = 0;
end.ms = end.seq = UINT64_MAX;
addReplyBulkCString(c,"first-entry");
emitted = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
emitted = streamReplyWithRange(c,s,&start,&end,1,0,-1,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL,NULL);
if (!emitted) addReplyNull(c);
addReplyBulkCString(c,"last-entry");
emitted = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
emitted = streamReplyWithRange(c,s,&start,&end,1,1,-1,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL,NULL);
if (!emitted) addReplyNull(c);
} else {
@ -4235,7 +4558,7 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
/* Stream entries */
addReplyBulkCString(c,"entries");
streamReplyWithRange(c,s,NULL,NULL,count,0,NULL,NULL,0,NULL,NULL);
streamReplyWithRange(c,s,NULL,NULL,count,0,-1,NULL,NULL,0,NULL,NULL);
/* Consumer groups */
addReplyBulkCString(c,"groups");
@ -4564,3 +4887,102 @@ int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep) {
return 1;
}
/* Convert the specified pelTimeKey as a 192 bit big endian number, so
* that the key can be sorted lexicographically. */
void encodePelTimeKey(void *buf, pelTimeKey *key) {
uint64_t e[3];
e[0] = htonu64(key->delivery_time);
e[1] = htonu64(key->id.ms);
e[2] = htonu64(key->id.seq);
memcpy(buf,e,sizeof(e));
}
/* This is the reverse of encodePelTimeKey(): the decoded key will be stored
* in the 'key' structure passed by reference. The buffer 'buf' must point
* to a 192 bit big-endian encoded key. */
void decodePelTimeKey(void *buf, pelTimeKey *key) {
uint64_t e[3];
memcpy(e,buf,sizeof(e));
key->delivery_time = ntohu64(e[0]);
key->id.ms = ntohu64(e[1]);
key->id.seq = ntohu64(e[2]);
}
/* Register stream keys for monitoring of expired pending entries to enable
* reactive blocking behavior for XREADGROUP commands with CLAIM. When a client
* blocks waiting for either new messages or expired pending entries, this
* function records the earliest timestamp when pending entries will expire
* (satisfy the min-idle-time requirement).
*
* For multi-client coordination, when multiple clients are blocked on the same
* stream with different min-idle-time values, the dictionary stores the minimum
* (earliest) expire_time across all clients to ensure the earliest possible
* wakeup when any pending entry expires and becomes available for claiming.
*
* 'c' is the client that is blocking on the stream(s).
* 'keys' is an array of stream key objects to monitor.
* 'numkeys' is the number of keys in the array.
* 'expire_time' is the absolute timestamp (in milliseconds) when the next
* pending entry will expire for this client, calculated as
* next_delivery_time + min_idle_time, where next_delivery_time is the
* delivery timestamp of the oldest pending entry in the stream.
*
* For new entries, the key is added with the given expire_time and the
* reference count is incremented. For existing entries, the expire_time
* is updated to the minimum value if the new expire_time is earlier,
* ensuring the earliest wakeup time is preserved for multi-client scenarios.
* Note that the reference count is only incremented for newly added keys,
* not for updates to existing entries. */
void trackStreamClaimTimeouts(client *c, robj **keys, int numkeys, uint64_t expire_time) {
dictEntry *db_watch_entry, *db_watch_existing_entry;
uint64_t old_expire_time;
int j;
for (j = 0; j < numkeys; j++) {
db_watch_entry = dictAddRaw(c->db->stream_claim_pending_keys, keys[j], &db_watch_existing_entry);
if (db_watch_entry != NULL) {
dictSetUnsignedIntegerVal(db_watch_entry, expire_time);
incrRefCount(keys[j]);
} else {
old_expire_time = dictGetUnsignedIntegerVal(db_watch_existing_entry);
if (expire_time < old_expire_time) {
dictSetUnsignedIntegerVal(db_watch_existing_entry, expire_time);
}
}
}
}
/* Check and wake clients waiting for expired pending entries. This function
* is invoked regularly from blockedBeforeSleep() to monitor all streams being
* watched for expired pending entries and wake up blocked clients when
* entries expire and become available for claiming.
*
* The function iterates through all databases and their stream_claim_pending_keys
* dictionaries. For each watched stream, it compares the registered expire_time
* against the current server time. When expire_time <= current_time, the pending
* entry has expired and the stream is signaled as ready via signalKeyAsReady(),
* which wakes all blocked clients waiting on that stream. The entry is then
* removed from stream_claim_pending_keys. */
void handleClaimableStreamEntries(void) {
for (int j = 0; j < server.dbnum; j++) {
dictEntry *de;
dictIterator di;
dictInitSafeIterator(&di, server.db[j].stream_claim_pending_keys);
while ((de = dictNext(&di)) != NULL) {
robj *key = dictGetKey(de);
uint64_t expire_time = dictGetUnsignedIntegerVal(de);
kvobj *kv = dbFind(&server.db[j], key->ptr);
if (!kv || kv->type != OBJ_STREAM)
continue;
if (expire_time < (uint64_t)server.mstime) {
signalKeyAsReady(&server.db[j], key, kv->type);
dictDelete(server.db[j].stream_claim_pending_keys, key);
}
}
dictResetIterator(&di);
}
}

File diff suppressed because it is too large Load Diff