Fix incorrect lag due to trimming stream via XTRIM or XADD command (#13958)
CI / test-ubuntu-latest (push) Has been cancelled Details
CI / test-sanitizer-address (push) Has been cancelled Details
CI / build-debian-old (push) Has been cancelled Details
CI / build-macos-latest (push) Has been cancelled Details
CI / build-32bit (push) Has been cancelled Details
CI / build-libc-malloc (push) Has been cancelled Details
CI / build-centos-jemalloc (push) Has been cancelled Details
CI / build-old-chain-jemalloc (push) Has been cancelled Details
Codecov / code-coverage (push) Has been cancelled Details
External Server Tests / test-external-standalone (push) Has been cancelled Details
External Server Tests / test-external-cluster (push) Has been cancelled Details
External Server Tests / test-external-nodebug (push) Has been cancelled Details
Spellcheck / Spellcheck (push) Has been cancelled Details

This PR fix the lag calculation by ensuring that when consumer group's last_id
is behind the first entry, the consumer group's entries read is considered
invalid and recalculated from the start of the stream

Supplement to PR #13473 

Close #13957

Signed-off-by: Ernesto Alejandro Santana Hidalgo <ernesto.alejandrosantana@gmail.com>
This commit is contained in:
nesty92 2025-04-22 04:11:10 +02:00 committed by GitHub
parent a51918209c
commit 8468ded667
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 17 additions and 1 deletions

View File

@ -1695,7 +1695,10 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Update the group last_id if needed. */
if (group && streamCompareID(&id,&group->last_id) > 0) {
if (group->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&group->last_id,NULL)) {
if (group->entries_read != SCG_INVALID_ENTRIES_READ &&
streamCompareID(&group->last_id, &s->first_id) >= 0 &&
!streamRangeHasTombstones(s,&group->last_id,NULL))
{
/* A valid counter and no tombstones between the group's last-delivered-id
* and the stream's last-generated-id mean we can increment the read counter
* to keep tracking the group's progress. */

View File

@ -1300,6 +1300,19 @@ start_server {
assert_equal [dict get $group entries-read] 1
assert_equal [dict get $group lag] 1
# When all the entries are read, the lag is always 0.
r XREADGROUP GROUP mygroup alice STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 5
assert_equal [dict get $group lag] 0
r XADD x 6-0 data f
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 5
assert_equal [dict get $group lag] 1
# When all the entries were deleted, the lag is always 0.
r XTRIM x MAXLEN 0
set reply [r XINFO STREAM x FULL]