mirror of https://github.com/redis/redis.git
Fix defrag issues for stream defrag and HFE (#14323)
This PR fixes three defrag issues. 1. Fix the issue that forget to update cgroup_ref_node when the consume group was reallocated. This crash was introduced by https://github.com/redis/redis/issues/14130 In this PR, when performing defragmentation on `s->cgroups` using `defragRadixTree()`, we no longer rely on the automatic data defragmentation of `defragRadixTree()`. Instead, we manually defragment the consumer group and then update its reference in `s->cgroups`. 2. Fix a use-after-free issue caused by updating dictionary keys after HFE key is reallocated. This issue was introduced by https://github.com/redis/redis/issues/13842 3. Fix the issue that forgot to be updated NextSegHdr->firstSeg when the first segment was reallocated. This issue was introduced by https://github.com/redis/redis/issues/13842 --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
parent
45e9f27306
commit
9829bdbfd9
48
src/defrag.c
48
src/defrag.c
|
@ -141,12 +141,11 @@ typedef struct {
|
||||||
* pointers are worthwhile moving and which aren't */
|
* pointers are worthwhile moving and which aren't */
|
||||||
int je_get_defrag_hint(void* ptr);
|
int je_get_defrag_hint(void* ptr);
|
||||||
|
|
||||||
/* Defrag helper for generic allocations.
|
/* Defrag helper for generic allocations without freeing old pointer.
|
||||||
*
|
*
|
||||||
* returns NULL in case the allocation wasn't moved.
|
* Note: The caller is responsible for freeing the old pointer if this function
|
||||||
* when it returns a non-null value, the old pointer was already released
|
* returns a non-NULL value. */
|
||||||
* and should NOT be accessed. */
|
void* activeDefragAllocWithoutFree(void *ptr) {
|
||||||
void* activeDefragAlloc(void *ptr) {
|
|
||||||
size_t size;
|
size_t size;
|
||||||
void *newptr;
|
void *newptr;
|
||||||
if(!je_get_defrag_hint(ptr)) {
|
if(!je_get_defrag_hint(ptr)) {
|
||||||
|
@ -159,11 +158,26 @@ void* activeDefragAlloc(void *ptr) {
|
||||||
size = zmalloc_usable_size(ptr);
|
size = zmalloc_usable_size(ptr);
|
||||||
newptr = zmalloc_no_tcache(size);
|
newptr = zmalloc_no_tcache(size);
|
||||||
memcpy(newptr, ptr, size);
|
memcpy(newptr, ptr, size);
|
||||||
zfree_no_tcache(ptr);
|
|
||||||
server.stat_active_defrag_hits++;
|
server.stat_active_defrag_hits++;
|
||||||
return newptr;
|
return newptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void activeDefragFree(void *ptr) {
|
||||||
|
zfree_no_tcache(ptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Defrag helper for generic allocations.
|
||||||
|
*
|
||||||
|
* returns NULL in case the allocation wasn't moved.
|
||||||
|
* when it returns a non-null value, the old pointer was already released
|
||||||
|
* and should NOT be accessed. */
|
||||||
|
void* activeDefragAlloc(void *ptr) {
|
||||||
|
void *newptr = activeDefragAllocWithoutFree(ptr);
|
||||||
|
if (newptr)
|
||||||
|
activeDefragFree(ptr);
|
||||||
|
return newptr;
|
||||||
|
}
|
||||||
|
|
||||||
/* Raw memory allocation for defrag, avoid using tcache. */
|
/* Raw memory allocation for defrag, avoid using tcache. */
|
||||||
void *activeDefragAllocRaw(size_t size) {
|
void *activeDefragAllocRaw(size_t size) {
|
||||||
return zmalloc_no_tcache(size);
|
return zmalloc_no_tcache(size);
|
||||||
|
@ -171,7 +185,7 @@ void *activeDefragAllocRaw(size_t size) {
|
||||||
|
|
||||||
/* Raw memory free for defrag, avoid using tcache. */
|
/* Raw memory free for defrag, avoid using tcache. */
|
||||||
void activeDefragFreeRaw(void *ptr) {
|
void activeDefragFreeRaw(void *ptr) {
|
||||||
zfree_no_tcache(ptr);
|
activeDefragFree(ptr);
|
||||||
server.stat_active_defrag_hits++;
|
server.stat_active_defrag_hits++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -825,6 +839,7 @@ void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) {
|
||||||
PendingEntryContext *ctx = privdata;
|
PendingEntryContext *ctx = privdata;
|
||||||
streamNACK *nack = ri->data, *newnack;
|
streamNACK *nack = ri->data, *newnack;
|
||||||
nack->consumer = ctx->c; /* update nack pointer to consumer */
|
nack->consumer = ctx->c; /* update nack pointer to consumer */
|
||||||
|
nack->cgroup_ref_node->value = ctx->cg; /* Update the value of cgroups_ref node to the consumer group. */
|
||||||
newnack = activeDefragAlloc(nack);
|
newnack = activeDefragAlloc(nack);
|
||||||
if (newnack) {
|
if (newnack) {
|
||||||
/* update consumer group pointer to the nack */
|
/* update consumer group pointer to the nack */
|
||||||
|
@ -853,13 +868,15 @@ void* defragStreamConsumer(raxIterator *ri, void *privdata) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void* defragStreamConsumerGroup(raxIterator *ri, void *privdata) {
|
void* defragStreamConsumerGroup(raxIterator *ri, void *privdata) {
|
||||||
streamCG *cg = ri->data;
|
streamCG *newcg, *cg = ri->data;
|
||||||
UNUSED(privdata);
|
UNUSED(privdata);
|
||||||
|
if ((newcg = activeDefragAlloc(cg)))
|
||||||
|
cg = newcg;
|
||||||
if (cg->consumers)
|
if (cg->consumers)
|
||||||
defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg);
|
defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg);
|
||||||
if (cg->pel)
|
if (cg->pel)
|
||||||
defragRadixTree(&cg->pel, 0, NULL, NULL);
|
defragRadixTree(&cg->pel, 0, NULL, NULL);
|
||||||
return NULL;
|
return cg;
|
||||||
}
|
}
|
||||||
|
|
||||||
void defragStream(defragKeysCtx *ctx, kvobj *ob) {
|
void defragStream(defragKeysCtx *ctx, kvobj *ob) {
|
||||||
|
@ -879,7 +896,7 @@ void defragStream(defragKeysCtx *ctx, kvobj *ob) {
|
||||||
defragRadixTree(&s->rax, 1, NULL, NULL);
|
defragRadixTree(&s->rax, 1, NULL, NULL);
|
||||||
|
|
||||||
if (s->cgroups)
|
if (s->cgroups)
|
||||||
defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL);
|
defragRadixTree(&s->cgroups, 0, defragStreamConsumerGroup, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Defrag a module key. This is either done immediately or scheduled
|
/* Defrag a module key. This is either done immediately or scheduled
|
||||||
|
@ -1275,7 +1292,7 @@ static doneStatus defragStageExpiresKvstore(void *ctx, monotime endtime) {
|
||||||
void *activeDefragHExpiresOB(void *ptr, void *privdata) {
|
void *activeDefragHExpiresOB(void *ptr, void *privdata) {
|
||||||
redisDb *db = privdata;
|
redisDb *db = privdata;
|
||||||
dictEntryLink link, exlink = NULL;
|
dictEntryLink link, exlink = NULL;
|
||||||
kvobj *kvobj = ptr;
|
kvobj *newkv, *kvobj = ptr;
|
||||||
sds keystr = kvobjGetKey(kvobj);
|
sds keystr = kvobjGetKey(kvobj);
|
||||||
unsigned int slot = calculateKeySlot(keystr);
|
unsigned int slot = calculateKeySlot(keystr);
|
||||||
serverAssert(kvobj->type == OBJ_HASH);
|
serverAssert(kvobj->type == OBJ_HASH);
|
||||||
|
@ -1289,15 +1306,16 @@ void *activeDefragHExpiresOB(void *ptr, void *privdata) {
|
||||||
serverAssert(exlink != NULL);
|
serverAssert(exlink != NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((kvobj = activeDefragAlloc(kvobj))) {
|
if ((newkv = activeDefragAllocWithoutFree(kvobj))) {
|
||||||
/* Update its reference in the DB keys. */
|
/* Update its reference in the DB keys. */
|
||||||
link = kvstoreDictFindLink(db->keys, slot, keystr, NULL);
|
link = kvstoreDictFindLink(db->keys, slot, keystr, NULL);
|
||||||
serverAssert(link != NULL);
|
serverAssert(link != NULL);
|
||||||
kvstoreDictSetAtLink(db->keys, slot, kvobj, &link, 0);
|
kvstoreDictSetAtLink(db->keys, slot, newkv, &link, 0);
|
||||||
if (expire != -1)
|
if (expire != -1)
|
||||||
kvstoreDictSetAtLink(db->expires, slot, kvobj, &exlink, 0);
|
kvstoreDictSetAtLink(db->expires, slot, newkv, &exlink, 0);
|
||||||
|
activeDefragFree(kvobj);
|
||||||
}
|
}
|
||||||
return kvobj;
|
return newkv;
|
||||||
}
|
}
|
||||||
|
|
||||||
static doneStatus defragStageHExpires(void *ctx, monotime endtime) {
|
static doneStatus defragStageHExpires(void *ctx, monotime endtime) {
|
||||||
|
|
|
@ -1844,6 +1844,7 @@ void ebDefragRaxBucket(EbucketsType *type, raxIterator *ri,
|
||||||
ebDefragFunctions *defragfns, void *privdata)
|
ebDefragFunctions *defragfns, void *privdata)
|
||||||
{
|
{
|
||||||
CommonSegHdr *currentSegHdr = ri->data;
|
CommonSegHdr *currentSegHdr = ri->data;
|
||||||
|
CommonSegHdr *firstSegHdr = currentSegHdr;
|
||||||
eItem iter = ((FirstSegHdr*)currentSegHdr)->head;
|
eItem iter = ((FirstSegHdr*)currentSegHdr)->head;
|
||||||
ExpireMeta *mHead = type->getExpireMeta(iter);
|
ExpireMeta *mHead = type->getExpireMeta(iter);
|
||||||
ExpireMeta *prevSegLastItem = NULL; /* The last item of the previous segment */
|
ExpireMeta *prevSegLastItem = NULL; /* The last item of the previous segment */
|
||||||
|
@ -1879,6 +1880,7 @@ void ebDefragRaxBucket(EbucketsType *type, raxIterator *ri,
|
||||||
if (currentSegHdr == ri->data) {
|
if (currentSegHdr == ri->data) {
|
||||||
/* If it's the first segment, update the rax data pointer. */
|
/* If it's the first segment, update the rax data pointer. */
|
||||||
raxSetData(ri->node, ri->data=newSegHdr);
|
raxSetData(ri->node, ri->data=newSegHdr);
|
||||||
|
firstSegHdr = newSegHdr;
|
||||||
} else {
|
} else {
|
||||||
/* For non-first segments, update the previous segment's next
|
/* For non-first segments, update the previous segment's next
|
||||||
* item to new pointer. */
|
* item to new pointer. */
|
||||||
|
@ -1897,6 +1899,7 @@ void ebDefragRaxBucket(EbucketsType *type, raxIterator *ri,
|
||||||
}
|
}
|
||||||
|
|
||||||
NextSegHdr *nextSegHdr = mIter->next;
|
NextSegHdr *nextSegHdr = mIter->next;
|
||||||
|
nextSegHdr->firstSeg = (FirstSegHdr *)firstSegHdr;
|
||||||
if (newSegHdr) {
|
if (newSegHdr) {
|
||||||
/* Update next segment's prev to point to the defragmented segment. */
|
/* Update next segment's prev to point to the defragmented segment. */
|
||||||
nextSegHdr->prevSeg = newSegHdr;
|
nextSegHdr->prevSeg = newSegHdr;
|
||||||
|
|
Loading…
Reference in New Issue