mirror of https://github.com/redis/redis.git
Merge branch 'unstable' of https://github.com/antirez/redis into unstable
This commit is contained in:
commit
9f4b121512
|
@ -1248,7 +1248,7 @@ void configSetCommand(client *c) {
|
||||||
if (server.maxmemory < zmalloc_used_memory()) {
|
if (server.maxmemory < zmalloc_used_memory()) {
|
||||||
serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET is smaller than the current memory usage. This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.");
|
serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET is smaller than the current memory usage. This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.");
|
||||||
}
|
}
|
||||||
freeMemoryIfNeeded();
|
freeMemoryIfNeededAndSafe();
|
||||||
}
|
}
|
||||||
} config_set_memory_field(
|
} config_set_memory_field(
|
||||||
"proto-max-bulk-len",server.proto_max_bulk_len) {
|
"proto-max-bulk-len",server.proto_max_bulk_len) {
|
||||||
|
|
2
src/db.c
2
src/db.c
|
@ -212,7 +212,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
|
||||||
* 2) clients WATCHing for the destination key notified.
|
* 2) clients WATCHing for the destination key notified.
|
||||||
* 3) The expire time of the key is reset (the key is made persistent).
|
* 3) The expire time of the key is reset (the key is made persistent).
|
||||||
*
|
*
|
||||||
* All the new keys in the database should be creted via this interface. */
|
* All the new keys in the database should be created via this interface. */
|
||||||
void setKey(redisDb *db, robj *key, robj *val) {
|
void setKey(redisDb *db, robj *key, robj *val) {
|
||||||
if (lookupKeyWrite(db,key) == NULL) {
|
if (lookupKeyWrite(db,key) == NULL) {
|
||||||
dbAdd(db,key,val);
|
dbAdd(db,key,val);
|
||||||
|
|
299
src/debug.c
299
src/debug.c
|
@ -74,7 +74,7 @@ void xorDigest(unsigned char *digest, void *ptr, size_t len) {
|
||||||
digest[j] ^= hash[j];
|
digest[j] ^= hash[j];
|
||||||
}
|
}
|
||||||
|
|
||||||
void xorObjectDigest(unsigned char *digest, robj *o) {
|
void xorStringObjectDigest(unsigned char *digest, robj *o) {
|
||||||
o = getDecodedObject(o);
|
o = getDecodedObject(o);
|
||||||
xorDigest(digest,o->ptr,sdslen(o->ptr));
|
xorDigest(digest,o->ptr,sdslen(o->ptr));
|
||||||
decrRefCount(o);
|
decrRefCount(o);
|
||||||
|
@ -104,12 +104,151 @@ void mixDigest(unsigned char *digest, void *ptr, size_t len) {
|
||||||
SHA1Final(digest,&ctx);
|
SHA1Final(digest,&ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
void mixObjectDigest(unsigned char *digest, robj *o) {
|
void mixStringObjectDigest(unsigned char *digest, robj *o) {
|
||||||
o = getDecodedObject(o);
|
o = getDecodedObject(o);
|
||||||
mixDigest(digest,o->ptr,sdslen(o->ptr));
|
mixDigest(digest,o->ptr,sdslen(o->ptr));
|
||||||
decrRefCount(o);
|
decrRefCount(o);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This function computes the digest of a data structure stored in the
|
||||||
|
* object 'o'. It is the core of the DEBUG DIGEST command: when taking the
|
||||||
|
* digest of a whole dataset, we take the digest of the key and the value
|
||||||
|
* pair, and xor all those together.
|
||||||
|
*
|
||||||
|
* Note that this function does not reset the initial 'digest' passed, it
|
||||||
|
* will continue mixing this object digest to anything that was already
|
||||||
|
* present. */
|
||||||
|
void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o) {
|
||||||
|
uint32_t aux = htonl(o->type);
|
||||||
|
mixDigest(digest,&aux,sizeof(aux));
|
||||||
|
long long expiretime = getExpire(db,keyobj);
|
||||||
|
char buf[128];
|
||||||
|
|
||||||
|
/* Save the key and associated value */
|
||||||
|
if (o->type == OBJ_STRING) {
|
||||||
|
mixStringObjectDigest(digest,o);
|
||||||
|
} else if (o->type == OBJ_LIST) {
|
||||||
|
listTypeIterator *li = listTypeInitIterator(o,0,LIST_TAIL);
|
||||||
|
listTypeEntry entry;
|
||||||
|
while(listTypeNext(li,&entry)) {
|
||||||
|
robj *eleobj = listTypeGet(&entry);
|
||||||
|
mixStringObjectDigest(digest,eleobj);
|
||||||
|
decrRefCount(eleobj);
|
||||||
|
}
|
||||||
|
listTypeReleaseIterator(li);
|
||||||
|
} else if (o->type == OBJ_SET) {
|
||||||
|
setTypeIterator *si = setTypeInitIterator(o);
|
||||||
|
sds sdsele;
|
||||||
|
while((sdsele = setTypeNextObject(si)) != NULL) {
|
||||||
|
xorDigest(digest,sdsele,sdslen(sdsele));
|
||||||
|
sdsfree(sdsele);
|
||||||
|
}
|
||||||
|
setTypeReleaseIterator(si);
|
||||||
|
} else if (o->type == OBJ_ZSET) {
|
||||||
|
unsigned char eledigest[20];
|
||||||
|
|
||||||
|
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
|
||||||
|
unsigned char *zl = o->ptr;
|
||||||
|
unsigned char *eptr, *sptr;
|
||||||
|
unsigned char *vstr;
|
||||||
|
unsigned int vlen;
|
||||||
|
long long vll;
|
||||||
|
double score;
|
||||||
|
|
||||||
|
eptr = ziplistIndex(zl,0);
|
||||||
|
serverAssert(eptr != NULL);
|
||||||
|
sptr = ziplistNext(zl,eptr);
|
||||||
|
serverAssert(sptr != NULL);
|
||||||
|
|
||||||
|
while (eptr != NULL) {
|
||||||
|
serverAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
|
||||||
|
score = zzlGetScore(sptr);
|
||||||
|
|
||||||
|
memset(eledigest,0,20);
|
||||||
|
if (vstr != NULL) {
|
||||||
|
mixDigest(eledigest,vstr,vlen);
|
||||||
|
} else {
|
||||||
|
ll2string(buf,sizeof(buf),vll);
|
||||||
|
mixDigest(eledigest,buf,strlen(buf));
|
||||||
|
}
|
||||||
|
|
||||||
|
snprintf(buf,sizeof(buf),"%.17g",score);
|
||||||
|
mixDigest(eledigest,buf,strlen(buf));
|
||||||
|
xorDigest(digest,eledigest,20);
|
||||||
|
zzlNext(zl,&eptr,&sptr);
|
||||||
|
}
|
||||||
|
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
|
||||||
|
zset *zs = o->ptr;
|
||||||
|
dictIterator *di = dictGetIterator(zs->dict);
|
||||||
|
dictEntry *de;
|
||||||
|
|
||||||
|
while((de = dictNext(di)) != NULL) {
|
||||||
|
sds sdsele = dictGetKey(de);
|
||||||
|
double *score = dictGetVal(de);
|
||||||
|
|
||||||
|
snprintf(buf,sizeof(buf),"%.17g",*score);
|
||||||
|
memset(eledigest,0,20);
|
||||||
|
mixDigest(eledigest,sdsele,sdslen(sdsele));
|
||||||
|
mixDigest(eledigest,buf,strlen(buf));
|
||||||
|
xorDigest(digest,eledigest,20);
|
||||||
|
}
|
||||||
|
dictReleaseIterator(di);
|
||||||
|
} else {
|
||||||
|
serverPanic("Unknown sorted set encoding");
|
||||||
|
}
|
||||||
|
} else if (o->type == OBJ_HASH) {
|
||||||
|
hashTypeIterator *hi = hashTypeInitIterator(o);
|
||||||
|
while (hashTypeNext(hi) != C_ERR) {
|
||||||
|
unsigned char eledigest[20];
|
||||||
|
sds sdsele;
|
||||||
|
|
||||||
|
memset(eledigest,0,20);
|
||||||
|
sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_KEY);
|
||||||
|
mixDigest(eledigest,sdsele,sdslen(sdsele));
|
||||||
|
sdsfree(sdsele);
|
||||||
|
sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE);
|
||||||
|
mixDigest(eledigest,sdsele,sdslen(sdsele));
|
||||||
|
sdsfree(sdsele);
|
||||||
|
xorDigest(digest,eledigest,20);
|
||||||
|
}
|
||||||
|
hashTypeReleaseIterator(hi);
|
||||||
|
} else if (o->type == OBJ_STREAM) {
|
||||||
|
streamIterator si;
|
||||||
|
streamIteratorStart(&si,o->ptr,NULL,NULL,0);
|
||||||
|
streamID id;
|
||||||
|
int64_t numfields;
|
||||||
|
|
||||||
|
while(streamIteratorGetID(&si,&id,&numfields)) {
|
||||||
|
sds itemid = sdscatfmt(sdsempty(),"%U.%U",id.ms,id.seq);
|
||||||
|
mixDigest(digest,itemid,sdslen(itemid));
|
||||||
|
sdsfree(itemid);
|
||||||
|
|
||||||
|
while(numfields--) {
|
||||||
|
unsigned char *field, *value;
|
||||||
|
int64_t field_len, value_len;
|
||||||
|
streamIteratorGetField(&si,&field,&value,
|
||||||
|
&field_len,&value_len);
|
||||||
|
mixDigest(digest,field,field_len);
|
||||||
|
mixDigest(digest,value,value_len);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
streamIteratorStop(&si);
|
||||||
|
} else if (o->type == OBJ_MODULE) {
|
||||||
|
RedisModuleDigest md;
|
||||||
|
moduleValue *mv = o->ptr;
|
||||||
|
moduleType *mt = mv->type;
|
||||||
|
moduleInitDigestContext(md);
|
||||||
|
if (mt->digest) {
|
||||||
|
mt->digest(&md,mv->value);
|
||||||
|
xorDigest(digest,md.x,sizeof(md.x));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
serverPanic("Unknown object type");
|
||||||
|
}
|
||||||
|
/* If the key has an expire, add it to the mix */
|
||||||
|
if (expiretime != -1) xorDigest(digest,"!!expire!!",10);
|
||||||
|
}
|
||||||
|
|
||||||
/* Compute the dataset digest. Since keys, sets elements, hashes elements
|
/* Compute the dataset digest. Since keys, sets elements, hashes elements
|
||||||
* are not ordered, we use a trick: every aggregate digest is the xor
|
* are not ordered, we use a trick: every aggregate digest is the xor
|
||||||
* of the digests of their elements. This way the order will not change
|
* of the digests of their elements. This way the order will not change
|
||||||
|
@ -118,7 +257,6 @@ void mixObjectDigest(unsigned char *digest, robj *o) {
|
||||||
* a different digest. */
|
* a different digest. */
|
||||||
void computeDatasetDigest(unsigned char *final) {
|
void computeDatasetDigest(unsigned char *final) {
|
||||||
unsigned char digest[20];
|
unsigned char digest[20];
|
||||||
char buf[128];
|
|
||||||
dictIterator *di = NULL;
|
dictIterator *di = NULL;
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
int j;
|
int j;
|
||||||
|
@ -141,7 +279,6 @@ void computeDatasetDigest(unsigned char *final) {
|
||||||
while((de = dictNext(di)) != NULL) {
|
while((de = dictNext(di)) != NULL) {
|
||||||
sds key;
|
sds key;
|
||||||
robj *keyobj, *o;
|
robj *keyobj, *o;
|
||||||
long long expiretime;
|
|
||||||
|
|
||||||
memset(digest,0,20); /* This key-val digest */
|
memset(digest,0,20); /* This key-val digest */
|
||||||
key = dictGetKey(de);
|
key = dictGetKey(de);
|
||||||
|
@ -150,134 +287,8 @@ void computeDatasetDigest(unsigned char *final) {
|
||||||
mixDigest(digest,key,sdslen(key));
|
mixDigest(digest,key,sdslen(key));
|
||||||
|
|
||||||
o = dictGetVal(de);
|
o = dictGetVal(de);
|
||||||
|
xorObjectDigest(db,keyobj,digest,o);
|
||||||
|
|
||||||
aux = htonl(o->type);
|
|
||||||
mixDigest(digest,&aux,sizeof(aux));
|
|
||||||
expiretime = getExpire(db,keyobj);
|
|
||||||
|
|
||||||
/* Save the key and associated value */
|
|
||||||
if (o->type == OBJ_STRING) {
|
|
||||||
mixObjectDigest(digest,o);
|
|
||||||
} else if (o->type == OBJ_LIST) {
|
|
||||||
listTypeIterator *li = listTypeInitIterator(o,0,LIST_TAIL);
|
|
||||||
listTypeEntry entry;
|
|
||||||
while(listTypeNext(li,&entry)) {
|
|
||||||
robj *eleobj = listTypeGet(&entry);
|
|
||||||
mixObjectDigest(digest,eleobj);
|
|
||||||
decrRefCount(eleobj);
|
|
||||||
}
|
|
||||||
listTypeReleaseIterator(li);
|
|
||||||
} else if (o->type == OBJ_SET) {
|
|
||||||
setTypeIterator *si = setTypeInitIterator(o);
|
|
||||||
sds sdsele;
|
|
||||||
while((sdsele = setTypeNextObject(si)) != NULL) {
|
|
||||||
xorDigest(digest,sdsele,sdslen(sdsele));
|
|
||||||
sdsfree(sdsele);
|
|
||||||
}
|
|
||||||
setTypeReleaseIterator(si);
|
|
||||||
} else if (o->type == OBJ_ZSET) {
|
|
||||||
unsigned char eledigest[20];
|
|
||||||
|
|
||||||
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
|
|
||||||
unsigned char *zl = o->ptr;
|
|
||||||
unsigned char *eptr, *sptr;
|
|
||||||
unsigned char *vstr;
|
|
||||||
unsigned int vlen;
|
|
||||||
long long vll;
|
|
||||||
double score;
|
|
||||||
|
|
||||||
eptr = ziplistIndex(zl,0);
|
|
||||||
serverAssert(eptr != NULL);
|
|
||||||
sptr = ziplistNext(zl,eptr);
|
|
||||||
serverAssert(sptr != NULL);
|
|
||||||
|
|
||||||
while (eptr != NULL) {
|
|
||||||
serverAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
|
|
||||||
score = zzlGetScore(sptr);
|
|
||||||
|
|
||||||
memset(eledigest,0,20);
|
|
||||||
if (vstr != NULL) {
|
|
||||||
mixDigest(eledigest,vstr,vlen);
|
|
||||||
} else {
|
|
||||||
ll2string(buf,sizeof(buf),vll);
|
|
||||||
mixDigest(eledigest,buf,strlen(buf));
|
|
||||||
}
|
|
||||||
|
|
||||||
snprintf(buf,sizeof(buf),"%.17g",score);
|
|
||||||
mixDigest(eledigest,buf,strlen(buf));
|
|
||||||
xorDigest(digest,eledigest,20);
|
|
||||||
zzlNext(zl,&eptr,&sptr);
|
|
||||||
}
|
|
||||||
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
|
|
||||||
zset *zs = o->ptr;
|
|
||||||
dictIterator *di = dictGetIterator(zs->dict);
|
|
||||||
dictEntry *de;
|
|
||||||
|
|
||||||
while((de = dictNext(di)) != NULL) {
|
|
||||||
sds sdsele = dictGetKey(de);
|
|
||||||
double *score = dictGetVal(de);
|
|
||||||
|
|
||||||
snprintf(buf,sizeof(buf),"%.17g",*score);
|
|
||||||
memset(eledigest,0,20);
|
|
||||||
mixDigest(eledigest,sdsele,sdslen(sdsele));
|
|
||||||
mixDigest(eledigest,buf,strlen(buf));
|
|
||||||
xorDigest(digest,eledigest,20);
|
|
||||||
}
|
|
||||||
dictReleaseIterator(di);
|
|
||||||
} else {
|
|
||||||
serverPanic("Unknown sorted set encoding");
|
|
||||||
}
|
|
||||||
} else if (o->type == OBJ_HASH) {
|
|
||||||
hashTypeIterator *hi = hashTypeInitIterator(o);
|
|
||||||
while (hashTypeNext(hi) != C_ERR) {
|
|
||||||
unsigned char eledigest[20];
|
|
||||||
sds sdsele;
|
|
||||||
|
|
||||||
memset(eledigest,0,20);
|
|
||||||
sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_KEY);
|
|
||||||
mixDigest(eledigest,sdsele,sdslen(sdsele));
|
|
||||||
sdsfree(sdsele);
|
|
||||||
sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE);
|
|
||||||
mixDigest(eledigest,sdsele,sdslen(sdsele));
|
|
||||||
sdsfree(sdsele);
|
|
||||||
xorDigest(digest,eledigest,20);
|
|
||||||
}
|
|
||||||
hashTypeReleaseIterator(hi);
|
|
||||||
} else if (o->type == OBJ_STREAM) {
|
|
||||||
streamIterator si;
|
|
||||||
streamIteratorStart(&si,o->ptr,NULL,NULL,0);
|
|
||||||
streamID id;
|
|
||||||
int64_t numfields;
|
|
||||||
|
|
||||||
while(streamIteratorGetID(&si,&id,&numfields)) {
|
|
||||||
sds itemid = sdscatfmt(sdsempty(),"%U.%U",id.ms,id.seq);
|
|
||||||
mixDigest(digest,itemid,sdslen(itemid));
|
|
||||||
sdsfree(itemid);
|
|
||||||
|
|
||||||
while(numfields--) {
|
|
||||||
unsigned char *field, *value;
|
|
||||||
int64_t field_len, value_len;
|
|
||||||
streamIteratorGetField(&si,&field,&value,
|
|
||||||
&field_len,&value_len);
|
|
||||||
mixDigest(digest,field,field_len);
|
|
||||||
mixDigest(digest,value,value_len);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
streamIteratorStop(&si);
|
|
||||||
} else if (o->type == OBJ_MODULE) {
|
|
||||||
RedisModuleDigest md;
|
|
||||||
moduleValue *mv = o->ptr;
|
|
||||||
moduleType *mt = mv->type;
|
|
||||||
moduleInitDigestContext(md);
|
|
||||||
if (mt->digest) {
|
|
||||||
mt->digest(&md,mv->value);
|
|
||||||
xorDigest(digest,md.x,sizeof(md.x));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
serverPanic("Unknown object type");
|
|
||||||
}
|
|
||||||
/* If the key has an expire, add it to the mix */
|
|
||||||
if (expiretime != -1) xorDigest(digest,"!!expire!!",10);
|
|
||||||
/* We can finally xor the key-val digest to the final digest */
|
/* We can finally xor the key-val digest to the final digest */
|
||||||
xorDigest(final,digest,20);
|
xorDigest(final,digest,20);
|
||||||
decrRefCount(keyobj);
|
decrRefCount(keyobj);
|
||||||
|
@ -293,6 +304,7 @@ void debugCommand(client *c) {
|
||||||
"CHANGE-REPL-ID -- Change the replication IDs of the instance. Dangerous, should be used only for testing the replication subsystem.",
|
"CHANGE-REPL-ID -- Change the replication IDs of the instance. Dangerous, should be used only for testing the replication subsystem.",
|
||||||
"CRASH-AND-RECOVER <milliseconds> -- Hard crash and restart after <milliseconds> delay.",
|
"CRASH-AND-RECOVER <milliseconds> -- Hard crash and restart after <milliseconds> delay.",
|
||||||
"DIGEST -- Output a hex signature representing the current DB content.",
|
"DIGEST -- Output a hex signature representing the current DB content.",
|
||||||
|
"DIGEST-VALUE <key-1> ... <key-N>-- Output a hex signature of the values of all the specified keys.",
|
||||||
"ERROR <string> -- Return a Redis protocol error with <string> as message. Useful for clients unit tests to simulate Redis errors.",
|
"ERROR <string> -- Return a Redis protocol error with <string> as message. Useful for clients unit tests to simulate Redis errors.",
|
||||||
"LOG <message> -- write message to the server log.",
|
"LOG <message> -- write message to the server log.",
|
||||||
"HTSTATS <dbid> -- Return hash table statistics of the specified Redis database.",
|
"HTSTATS <dbid> -- Return hash table statistics of the specified Redis database.",
|
||||||
|
@ -310,6 +322,7 @@ void debugCommand(client *c) {
|
||||||
"SLEEP <seconds> -- Stop the server for <seconds>. Decimals allowed.",
|
"SLEEP <seconds> -- Stop the server for <seconds>. Decimals allowed.",
|
||||||
"STRUCTSIZE -- Return the size of different Redis core C structures.",
|
"STRUCTSIZE -- Return the size of different Redis core C structures.",
|
||||||
"ZIPLIST <key> -- Show low level info about the ziplist encoding.",
|
"ZIPLIST <key> -- Show low level info about the ziplist encoding.",
|
||||||
|
"STRINGMATCH-TEST -- Run a fuzz tester against the stringmatchlen() function.",
|
||||||
NULL
|
NULL
|
||||||
};
|
};
|
||||||
addReplyHelp(c, help);
|
addReplyHelp(c, help);
|
||||||
|
@ -336,7 +349,6 @@ NULL
|
||||||
zfree(ptr);
|
zfree(ptr);
|
||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"assert")) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"assert")) {
|
||||||
if (c->argc >= 3) c->argv[2] = tryObjectEncoding(c->argv[2]);
|
|
||||||
serverAssertWithInfo(c,c->argv[0],1 == 2);
|
serverAssertWithInfo(c,c->argv[0],1 == 2);
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"log") && c->argc == 3) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"log") && c->argc == 3) {
|
||||||
serverLog(LL_WARNING, "DEBUG LOG: %s", (char*)c->argv[2]->ptr);
|
serverLog(LL_WARNING, "DEBUG LOG: %s", (char*)c->argv[2]->ptr);
|
||||||
|
@ -495,15 +507,28 @@ NULL
|
||||||
}
|
}
|
||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"digest") && c->argc == 2) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"digest") && c->argc == 2) {
|
||||||
|
/* DEBUG DIGEST (form without keys specified) */
|
||||||
unsigned char digest[20];
|
unsigned char digest[20];
|
||||||
sds d = sdsempty();
|
sds d = sdsempty();
|
||||||
int j;
|
|
||||||
|
|
||||||
computeDatasetDigest(digest);
|
computeDatasetDigest(digest);
|
||||||
for (j = 0; j < 20; j++)
|
for (int i = 0; i < 20; i++) d = sdscatprintf(d, "%02x",digest[i]);
|
||||||
d = sdscatprintf(d, "%02x",digest[j]);
|
|
||||||
addReplyStatus(c,d);
|
addReplyStatus(c,d);
|
||||||
sdsfree(d);
|
sdsfree(d);
|
||||||
|
} else if (!strcasecmp(c->argv[1]->ptr,"digest-value") && c->argc >= 2) {
|
||||||
|
/* DEBUG DIGEST-VALUE key key key ... key. */
|
||||||
|
addReplyMultiBulkLen(c,c->argc-2);
|
||||||
|
for (int j = 2; j < c->argc; j++) {
|
||||||
|
unsigned char digest[20];
|
||||||
|
memset(digest,0,20); /* Start with a clean result */
|
||||||
|
robj *o = lookupKeyReadWithFlags(c->db,c->argv[j],LOOKUP_NOTOUCH);
|
||||||
|
if (o) xorObjectDigest(c->db,c->argv[j],digest,o);
|
||||||
|
|
||||||
|
sds d = sdsempty();
|
||||||
|
for (int i = 0; i < 20; i++) d = sdscatprintf(d, "%02x",digest[i]);
|
||||||
|
addReplyStatus(c,d);
|
||||||
|
sdsfree(d);
|
||||||
|
}
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"sleep") && c->argc == 3) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"sleep") && c->argc == 3) {
|
||||||
double dtime = strtod(c->argv[2]->ptr,NULL);
|
double dtime = strtod(c->argv[2]->ptr,NULL);
|
||||||
long long utime = dtime*1000000;
|
long long utime = dtime*1000000;
|
||||||
|
@ -595,6 +620,10 @@ NULL
|
||||||
changeReplicationId();
|
changeReplicationId();
|
||||||
clearReplicationId2();
|
clearReplicationId2();
|
||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
|
} else if (!strcasecmp(c->argv[1]->ptr,"stringmatch-test") && c->argc == 2)
|
||||||
|
{
|
||||||
|
stringmatchlen_fuzz_test();
|
||||||
|
addReplyStatus(c,"Apparently Redis did not crash: test passed");
|
||||||
} else {
|
} else {
|
||||||
addReplySubcommandSyntaxError(c);
|
addReplySubcommandSyntaxError(c);
|
||||||
return;
|
return;
|
||||||
|
|
15
src/evict.c
15
src/evict.c
|
@ -444,8 +444,8 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev
|
||||||
* Otehrwise if we are over the memory limit, but not enough memory
|
* Otehrwise if we are over the memory limit, but not enough memory
|
||||||
* was freed to return back under the limit, the function returns C_ERR. */
|
* was freed to return back under the limit, the function returns C_ERR. */
|
||||||
int freeMemoryIfNeeded(void) {
|
int freeMemoryIfNeeded(void) {
|
||||||
/* By default slaves should ignore maxmemory and just be masters excat
|
/* By default replicas should ignore maxmemory
|
||||||
* copies. */
|
* and just be masters exact copies. */
|
||||||
if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;
|
if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;
|
||||||
|
|
||||||
size_t mem_reported, mem_tofree, mem_freed;
|
size_t mem_reported, mem_tofree, mem_freed;
|
||||||
|
@ -622,3 +622,14 @@ cant_free:
|
||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This is a wrapper for freeMemoryIfNeeded() that only really calls the
|
||||||
|
* function if right now there are the conditions to do so safely:
|
||||||
|
*
|
||||||
|
* - There must be no script in timeout condition.
|
||||||
|
* - Nor we are loading data right now.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
int freeMemoryIfNeededAndSafe(void) {
|
||||||
|
if (server.lua_timedout || server.loading) return C_OK;
|
||||||
|
return freeMemoryIfNeeded();
|
||||||
|
}
|
||||||
|
|
17
src/multi.c
17
src/multi.c
|
@ -35,6 +35,7 @@
|
||||||
void initClientMultiState(client *c) {
|
void initClientMultiState(client *c) {
|
||||||
c->mstate.commands = NULL;
|
c->mstate.commands = NULL;
|
||||||
c->mstate.count = 0;
|
c->mstate.count = 0;
|
||||||
|
c->mstate.cmd_flags = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Release all the resources associated with MULTI/EXEC state */
|
/* Release all the resources associated with MULTI/EXEC state */
|
||||||
|
@ -67,6 +68,7 @@ void queueMultiCommand(client *c) {
|
||||||
for (j = 0; j < c->argc; j++)
|
for (j = 0; j < c->argc; j++)
|
||||||
incrRefCount(mc->argv[j]);
|
incrRefCount(mc->argv[j]);
|
||||||
c->mstate.count++;
|
c->mstate.count++;
|
||||||
|
c->mstate.cmd_flags |= c->cmd->flags;
|
||||||
}
|
}
|
||||||
|
|
||||||
void discardTransaction(client *c) {
|
void discardTransaction(client *c) {
|
||||||
|
@ -137,6 +139,21 @@ void execCommand(client *c) {
|
||||||
goto handle_monitor;
|
goto handle_monitor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* If there are write commands inside the transaction, and this is a read
|
||||||
|
* only slave, we want to send an error. This happens when the transaction
|
||||||
|
* was initiated when the instance was a master or a writable replica and
|
||||||
|
* then the configuration changed (for example instance was turned into
|
||||||
|
* a replica). */
|
||||||
|
if (!server.loading && server.masterhost && server.repl_slave_ro &&
|
||||||
|
!(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)
|
||||||
|
{
|
||||||
|
addReplyError(c,
|
||||||
|
"Transaction contains write commands but instance "
|
||||||
|
"is now a read-only replica. EXEC aborted.");
|
||||||
|
discardTransaction(c);
|
||||||
|
goto handle_monitor;
|
||||||
|
}
|
||||||
|
|
||||||
/* Exec all the queued commands */
|
/* Exec all the queued commands */
|
||||||
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
|
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
|
||||||
orig_argv = c->argv;
|
orig_argv = c->argv;
|
||||||
|
|
|
@ -365,19 +365,13 @@ void addReplyErrorLength(client *c, const char *s, size_t len) {
|
||||||
* Where the master must propagate the first change even if the second
|
* Where the master must propagate the first change even if the second
|
||||||
* will produce an error. However it is useful to log such events since
|
* will produce an error. However it is useful to log such events since
|
||||||
* they are rare and may hint at errors in a script or a bug in Redis. */
|
* they are rare and may hint at errors in a script or a bug in Redis. */
|
||||||
if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE)) {
|
if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
|
||||||
char* to = c->flags & CLIENT_MASTER? "master": "replica";
|
char* to = c->flags & CLIENT_MASTER? "master": "replica";
|
||||||
char* from = c->flags & CLIENT_MASTER? "replica": "master";
|
char* from = c->flags & CLIENT_MASTER? "replica": "master";
|
||||||
char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
|
char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
|
||||||
serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
|
serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
|
||||||
"to its %s: '%s' after processing the command "
|
"to its %s: '%s' after processing the command "
|
||||||
"'%s'", from, to, s, cmdname);
|
"'%s'", from, to, s, cmdname);
|
||||||
/* Here we want to panic because when a master is sending an
|
|
||||||
* error to some slave in the context of replication, this can
|
|
||||||
* only create some kind of offset or data desynchronization. Better
|
|
||||||
* to catch it ASAP and crash instead of continuing. */
|
|
||||||
if (c->flags & CLIENT_SLAVE)
|
|
||||||
serverPanic("Continuing is unsafe: replication protocol violation.");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1470,7 +1464,7 @@ void processInputBuffer(client *c) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Trim to pos */
|
/* Trim to pos */
|
||||||
if (c->qb_pos) {
|
if (server.current_client != NULL && c->qb_pos) {
|
||||||
sdsrange(c->querybuf,c->qb_pos,-1);
|
sdsrange(c->querybuf,c->qb_pos,-1);
|
||||||
c->qb_pos = 0;
|
c->qb_pos = 0;
|
||||||
}
|
}
|
||||||
|
|
672
src/redis-cli.c
672
src/redis-cli.c
|
@ -67,6 +67,7 @@
|
||||||
#define REDIS_CLI_HISTFILE_DEFAULT ".rediscli_history"
|
#define REDIS_CLI_HISTFILE_DEFAULT ".rediscli_history"
|
||||||
#define REDIS_CLI_RCFILE_ENV "REDISCLI_RCFILE"
|
#define REDIS_CLI_RCFILE_ENV "REDISCLI_RCFILE"
|
||||||
#define REDIS_CLI_RCFILE_DEFAULT ".redisclirc"
|
#define REDIS_CLI_RCFILE_DEFAULT ".redisclirc"
|
||||||
|
#define REDIS_CLI_AUTH_ENV "REDISCLI_AUTH"
|
||||||
|
|
||||||
#define CLUSTER_MANAGER_SLOTS 16384
|
#define CLUSTER_MANAGER_SLOTS 16384
|
||||||
#define CLUSTER_MANAGER_MIGRATE_TIMEOUT 60000
|
#define CLUSTER_MANAGER_MIGRATE_TIMEOUT 60000
|
||||||
|
@ -116,6 +117,7 @@
|
||||||
#define CLUSTER_MANAGER_CMD_FLAG_REPLACE 1 << 6
|
#define CLUSTER_MANAGER_CMD_FLAG_REPLACE 1 << 6
|
||||||
#define CLUSTER_MANAGER_CMD_FLAG_COPY 1 << 7
|
#define CLUSTER_MANAGER_CMD_FLAG_COPY 1 << 7
|
||||||
#define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 8
|
#define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 8
|
||||||
|
#define CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS 1 << 9
|
||||||
|
|
||||||
#define CLUSTER_MANAGER_OPT_GETFRIENDS 1 << 0
|
#define CLUSTER_MANAGER_OPT_GETFRIENDS 1 << 0
|
||||||
#define CLUSTER_MANAGER_OPT_COLD 1 << 1
|
#define CLUSTER_MANAGER_OPT_COLD 1 << 1
|
||||||
|
@ -1377,6 +1379,9 @@ static int parseOptions(int argc, char **argv) {
|
||||||
} else if (!strcmp(argv[i],"--cluster-use-empty-masters")) {
|
} else if (!strcmp(argv[i],"--cluster-use-empty-masters")) {
|
||||||
config.cluster_manager_command.flags |=
|
config.cluster_manager_command.flags |=
|
||||||
CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER;
|
CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER;
|
||||||
|
} else if (!strcmp(argv[i],"--cluster-search-multiple-owners")) {
|
||||||
|
config.cluster_manager_command.flags |=
|
||||||
|
CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS;
|
||||||
} else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) {
|
} else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) {
|
||||||
sds version = cliVersion();
|
sds version = cliVersion();
|
||||||
printf("redis-cli %s\n", version);
|
printf("redis-cli %s\n", version);
|
||||||
|
@ -1419,6 +1424,14 @@ static int parseOptions(int argc, char **argv) {
|
||||||
return i;
|
return i;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void parseEnv() {
|
||||||
|
/* Set auth from env, but do not overwrite CLI arguments if passed */
|
||||||
|
char *auth = getenv(REDIS_CLI_AUTH_ENV);
|
||||||
|
if (auth != NULL && config.auth == NULL) {
|
||||||
|
config.auth = auth;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static sds readArgFromStdin(void) {
|
static sds readArgFromStdin(void) {
|
||||||
char buf[1024];
|
char buf[1024];
|
||||||
sds arg = sdsempty();
|
sds arg = sdsempty();
|
||||||
|
@ -1446,6 +1459,9 @@ static void usage(void) {
|
||||||
" -p <port> Server port (default: 6379).\n"
|
" -p <port> Server port (default: 6379).\n"
|
||||||
" -s <socket> Server socket (overrides hostname and port).\n"
|
" -s <socket> Server socket (overrides hostname and port).\n"
|
||||||
" -a <password> Password to use when connecting to the server.\n"
|
" -a <password> Password to use when connecting to the server.\n"
|
||||||
|
" You can also use the " REDIS_CLI_AUTH_ENV " environment\n"
|
||||||
|
" variable to pass this password more safely\n"
|
||||||
|
" (if both are used, this argument takes predecence).\n"
|
||||||
" -u <uri> Server URI.\n"
|
" -u <uri> Server URI.\n"
|
||||||
" -r <repeat> Execute specified command N times.\n"
|
" -r <repeat> Execute specified command N times.\n"
|
||||||
" -i <interval> When -r is used, waits <interval> seconds per command.\n"
|
" -i <interval> When -r is used, waits <interval> seconds per command.\n"
|
||||||
|
@ -1834,7 +1850,7 @@ static int evalMode(int argc, char **argv) {
|
||||||
if (eval_ldb) {
|
if (eval_ldb) {
|
||||||
if (!config.eval_ldb) {
|
if (!config.eval_ldb) {
|
||||||
/* If the debugging session ended immediately, there was an
|
/* If the debugging session ended immediately, there was an
|
||||||
* error compiling the script. Show it and don't enter
|
* error compiling the script. Show it and they don't enter
|
||||||
* the REPL at all. */
|
* the REPL at all. */
|
||||||
printf("Eval debugging session can't start:\n");
|
printf("Eval debugging session can't start:\n");
|
||||||
cliReadReply(0);
|
cliReadReply(0);
|
||||||
|
@ -1917,6 +1933,8 @@ static dictType clusterManagerDictType = {
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef int clusterManagerCommandProc(int argc, char **argv);
|
typedef int clusterManagerCommandProc(int argc, char **argv);
|
||||||
|
typedef int (*clusterManagerOnReplyError)(redisReply *reply,
|
||||||
|
clusterManagerNode *n, int bulk_idx);
|
||||||
|
|
||||||
/* Cluster Manager helper functions */
|
/* Cluster Manager helper functions */
|
||||||
|
|
||||||
|
@ -1978,14 +1996,17 @@ typedef struct clusterManagerCommandDef {
|
||||||
clusterManagerCommandDef clusterManagerCommands[] = {
|
clusterManagerCommandDef clusterManagerCommands[] = {
|
||||||
{"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN",
|
{"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN",
|
||||||
"replicas <arg>"},
|
"replicas <arg>"},
|
||||||
{"check", clusterManagerCommandCheck, -1, "host:port", NULL},
|
{"check", clusterManagerCommandCheck, -1, "host:port",
|
||||||
|
"search-multiple-owners"},
|
||||||
{"info", clusterManagerCommandInfo, -1, "host:port", NULL},
|
{"info", clusterManagerCommandInfo, -1, "host:port", NULL},
|
||||||
{"fix", clusterManagerCommandFix, -1, "host:port", NULL},
|
{"fix", clusterManagerCommandFix, -1, "host:port",
|
||||||
|
"search-multiple-owners"},
|
||||||
{"reshard", clusterManagerCommandReshard, -1, "host:port",
|
{"reshard", clusterManagerCommandReshard, -1, "host:port",
|
||||||
"from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>"},
|
"from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>,"
|
||||||
|
"replace"},
|
||||||
{"rebalance", clusterManagerCommandRebalance, -1, "host:port",
|
{"rebalance", clusterManagerCommandRebalance, -1, "host:port",
|
||||||
"weight <node1=w1...nodeN=wN>,use-empty-masters,"
|
"weight <node1=w1...nodeN=wN>,use-empty-masters,"
|
||||||
"timeout <arg>,simulate,pipeline <arg>,threshold <arg>"},
|
"timeout <arg>,simulate,pipeline <arg>,threshold <arg>,replace"},
|
||||||
{"add-node", clusterManagerCommandAddNode, 2,
|
{"add-node", clusterManagerCommandAddNode, 2,
|
||||||
"new_host:new_port existing_host:existing_port", "slave,master-id <arg>"},
|
"new_host:new_port existing_host:existing_port", "slave,master-id <arg>"},
|
||||||
{"del-node", clusterManagerCommandDeleteNode, 2, "host:port node_id",NULL},
|
{"del-node", clusterManagerCommandDeleteNode, 2, "host:port node_id",NULL},
|
||||||
|
@ -2176,6 +2197,44 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n,
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Call MULTI command on a cluster node. */
|
||||||
|
static int clusterManagerStartTransaction(clusterManagerNode *node) {
|
||||||
|
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI");
|
||||||
|
int success = clusterManagerCheckRedisReply(node, reply, NULL);
|
||||||
|
if (reply) freeReplyObject(reply);
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Call EXEC command on a cluster node. */
|
||||||
|
static int clusterManagerExecTransaction(clusterManagerNode *node,
|
||||||
|
clusterManagerOnReplyError onerror)
|
||||||
|
{
|
||||||
|
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "EXEC");
|
||||||
|
int success = clusterManagerCheckRedisReply(node, reply, NULL);
|
||||||
|
if (success) {
|
||||||
|
if (reply->type != REDIS_REPLY_ARRAY) {
|
||||||
|
success = 0;
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
size_t i;
|
||||||
|
for (i = 0; i < reply->elements; i++) {
|
||||||
|
redisReply *r = reply->element[i];
|
||||||
|
char *err = NULL;
|
||||||
|
success = clusterManagerCheckRedisReply(node, r, &err);
|
||||||
|
if (!success && onerror) success = onerror(r, node, i);
|
||||||
|
if (err) {
|
||||||
|
if (!success)
|
||||||
|
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
|
||||||
|
zfree(err);
|
||||||
|
}
|
||||||
|
if (!success) break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cleanup:
|
||||||
|
if (reply) freeReplyObject(reply);
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
static int clusterManagerNodeConnect(clusterManagerNode *node) {
|
static int clusterManagerNodeConnect(clusterManagerNode *node) {
|
||||||
if (node->context) redisFree(node->context);
|
if (node->context) redisFree(node->context);
|
||||||
node->context = redisConnect(node->ip, node->port);
|
node->context = redisConnect(node->ip, node->port);
|
||||||
|
@ -2710,6 +2769,55 @@ cleanup:
|
||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Get the node the slot is assigned to from the point of view of node *n.
|
||||||
|
* If the slot is unassigned or if the reply is an error, return NULL.
|
||||||
|
* Use the **err argument in order to check wether the slot is unassigned
|
||||||
|
* or the reply resulted in an error. */
|
||||||
|
static clusterManagerNode *clusterManagerGetSlotOwner(clusterManagerNode *n,
|
||||||
|
int slot, char **err)
|
||||||
|
{
|
||||||
|
assert(slot >= 0 && slot < CLUSTER_MANAGER_SLOTS);
|
||||||
|
clusterManagerNode *owner = NULL;
|
||||||
|
redisReply *reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SLOTS");
|
||||||
|
if (clusterManagerCheckRedisReply(n, reply, err)) {
|
||||||
|
assert(reply->type == REDIS_REPLY_ARRAY);
|
||||||
|
size_t i;
|
||||||
|
for (i = 0; i < reply->elements; i++) {
|
||||||
|
redisReply *r = reply->element[i];
|
||||||
|
assert(r->type == REDIS_REPLY_ARRAY && r->elements >= 3);
|
||||||
|
int from, to;
|
||||||
|
from = r->element[0]->integer;
|
||||||
|
to = r->element[1]->integer;
|
||||||
|
if (slot < from || slot > to) continue;
|
||||||
|
redisReply *nr = r->element[2];
|
||||||
|
assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 2);
|
||||||
|
char *name = NULL;
|
||||||
|
if (nr->elements >= 3)
|
||||||
|
name = nr->element[2]->str;
|
||||||
|
if (name != NULL)
|
||||||
|
owner = clusterManagerNodeByName(name);
|
||||||
|
else {
|
||||||
|
char *ip = nr->element[0]->str;
|
||||||
|
assert(ip != NULL);
|
||||||
|
int port = (int) nr->element[1]->integer;
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
listRewind(cluster_manager.nodes, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
clusterManagerNode *nd = ln->value;
|
||||||
|
if (strcmp(nd->ip, ip) == 0 && port == nd->port) {
|
||||||
|
owner = nd;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (owner) break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (reply) freeReplyObject(reply);
|
||||||
|
return owner;
|
||||||
|
}
|
||||||
|
|
||||||
/* Set slot status to "importing" or "migrating" */
|
/* Set slot status to "importing" or "migrating" */
|
||||||
static int clusterManagerSetSlot(clusterManagerNode *node1,
|
static int clusterManagerSetSlot(clusterManagerNode *node1,
|
||||||
clusterManagerNode *node2,
|
clusterManagerNode *node2,
|
||||||
|
@ -2734,6 +2842,162 @@ cleanup:
|
||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int clusterManagerClearSlotStatus(clusterManagerNode *node, int slot) {
|
||||||
|
redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
|
||||||
|
"CLUSTER SETSLOT %d %s", slot, "STABLE");
|
||||||
|
int success = clusterManagerCheckRedisReply(node, reply, NULL);
|
||||||
|
if (reply) freeReplyObject(reply);
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int clusterManagerDelSlot(clusterManagerNode *node, int slot,
|
||||||
|
int ignore_unassigned_err)
|
||||||
|
{
|
||||||
|
redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
|
||||||
|
"CLUSTER DELSLOTS %d", slot);
|
||||||
|
char *err = NULL;
|
||||||
|
int success = clusterManagerCheckRedisReply(node, reply, &err);
|
||||||
|
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
|
||||||
|
ignore_unassigned_err)
|
||||||
|
{
|
||||||
|
char *get_owner_err = NULL;
|
||||||
|
clusterManagerNode *assigned_to =
|
||||||
|
clusterManagerGetSlotOwner(node, slot, &get_owner_err);
|
||||||
|
if (!assigned_to) {
|
||||||
|
if (get_owner_err == NULL) success = 1;
|
||||||
|
else {
|
||||||
|
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, get_owner_err);
|
||||||
|
zfree(get_owner_err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!success && err != NULL) {
|
||||||
|
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
|
||||||
|
zfree(err);
|
||||||
|
}
|
||||||
|
if (reply) freeReplyObject(reply);
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int clusterManagerAddSlot(clusterManagerNode *node, int slot) {
|
||||||
|
redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
|
||||||
|
"CLUSTER ADDSLOTS %d", slot);
|
||||||
|
int success = clusterManagerCheckRedisReply(node, reply, NULL);
|
||||||
|
if (reply) freeReplyObject(reply);
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
static signed int clusterManagerCountKeysInSlot(clusterManagerNode *node,
|
||||||
|
int slot)
|
||||||
|
{
|
||||||
|
redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
|
||||||
|
"CLUSTER COUNTKEYSINSLOT %d", slot);
|
||||||
|
int count = -1;
|
||||||
|
int success = clusterManagerCheckRedisReply(node, reply, NULL);
|
||||||
|
if (success && reply->type == REDIS_REPLY_INTEGER) count = reply->integer;
|
||||||
|
if (reply) freeReplyObject(reply);
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int clusterManagerBumpEpoch(clusterManagerNode *node) {
|
||||||
|
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER BUMPEPOCH");
|
||||||
|
int success = clusterManagerCheckRedisReply(node, reply, NULL);
|
||||||
|
if (reply) freeReplyObject(reply);
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Callback used by clusterManagerSetSlotOwner transaction. It should ignore
|
||||||
|
* errors except for ADDSLOTS errors.
|
||||||
|
* Return 1 if the error should be ignored. */
|
||||||
|
static int clusterManagerOnSetOwnerErr(redisReply *reply,
|
||||||
|
clusterManagerNode *n, int bulk_idx)
|
||||||
|
{
|
||||||
|
UNUSED(reply);
|
||||||
|
UNUSED(n);
|
||||||
|
/* Only raise error when ADDSLOTS fail (bulk_idx == 1). */
|
||||||
|
return (bulk_idx != 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int clusterManagerSetSlotOwner(clusterManagerNode *owner,
|
||||||
|
int slot,
|
||||||
|
int do_clear)
|
||||||
|
{
|
||||||
|
int success = clusterManagerStartTransaction(owner);
|
||||||
|
if (!success) return 0;
|
||||||
|
/* Ensure the slot is not already assigned. */
|
||||||
|
clusterManagerDelSlot(owner, slot, 1);
|
||||||
|
/* Add the slot and bump epoch. */
|
||||||
|
clusterManagerAddSlot(owner, slot);
|
||||||
|
if (do_clear) clusterManagerClearSlotStatus(owner, slot);
|
||||||
|
clusterManagerBumpEpoch(owner);
|
||||||
|
success = clusterManagerExecTransaction(owner, clusterManagerOnSetOwnerErr);
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Get the hash for the values of the specified keys in *keys_reply for the
|
||||||
|
* specified nodes *n1 and *n2, by calling DEBUG DIGEST-VALUE redis command
|
||||||
|
* on both nodes. Every key with same name on both nodes but having different
|
||||||
|
* values will be added to the *diffs list. Return 0 in case of reply
|
||||||
|
* error. */
|
||||||
|
static int clusterManagerCompareKeysValues(clusterManagerNode *n1,
|
||||||
|
clusterManagerNode *n2,
|
||||||
|
redisReply *keys_reply,
|
||||||
|
list *diffs)
|
||||||
|
{
|
||||||
|
size_t i, argc = keys_reply->elements + 2;
|
||||||
|
static const char *hash_zero = "0000000000000000000000000000000000000000";
|
||||||
|
char **argv = zcalloc(argc * sizeof(char *));
|
||||||
|
size_t *argv_len = zcalloc(argc * sizeof(size_t));
|
||||||
|
argv[0] = "DEBUG";
|
||||||
|
argv_len[0] = 5;
|
||||||
|
argv[1] = "DIGEST-VALUE";
|
||||||
|
argv_len[1] = 12;
|
||||||
|
for (i = 0; i < keys_reply->elements; i++) {
|
||||||
|
redisReply *entry = keys_reply->element[i];
|
||||||
|
int idx = i + 2;
|
||||||
|
argv[idx] = entry->str;
|
||||||
|
argv_len[idx] = entry->len;
|
||||||
|
}
|
||||||
|
int success = 0;
|
||||||
|
void *_reply1 = NULL, *_reply2 = NULL;
|
||||||
|
redisReply *r1 = NULL, *r2 = NULL;
|
||||||
|
redisAppendCommandArgv(n1->context,argc, (const char**)argv,argv_len);
|
||||||
|
success = (redisGetReply(n1->context, &_reply1) == REDIS_OK);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
r1 = (redisReply *) _reply1;
|
||||||
|
redisAppendCommandArgv(n2->context,argc, (const char**)argv,argv_len);
|
||||||
|
success = (redisGetReply(n2->context, &_reply2) == REDIS_OK);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
r2 = (redisReply *) _reply2;
|
||||||
|
success = (r1->type != REDIS_REPLY_ERROR && r2->type != REDIS_REPLY_ERROR);
|
||||||
|
if (r1->type == REDIS_REPLY_ERROR) {
|
||||||
|
CLUSTER_MANAGER_PRINT_REPLY_ERROR(n1, r1->str);
|
||||||
|
success = 0;
|
||||||
|
}
|
||||||
|
if (r2->type == REDIS_REPLY_ERROR) {
|
||||||
|
CLUSTER_MANAGER_PRINT_REPLY_ERROR(n2, r2->str);
|
||||||
|
success = 0;
|
||||||
|
}
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
assert(keys_reply->elements == r1->elements &&
|
||||||
|
keys_reply->elements == r2->elements);
|
||||||
|
for (i = 0; i < keys_reply->elements; i++) {
|
||||||
|
char *key = keys_reply->element[i]->str;
|
||||||
|
char *hash1 = r1->element[i]->str;
|
||||||
|
char *hash2 = r2->element[i]->str;
|
||||||
|
/* Ignore keys that don't exist in both nodes. */
|
||||||
|
if (strcmp(hash1, hash_zero) == 0 || strcmp(hash2, hash_zero) == 0)
|
||||||
|
continue;
|
||||||
|
if (strcmp(hash1, hash2) != 0) listAddNodeTail(diffs, key);
|
||||||
|
}
|
||||||
|
cleanup:
|
||||||
|
if (r1) freeReplyObject(r1);
|
||||||
|
if (r2) freeReplyObject(r2);
|
||||||
|
zfree(argv);
|
||||||
|
zfree(argv_len);
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
/* Migrate keys taken from reply->elements. It returns the reply from the
|
/* Migrate keys taken from reply->elements. It returns the reply from the
|
||||||
* MIGRATE command, or NULL if something goes wrong. If the argument 'dots'
|
* MIGRATE command, or NULL if something goes wrong. If the argument 'dots'
|
||||||
* is not NULL, a dot will be printed for every migrated key. */
|
* is not NULL, a dot will be printed for every migrated key. */
|
||||||
|
@ -2814,8 +3078,10 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
||||||
char **err)
|
char **err)
|
||||||
{
|
{
|
||||||
int success = 1;
|
int success = 1;
|
||||||
int do_fix = (config.cluster_manager_command.flags &
|
int do_fix = config.cluster_manager_command.flags &
|
||||||
CLUSTER_MANAGER_CMD_FLAG_FIX);
|
CLUSTER_MANAGER_CMD_FLAG_FIX;
|
||||||
|
int do_replace = config.cluster_manager_command.flags &
|
||||||
|
CLUSTER_MANAGER_CMD_FLAG_REPLACE;
|
||||||
while (1) {
|
while (1) {
|
||||||
char *dots = NULL;
|
char *dots = NULL;
|
||||||
redisReply *reply = NULL, *migrate_reply = NULL;
|
redisReply *reply = NULL, *migrate_reply = NULL;
|
||||||
|
@ -2846,16 +3112,86 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
||||||
dots);
|
dots);
|
||||||
if (migrate_reply == NULL) goto next;
|
if (migrate_reply == NULL) goto next;
|
||||||
if (migrate_reply->type == REDIS_REPLY_ERROR) {
|
if (migrate_reply->type == REDIS_REPLY_ERROR) {
|
||||||
if (do_fix && strstr(migrate_reply->str, "BUSYKEY")) {
|
int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL;
|
||||||
/* If the key already exists, try to migrate keys
|
int not_served = 0;
|
||||||
* adding REPLACE option.
|
if (!is_busy) {
|
||||||
* If the key's slot is not served, try to assign slot
|
/* Check if the slot is unassigned (not served) in the
|
||||||
|
* source node's configuration. */
|
||||||
|
char *get_owner_err = NULL;
|
||||||
|
clusterManagerNode *served_by =
|
||||||
|
clusterManagerGetSlotOwner(source, slot, &get_owner_err);
|
||||||
|
if (!served_by) {
|
||||||
|
if (get_owner_err == NULL) not_served = 1;
|
||||||
|
else {
|
||||||
|
CLUSTER_MANAGER_PRINT_REPLY_ERROR(source,
|
||||||
|
get_owner_err);
|
||||||
|
zfree(get_owner_err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* Try to handle errors. */
|
||||||
|
if (is_busy || not_served) {
|
||||||
|
/* If the key's slot is not served, try to assign slot
|
||||||
* to the target node. */
|
* to the target node. */
|
||||||
int is_busy = (strstr(migrate_reply->str, "BUSYKEY") != NULL);
|
if (do_fix && not_served) {
|
||||||
if (strstr(migrate_reply->str, "slot not served") != NULL)
|
clusterManagerLogWarn("*** Slot was not served, setting "
|
||||||
|
"owner to node %s:%d.\n",
|
||||||
|
target->ip, target->port);
|
||||||
clusterManagerSetSlot(source, target, slot, "node", NULL);
|
clusterManagerSetSlot(source, target, slot, "node", NULL);
|
||||||
clusterManagerLogWarn("*** Target key exists. "
|
}
|
||||||
"Replacing it for FIX.\n");
|
/* If the key already exists in the target node (BUSYKEY),
|
||||||
|
* check whether its value is the same in both nodes.
|
||||||
|
* In case of equal values, retry migration with the
|
||||||
|
* REPLACE option.
|
||||||
|
* In case of different values:
|
||||||
|
* - If the migration is requested by the fix command, stop
|
||||||
|
* and warn the user.
|
||||||
|
* - In other cases (ie. reshard), proceed only if the user
|
||||||
|
* launched the command with the --cluster-replace option.*/
|
||||||
|
if (is_busy) {
|
||||||
|
clusterManagerLogWarn("\n*** Target key exists\n");
|
||||||
|
if (!do_replace) {
|
||||||
|
clusterManagerLogWarn("*** Checking key values on "
|
||||||
|
"both nodes...\n");
|
||||||
|
list *diffs = listCreate();
|
||||||
|
success = clusterManagerCompareKeysValues(source,
|
||||||
|
target, reply, diffs);
|
||||||
|
if (!success) {
|
||||||
|
clusterManagerLogErr("*** Value check failed!\n");
|
||||||
|
listRelease(diffs);
|
||||||
|
goto next;
|
||||||
|
}
|
||||||
|
if (listLength(diffs) > 0) {
|
||||||
|
success = 0;
|
||||||
|
clusterManagerLogErr(
|
||||||
|
"*** Found %d key(s) in both source node and "
|
||||||
|
"target node having different values.\n"
|
||||||
|
" Source node: %s:%d\n"
|
||||||
|
" Target node: %s:%d\n"
|
||||||
|
" Keys(s):\n",
|
||||||
|
listLength(diffs),
|
||||||
|
source->ip, source->port,
|
||||||
|
target->ip, target->port);
|
||||||
|
listIter dli;
|
||||||
|
listNode *dln;
|
||||||
|
listRewind(diffs, &dli);
|
||||||
|
while((dln = listNext(&dli)) != NULL) {
|
||||||
|
char *k = dln->value;
|
||||||
|
clusterManagerLogErr(" - %s\n", k);
|
||||||
|
}
|
||||||
|
clusterManagerLogErr("Please fix the above key(s) "
|
||||||
|
"manually and try again "
|
||||||
|
"or relaunch the command \n"
|
||||||
|
"with --cluster-replace "
|
||||||
|
"option to force key "
|
||||||
|
"overriding.\n");
|
||||||
|
listRelease(diffs);
|
||||||
|
goto next;
|
||||||
|
}
|
||||||
|
listRelease(diffs);
|
||||||
|
}
|
||||||
|
clusterManagerLogWarn("*** Replacing target keys...\n");
|
||||||
|
}
|
||||||
freeReplyObject(migrate_reply);
|
freeReplyObject(migrate_reply);
|
||||||
migrate_reply = clusterManagerMigrateKeysInReply(source,
|
migrate_reply = clusterManagerMigrateKeysInReply(source,
|
||||||
target,
|
target,
|
||||||
|
@ -3610,24 +3946,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||||
listRewind(none, &li);
|
listRewind(none, &li);
|
||||||
while ((ln = listNext(&li)) != NULL) {
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
sds slot = ln->value;
|
sds slot = ln->value;
|
||||||
|
int s = atoi(slot);
|
||||||
clusterManagerNode *n = clusterManagerNodeMasterRandom();
|
clusterManagerNode *n = clusterManagerNodeMasterRandom();
|
||||||
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
|
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
|
||||||
slot, n->ip, n->port);
|
slot, n->ip, n->port);
|
||||||
/* Ensure the slot is not already assigned. */
|
if (!clusterManagerSetSlotOwner(n, s, 0)) {
|
||||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
fixed = -1;
|
||||||
"CLUSTER DELSLOTS %s", slot);
|
goto cleanup;
|
||||||
if (r) freeReplyObject(r);
|
}
|
||||||
r = CLUSTER_MANAGER_COMMAND(n,
|
|
||||||
"CLUSTER ADDSLOTS %s", slot);
|
|
||||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
|
||||||
if (r) freeReplyObject(r);
|
|
||||||
r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
|
|
||||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
|
||||||
if (r) freeReplyObject(r);
|
|
||||||
if (fixed < 0) goto cleanup;
|
|
||||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||||
* info into the node struct, in order to keep it synced */
|
* info into the node struct, in order to keep it synced */
|
||||||
n->slots[atoi(slot)] = 1;
|
n->slots[s] = 1;
|
||||||
fixed++;
|
fixed++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3635,7 +3964,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||||
|
|
||||||
/* Handle case "2": keys only in one node. */
|
/* Handle case "2": keys only in one node. */
|
||||||
if (listLength(single) > 0) {
|
if (listLength(single) > 0) {
|
||||||
printf("The following uncovered slots have keys in just one node:\n");
|
printf("The following uncovered slots have keys in just one node:\n");
|
||||||
clusterManagerPrintSlotsList(single);
|
clusterManagerPrintSlotsList(single);
|
||||||
if (confirmWithYes("Fix these slots by covering with those nodes?")){
|
if (confirmWithYes("Fix these slots by covering with those nodes?")){
|
||||||
listIter li;
|
listIter li;
|
||||||
|
@ -3643,6 +3972,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||||
listRewind(single, &li);
|
listRewind(single, &li);
|
||||||
while ((ln = listNext(&li)) != NULL) {
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
sds slot = ln->value;
|
sds slot = ln->value;
|
||||||
|
int s = atoi(slot);
|
||||||
dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot);
|
dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot);
|
||||||
assert(entry != NULL);
|
assert(entry != NULL);
|
||||||
list *nodes = (list *) dictGetVal(entry);
|
list *nodes = (list *) dictGetVal(entry);
|
||||||
|
@ -3651,18 +3981,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||||
clusterManagerNode *n = fn->value;
|
clusterManagerNode *n = fn->value;
|
||||||
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
|
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
|
||||||
slot, n->ip, n->port);
|
slot, n->ip, n->port);
|
||||||
/* Ensure the slot is not already assigned. */
|
if (!clusterManagerSetSlotOwner(n, s, 0)) {
|
||||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
fixed = -1;
|
||||||
"CLUSTER DELSLOTS %s", slot);
|
goto cleanup;
|
||||||
if (r) freeReplyObject(r);
|
}
|
||||||
r = CLUSTER_MANAGER_COMMAND(n,
|
|
||||||
"CLUSTER ADDSLOTS %s", slot);
|
|
||||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
|
||||||
if (r) freeReplyObject(r);
|
|
||||||
r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
|
|
||||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
|
||||||
if (r) freeReplyObject(r);
|
|
||||||
if (fixed < 0) goto cleanup;
|
|
||||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||||
* info into the node struct, in order to keep it synced */
|
* info into the node struct, in order to keep it synced */
|
||||||
n->slots[atoi(slot)] = 1;
|
n->slots[atoi(slot)] = 1;
|
||||||
|
@ -3695,23 +4017,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||||
clusterManagerLogInfo(">>> Covering slot %s moving keys "
|
clusterManagerLogInfo(">>> Covering slot %s moving keys "
|
||||||
"to %s:%d\n", slot,
|
"to %s:%d\n", slot,
|
||||||
target->ip, target->port);
|
target->ip, target->port);
|
||||||
/* Ensure the slot is not already assigned. */
|
if (!clusterManagerSetSlotOwner(target, s, 1)) {
|
||||||
redisReply *r = CLUSTER_MANAGER_COMMAND(target,
|
fixed = -1;
|
||||||
"CLUSTER DELSLOTS %s", slot);
|
goto cleanup;
|
||||||
if (r) freeReplyObject(r);
|
}
|
||||||
r = CLUSTER_MANAGER_COMMAND(target,
|
|
||||||
"CLUSTER ADDSLOTS %s", slot);
|
|
||||||
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
|
||||||
if (r) freeReplyObject(r);
|
|
||||||
if (fixed < 0) goto cleanup;
|
|
||||||
r = CLUSTER_MANAGER_COMMAND(target,
|
|
||||||
"CLUSTER SETSLOT %s %s", slot, "STABLE");
|
|
||||||
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
|
||||||
if (r) freeReplyObject(r);
|
|
||||||
r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH");
|
|
||||||
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
|
||||||
if (r) freeReplyObject(r);
|
|
||||||
if (fixed < 0) goto cleanup;
|
|
||||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||||
* info into the node struct, in order to keep it synced */
|
* info into the node struct, in order to keep it synced */
|
||||||
target->slots[atoi(slot)] = 1;
|
target->slots[atoi(slot)] = 1;
|
||||||
|
@ -3722,23 +4031,15 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||||
clusterManagerNode *src = nln->value;
|
clusterManagerNode *src = nln->value;
|
||||||
if (src == target) continue;
|
if (src == target) continue;
|
||||||
/* Assign the slot to target node in the source node. */
|
/* Assign the slot to target node in the source node. */
|
||||||
redisReply *r = CLUSTER_MANAGER_COMMAND(src,
|
if (!clusterManagerSetSlot(src, target, s, "NODE", NULL))
|
||||||
"CLUSTER SETSLOT %s %s %s", slot,
|
|
||||||
"NODE", target->name);
|
|
||||||
if (!clusterManagerCheckRedisReply(src, r, NULL))
|
|
||||||
fixed = -1;
|
fixed = -1;
|
||||||
if (r) freeReplyObject(r);
|
|
||||||
if (fixed < 0) goto cleanup;
|
if (fixed < 0) goto cleanup;
|
||||||
/* Set the source node in 'importing' state
|
/* Set the source node in 'importing' state
|
||||||
* (even if we will actually migrate keys away)
|
* (even if we will actually migrate keys away)
|
||||||
* in order to avoid receiving redirections
|
* in order to avoid receiving redirections
|
||||||
* for MIGRATE. */
|
* for MIGRATE. */
|
||||||
r = CLUSTER_MANAGER_COMMAND(src,
|
if (!clusterManagerSetSlot(src, target, s,
|
||||||
"CLUSTER SETSLOT %s %s %s", slot,
|
"IMPORTING", NULL)) fixed = -1;
|
||||||
"IMPORTING", target->name);
|
|
||||||
if (!clusterManagerCheckRedisReply(src, r, NULL))
|
|
||||||
fixed = -1;
|
|
||||||
if (r) freeReplyObject(r);
|
|
||||||
if (fixed < 0) goto cleanup;
|
if (fixed < 0) goto cleanup;
|
||||||
int opts = CLUSTER_MANAGER_OPT_VERBOSE |
|
int opts = CLUSTER_MANAGER_OPT_VERBOSE |
|
||||||
CLUSTER_MANAGER_OPT_COLD;
|
CLUSTER_MANAGER_OPT_COLD;
|
||||||
|
@ -3746,12 +4047,8 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||||
fixed = -1;
|
fixed = -1;
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
r = CLUSTER_MANAGER_COMMAND(src,
|
if (!clusterManagerClearSlotStatus(src, s))
|
||||||
"CLUSTER SETSLOT %s %s", slot,
|
|
||||||
"STABLE");
|
|
||||||
if (!clusterManagerCheckRedisReply(src, r, NULL))
|
|
||||||
fixed = -1;
|
fixed = -1;
|
||||||
if (r) freeReplyObject(r);
|
|
||||||
if (fixed < 0) goto cleanup;
|
if (fixed < 0) goto cleanup;
|
||||||
}
|
}
|
||||||
fixed++;
|
fixed++;
|
||||||
|
@ -3875,24 +4172,9 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||||
// Use ADDSLOTS to assign the slot.
|
// Use ADDSLOTS to assign the slot.
|
||||||
clusterManagerLogWarn("*** Configuring %s:%d as the slot owner\n",
|
clusterManagerLogWarn("*** Configuring %s:%d as the slot owner\n",
|
||||||
owner->ip, owner->port);
|
owner->ip, owner->port);
|
||||||
redisReply *reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER "
|
success = clusterManagerClearSlotStatus(owner, slot);
|
||||||
"SETSLOT %d %s",
|
|
||||||
slot, "STABLE");
|
|
||||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
|
||||||
if (reply) freeReplyObject(reply);
|
|
||||||
if (!success) goto cleanup;
|
if (!success) goto cleanup;
|
||||||
/* Ensure that the slot is unassigned before assigning it to the
|
success = clusterManagerSetSlotOwner(owner, slot, 0);
|
||||||
* owner. */
|
|
||||||
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot);
|
|
||||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
|
||||||
/* Ignore "already unassigned" error. */
|
|
||||||
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
|
|
||||||
strstr(reply->str, "already unassigned") != NULL) success = 1;
|
|
||||||
if (reply) freeReplyObject(reply);
|
|
||||||
if (!success) goto cleanup;
|
|
||||||
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot);
|
|
||||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
|
||||||
if (reply) freeReplyObject(reply);
|
|
||||||
if (!success) goto cleanup;
|
if (!success) goto cleanup;
|
||||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||||
* info into the node struct, in order to keep it synced */
|
* info into the node struct, in order to keep it synced */
|
||||||
|
@ -3900,9 +4182,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||||
/* Make sure this information will propagate. Not strictly needed
|
/* Make sure this information will propagate. Not strictly needed
|
||||||
* since there is no past owner, so all the other nodes will accept
|
* since there is no past owner, so all the other nodes will accept
|
||||||
* whatever epoch this node will claim the slot with. */
|
* whatever epoch this node will claim the slot with. */
|
||||||
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH");
|
success = clusterManagerBumpEpoch(owner);
|
||||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
|
||||||
if (reply) freeReplyObject(reply);
|
|
||||||
if (!success) goto cleanup;
|
if (!success) goto cleanup;
|
||||||
/* Remove the owner from the list of migrating/importing
|
/* Remove the owner from the list of migrating/importing
|
||||||
* nodes. */
|
* nodes. */
|
||||||
|
@ -3922,16 +4202,10 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||||
* the owner has been set in the previous condition (owner == NULL). */
|
* the owner has been set in the previous condition (owner == NULL). */
|
||||||
assert(owner != NULL);
|
assert(owner != NULL);
|
||||||
listRewind(owners, &li);
|
listRewind(owners, &li);
|
||||||
redisReply *reply = NULL;
|
|
||||||
while ((ln = listNext(&li)) != NULL) {
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
clusterManagerNode *n = ln->value;
|
clusterManagerNode *n = ln->value;
|
||||||
if (n == owner) continue;
|
if (n == owner) continue;
|
||||||
reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot);
|
success = clusterManagerDelSlot(n, slot, 1);
|
||||||
success = clusterManagerCheckRedisReply(n, reply, NULL);
|
|
||||||
/* Ignore "already unassigned" error. */
|
|
||||||
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
|
|
||||||
strstr(reply->str, "already unassigned") != NULL) success = 1;
|
|
||||||
if (reply) freeReplyObject(reply);
|
|
||||||
if (!success) goto cleanup;
|
if (!success) goto cleanup;
|
||||||
n->slots[slot] = 0;
|
n->slots[slot] = 0;
|
||||||
/* Assign the slot to the owner in the node 'n' configuration.' */
|
/* Assign the slot to the owner in the node 'n' configuration.' */
|
||||||
|
@ -3955,6 +4229,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||||
clusterManagerLogInfo(">>> Case 1: Moving slot %d from "
|
clusterManagerLogInfo(">>> Case 1: Moving slot %d from "
|
||||||
"%s:%d to %s:%d\n", slot,
|
"%s:%d to %s:%d\n", slot,
|
||||||
src->ip, src->port, dst->ip, dst->port);
|
src->ip, src->port, dst->ip, dst->port);
|
||||||
|
move_opts |= CLUSTER_MANAGER_OPT_UPDATE;
|
||||||
success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL);
|
success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL);
|
||||||
}
|
}
|
||||||
/* Case 2: There are multiple nodes that claim the slot as importing,
|
/* Case 2: There are multiple nodes that claim the slot as importing,
|
||||||
|
@ -3973,11 +4248,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||||
if (!success) goto cleanup;
|
if (!success) goto cleanup;
|
||||||
clusterManagerLogInfo(">>> Setting %d as STABLE in "
|
clusterManagerLogInfo(">>> Setting %d as STABLE in "
|
||||||
"%s:%d\n", slot, n->ip, n->port);
|
"%s:%d\n", slot, n->ip, n->port);
|
||||||
|
success = clusterManagerClearSlotStatus(n, slot);
|
||||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
|
|
||||||
slot, "STABLE");
|
|
||||||
success = clusterManagerCheckRedisReply(n, r, NULL);
|
|
||||||
if (r) freeReplyObject(r);
|
|
||||||
if (!success) goto cleanup;
|
if (!success) goto cleanup;
|
||||||
}
|
}
|
||||||
/* Since the slot has been moved in "cold" mode, ensure that all the
|
/* Since the slot has been moved in "cold" mode, ensure that all the
|
||||||
|
@ -3987,12 +4258,76 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||||
clusterManagerNode *n = ln->value;
|
clusterManagerNode *n = ln->value;
|
||||||
if (n == owner) continue;
|
if (n == owner) continue;
|
||||||
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
|
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
|
||||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
success = clusterManagerSetSlot(n, owner, slot, "NODE", NULL);
|
||||||
"CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name);
|
|
||||||
success = clusterManagerCheckRedisReply(n, r, NULL);
|
|
||||||
if (r) freeReplyObject(r);
|
|
||||||
if (!success) goto cleanup;
|
if (!success) goto cleanup;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
/* Case 3: The slot is in migrating state in one node but multiple
|
||||||
|
* other nodes claim to be in importing state and don't have any key in
|
||||||
|
* the slot. We search for the importing node having the same ID as
|
||||||
|
* the destination node of the migrating node.
|
||||||
|
* In that case we move the slot from the migrating node to this node and
|
||||||
|
* we close the importing states on all the other importing nodes.
|
||||||
|
* If no importing node has the same ID as the destination node of the
|
||||||
|
* migrating node, the slot's state is closed on both the migrating node
|
||||||
|
* and the importing nodes. */
|
||||||
|
else if (listLength(migrating) == 1 && listLength(importing) > 1) {
|
||||||
|
int try_to_fix = 1;
|
||||||
|
clusterManagerNode *src = listFirst(migrating)->value;
|
||||||
|
clusterManagerNode *dst = NULL;
|
||||||
|
sds target_id = NULL;
|
||||||
|
for (int i = 0; i < src->migrating_count; i += 2) {
|
||||||
|
sds migrating_slot = src->migrating[i];
|
||||||
|
if (atoi(migrating_slot) == slot) {
|
||||||
|
target_id = src->migrating[i + 1];
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert(target_id != NULL);
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
listRewind(importing, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
clusterManagerNode *n = ln->value;
|
||||||
|
int count = clusterManagerCountKeysInSlot(n, slot);
|
||||||
|
if (count > 0) {
|
||||||
|
try_to_fix = 0;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (strcmp(n->name, target_id) == 0) dst = n;
|
||||||
|
}
|
||||||
|
if (!try_to_fix) goto unhandled_case;
|
||||||
|
if (dst != NULL) {
|
||||||
|
clusterManagerLogInfo(">>> Case 3: Moving slot %d from %s:%d to "
|
||||||
|
"%s:%d and closing it on all the other "
|
||||||
|
"importing nodes.\n",
|
||||||
|
slot, src->ip, src->port,
|
||||||
|
dst->ip, dst->port);
|
||||||
|
/* Move the slot to the destination node. */
|
||||||
|
success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
/* Close slot on all the other importing nodes. */
|
||||||
|
listRewind(importing, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
clusterManagerNode *n = ln->value;
|
||||||
|
if (dst == n) continue;
|
||||||
|
success = clusterManagerClearSlotStatus(n, slot);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
clusterManagerLogInfo(">>> Case 3: Closing slot %d on both "
|
||||||
|
"migrating and importing nodes.\n", slot);
|
||||||
|
/* Close the slot on both the migrating node and the importing
|
||||||
|
* nodes. */
|
||||||
|
success = clusterManagerClearSlotStatus(src, slot);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
listRewind(importing, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
clusterManagerNode *n = ln->value;
|
||||||
|
success = clusterManagerClearSlotStatus(n, slot);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
int try_to_close_slot = (listLength(importing) == 0 &&
|
int try_to_close_slot = (listLength(importing) == 0 &&
|
||||||
listLength(migrating) == 1);
|
listLength(migrating) == 1);
|
||||||
|
@ -4009,13 +4344,13 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||||
if (!success) goto cleanup;
|
if (!success) goto cleanup;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/* Case 3: There are no slots claiming to be in importing state, but
|
/* Case 4: There are no slots claiming to be in importing state, but
|
||||||
* there is a migrating node that actually don't have any key or is the
|
* there is a migrating node that actually don't have any key or is the
|
||||||
* slot owner. We can just close the slot, probably a reshard interrupted
|
* slot owner. We can just close the slot, probably a reshard
|
||||||
* in the middle. */
|
* interrupted in the middle. */
|
||||||
if (try_to_close_slot) {
|
if (try_to_close_slot) {
|
||||||
clusterManagerNode *n = listFirst(migrating)->value;
|
clusterManagerNode *n = listFirst(migrating)->value;
|
||||||
clusterManagerLogInfo(">>> Case 3: Closing slot %d on %s:%d\n",
|
clusterManagerLogInfo(">>> Case 4: Closing slot %d on %s:%d\n",
|
||||||
slot, n->ip, n->port);
|
slot, n->ip, n->port);
|
||||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
|
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
|
||||||
slot, "STABLE");
|
slot, "STABLE");
|
||||||
|
@ -4023,6 +4358,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||||
if (r) freeReplyObject(r);
|
if (r) freeReplyObject(r);
|
||||||
if (!success) goto cleanup;
|
if (!success) goto cleanup;
|
||||||
} else {
|
} else {
|
||||||
|
unhandled_case:
|
||||||
success = 0;
|
success = 0;
|
||||||
clusterManagerLogErr("[ERR] Sorry, redis-cli can't fix this slot "
|
clusterManagerLogErr("[ERR] Sorry, redis-cli can't fix this slot "
|
||||||
"yet (work in progress). Slot is set as "
|
"yet (work in progress). Slot is set as "
|
||||||
|
@ -4040,17 +4376,55 @@ cleanup:
|
||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) {
|
||||||
|
clusterManagerLogInfo(">>> Fixing multiple owners for slot %d...\n", slot);
|
||||||
|
int success = 0;
|
||||||
|
assert(listLength(owners) > 1);
|
||||||
|
clusterManagerNode *owner = clusterManagerGetNodeWithMostKeysInSlot(owners,
|
||||||
|
slot,
|
||||||
|
NULL);
|
||||||
|
if (!owner) owner = listFirst(owners)->value;
|
||||||
|
clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d\n",
|
||||||
|
slot, owner->ip, owner->port);
|
||||||
|
/* Set the slot owner. */
|
||||||
|
if (!clusterManagerSetSlotOwner(owner, slot, 0)) return 0;
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
listRewind(cluster_manager.nodes, &li);
|
||||||
|
/* Update configuration in all the other master nodes by assigning the slot
|
||||||
|
* itself to the new owner, and by eventually migrating keys if the node
|
||||||
|
* has keys for the slot. */
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
clusterManagerNode *n = ln->value;
|
||||||
|
if (n == owner) continue;
|
||||||
|
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
|
||||||
|
int count = clusterManagerCountKeysInSlot(n, slot);
|
||||||
|
success = (count >= 0);
|
||||||
|
if (!success) break;
|
||||||
|
clusterManagerDelSlot(n, slot, 1);
|
||||||
|
if (!clusterManagerSetSlot(n, owner, slot, "node", NULL)) return 0;
|
||||||
|
if (count > 0) {
|
||||||
|
int opts = CLUSTER_MANAGER_OPT_VERBOSE |
|
||||||
|
CLUSTER_MANAGER_OPT_COLD;
|
||||||
|
success = clusterManagerMoveSlot(n, owner, slot, opts, NULL);
|
||||||
|
if (!success) break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
static int clusterManagerCheckCluster(int quiet) {
|
static int clusterManagerCheckCluster(int quiet) {
|
||||||
listNode *ln = listFirst(cluster_manager.nodes);
|
listNode *ln = listFirst(cluster_manager.nodes);
|
||||||
if (!ln) return 0;
|
if (!ln) return 0;
|
||||||
int result = 1;
|
|
||||||
int do_fix = config.cluster_manager_command.flags &
|
|
||||||
CLUSTER_MANAGER_CMD_FLAG_FIX;
|
|
||||||
clusterManagerNode *node = ln->value;
|
clusterManagerNode *node = ln->value;
|
||||||
clusterManagerLogInfo(">>> Performing Cluster Check (using node %s:%d)\n",
|
clusterManagerLogInfo(">>> Performing Cluster Check (using node %s:%d)\n",
|
||||||
node->ip, node->port);
|
node->ip, node->port);
|
||||||
|
int result = 1, consistent = 0;
|
||||||
|
int do_fix = config.cluster_manager_command.flags &
|
||||||
|
CLUSTER_MANAGER_CMD_FLAG_FIX;
|
||||||
if (!quiet) clusterManagerShowNodes();
|
if (!quiet) clusterManagerShowNodes();
|
||||||
if (!clusterManagerIsConfigConsistent()) {
|
consistent = clusterManagerIsConfigConsistent();
|
||||||
|
if (!consistent) {
|
||||||
sds err = sdsnew("[ERR] Nodes don't agree about configuration!");
|
sds err = sdsnew("[ERR] Nodes don't agree about configuration!");
|
||||||
clusterManagerOnError(err);
|
clusterManagerOnError(err);
|
||||||
result = 0;
|
result = 0;
|
||||||
|
@ -4058,7 +4432,7 @@ static int clusterManagerCheckCluster(int quiet) {
|
||||||
clusterManagerLogOk("[OK] All nodes agree about slots "
|
clusterManagerLogOk("[OK] All nodes agree about slots "
|
||||||
"configuration.\n");
|
"configuration.\n");
|
||||||
}
|
}
|
||||||
// Check open slots
|
/* Check open slots */
|
||||||
clusterManagerLogInfo(">>> Check for open slots...\n");
|
clusterManagerLogInfo(">>> Check for open slots...\n");
|
||||||
listIter li;
|
listIter li;
|
||||||
listRewind(cluster_manager.nodes, &li);
|
listRewind(cluster_manager.nodes, &li);
|
||||||
|
@ -4077,7 +4451,7 @@ static int clusterManagerCheckCluster(int quiet) {
|
||||||
n->port);
|
n->port);
|
||||||
for (i = 0; i < n->migrating_count; i += 2) {
|
for (i = 0; i < n->migrating_count; i += 2) {
|
||||||
sds slot = n->migrating[i];
|
sds slot = n->migrating[i];
|
||||||
dictAdd(open_slots, slot, sdsdup(n->migrating[i + 1]));
|
dictReplace(open_slots, slot, sdsdup(n->migrating[i + 1]));
|
||||||
char *fmt = (i > 0 ? ",%S" : "%S");
|
char *fmt = (i > 0 ? ",%S" : "%S");
|
||||||
errstr = sdscatfmt(errstr, fmt, slot);
|
errstr = sdscatfmt(errstr, fmt, slot);
|
||||||
}
|
}
|
||||||
|
@ -4095,7 +4469,7 @@ static int clusterManagerCheckCluster(int quiet) {
|
||||||
n->port);
|
n->port);
|
||||||
for (i = 0; i < n->importing_count; i += 2) {
|
for (i = 0; i < n->importing_count; i += 2) {
|
||||||
sds slot = n->importing[i];
|
sds slot = n->importing[i];
|
||||||
dictAdd(open_slots, slot, sdsdup(n->importing[i + 1]));
|
dictReplace(open_slots, slot, sdsdup(n->importing[i + 1]));
|
||||||
char *fmt = (i > 0 ? ",%S" : "%S");
|
char *fmt = (i > 0 ? ",%S" : "%S");
|
||||||
errstr = sdscatfmt(errstr, fmt, slot);
|
errstr = sdscatfmt(errstr, fmt, slot);
|
||||||
}
|
}
|
||||||
|
@ -4117,7 +4491,7 @@ static int clusterManagerCheckCluster(int quiet) {
|
||||||
clusterManagerLogErr("%s.\n", (char *) errstr);
|
clusterManagerLogErr("%s.\n", (char *) errstr);
|
||||||
sdsfree(errstr);
|
sdsfree(errstr);
|
||||||
if (do_fix) {
|
if (do_fix) {
|
||||||
// Fix open slots.
|
/* Fix open slots. */
|
||||||
dictReleaseIterator(iter);
|
dictReleaseIterator(iter);
|
||||||
iter = dictGetIterator(open_slots);
|
iter = dictGetIterator(open_slots);
|
||||||
while ((entry = dictNext(iter)) != NULL) {
|
while ((entry = dictNext(iter)) != NULL) {
|
||||||
|
@ -4152,6 +4526,54 @@ static int clusterManagerCheckCluster(int quiet) {
|
||||||
if (fixed > 0) result = 1;
|
if (fixed > 0) result = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
int search_multiple_owners = config.cluster_manager_command.flags &
|
||||||
|
CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS;
|
||||||
|
if (search_multiple_owners) {
|
||||||
|
/* Check whether there are multiple owners, even when slots are
|
||||||
|
* fully covered and there are no open slots. */
|
||||||
|
clusterManagerLogInfo(">>> Check for multiple slot owners...\n");
|
||||||
|
int slot = 0, slots_with_multiple_owners = 0;
|
||||||
|
for (; slot < CLUSTER_MANAGER_SLOTS; slot++) {
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
listRewind(cluster_manager.nodes, &li);
|
||||||
|
list *owners = listCreate();
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
clusterManagerNode *n = ln->value;
|
||||||
|
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
|
||||||
|
if (n->slots[slot]) listAddNodeTail(owners, n);
|
||||||
|
else {
|
||||||
|
/* Nodes having keys for the slot will be considered
|
||||||
|
* owners too. */
|
||||||
|
int count = clusterManagerCountKeysInSlot(n, slot);
|
||||||
|
if (count > 0) listAddNodeTail(owners, n);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (listLength(owners) > 1) {
|
||||||
|
result = 0;
|
||||||
|
clusterManagerLogErr("[WARNING] Slot %d has %d owners:\n",
|
||||||
|
slot, listLength(owners));
|
||||||
|
listRewind(owners, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
clusterManagerNode *n = ln->value;
|
||||||
|
clusterManagerLogErr(" %s:%d\n", n->ip, n->port);
|
||||||
|
}
|
||||||
|
slots_with_multiple_owners++;
|
||||||
|
if (do_fix) {
|
||||||
|
result = clusterManagerFixMultipleSlotOwners(slot, owners);
|
||||||
|
if (!result) {
|
||||||
|
clusterManagerLogErr("Failed to fix multiple owners "
|
||||||
|
"for slot %d\n", slot);
|
||||||
|
listRelease(owners);
|
||||||
|
break;
|
||||||
|
} else slots_with_multiple_owners--;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
listRelease(owners);
|
||||||
|
}
|
||||||
|
if (slots_with_multiple_owners == 0)
|
||||||
|
clusterManagerLogOk("[OK] No multiple owners found.\n");
|
||||||
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5404,7 +5826,7 @@ static int clusterManagerCommandCall(int argc, char **argv) {
|
||||||
if (status != REDIS_OK || reply == NULL )
|
if (status != REDIS_OK || reply == NULL )
|
||||||
printf("%s:%d: Failed!\n", n->ip, n->port);
|
printf("%s:%d: Failed!\n", n->ip, n->port);
|
||||||
else {
|
else {
|
||||||
sds formatted_reply = cliFormatReplyTTY(reply, "");
|
sds formatted_reply = cliFormatReplyRaw(reply);
|
||||||
printf("%s:%d: %s\n", n->ip, n->port, (char *) formatted_reply);
|
printf("%s:%d: %s\n", n->ip, n->port, (char *) formatted_reply);
|
||||||
sdsfree(formatted_reply);
|
sdsfree(formatted_reply);
|
||||||
}
|
}
|
||||||
|
@ -6781,6 +7203,8 @@ int main(int argc, char **argv) {
|
||||||
argc -= firstarg;
|
argc -= firstarg;
|
||||||
argv += firstarg;
|
argv += firstarg;
|
||||||
|
|
||||||
|
parseEnv();
|
||||||
|
|
||||||
/* Cluster Manager mode */
|
/* Cluster Manager mode */
|
||||||
if (CLUSTER_MANAGER_MODE()) {
|
if (CLUSTER_MANAGER_MODE()) {
|
||||||
clusterManagerCommandProc *proc = validateClusterManagerCommand();
|
clusterManagerCommandProc *proc = validateClusterManagerCommand();
|
||||||
|
|
|
@ -695,7 +695,7 @@ sds sdscatfmt(sds s, char const *fmt, ...) {
|
||||||
* s = sdstrim(s,"Aa. :");
|
* s = sdstrim(s,"Aa. :");
|
||||||
* printf("%s\n", s);
|
* printf("%s\n", s);
|
||||||
*
|
*
|
||||||
* Output will be just "Hello World".
|
* Output will be just "HelloWorld".
|
||||||
*/
|
*/
|
||||||
sds sdstrim(sds s, const char *cset) {
|
sds sdstrim(sds s, const char *cset) {
|
||||||
char *start, *end, *sp, *ep;
|
char *start, *end, *sp, *ep;
|
||||||
|
|
16
src/server.c
16
src/server.c
|
@ -2607,17 +2607,13 @@ int processCommand(client *c) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Handle the maxmemory directive.
|
/* Handle the maxmemory directive.
|
||||||
*
|
|
||||||
* First we try to free some memory if possible (if there are volatile
|
|
||||||
* keys in the dataset). If there are not the only thing we can do
|
|
||||||
* is returning an error.
|
|
||||||
*
|
*
|
||||||
* Note that we do not want to reclaim memory if we are here re-entering
|
* Note that we do not want to reclaim memory if we are here re-entering
|
||||||
* the event loop since there is a busy Lua script running in timeout
|
* the event loop since there is a busy Lua script running in timeout
|
||||||
* condition, to avoid mixing the propagation of scripts with the propagation
|
* condition, to avoid mixing the propagation of scripts with the
|
||||||
* of DELs due to eviction. */
|
* propagation of DELs due to eviction. */
|
||||||
if (server.maxmemory && !server.lua_timedout) {
|
if (server.maxmemory && !server.lua_timedout) {
|
||||||
int out_of_memory = freeMemoryIfNeeded() == C_ERR;
|
int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
|
||||||
/* freeMemoryIfNeeded may flush slave output buffers. This may result
|
/* freeMemoryIfNeeded may flush slave output buffers. This may result
|
||||||
* into a slave, that may be the active client, to be freed. */
|
* into a slave, that may be the active client, to be freed. */
|
||||||
if (server.current_client == NULL) return C_ERR;
|
if (server.current_client == NULL) return C_ERR;
|
||||||
|
@ -3247,11 +3243,11 @@ sds genRedisInfoString(char *section) {
|
||||||
"allocator_frag_ratio:%.2f\r\n"
|
"allocator_frag_ratio:%.2f\r\n"
|
||||||
"allocator_frag_bytes:%zu\r\n"
|
"allocator_frag_bytes:%zu\r\n"
|
||||||
"allocator_rss_ratio:%.2f\r\n"
|
"allocator_rss_ratio:%.2f\r\n"
|
||||||
"allocator_rss_bytes:%zu\r\n"
|
"allocator_rss_bytes:%zd\r\n"
|
||||||
"rss_overhead_ratio:%.2f\r\n"
|
"rss_overhead_ratio:%.2f\r\n"
|
||||||
"rss_overhead_bytes:%zu\r\n"
|
"rss_overhead_bytes:%zd\r\n"
|
||||||
"mem_fragmentation_ratio:%.2f\r\n"
|
"mem_fragmentation_ratio:%.2f\r\n"
|
||||||
"mem_fragmentation_bytes:%zu\r\n"
|
"mem_fragmentation_bytes:%zd\r\n"
|
||||||
"mem_not_counted_for_evict:%zu\r\n"
|
"mem_not_counted_for_evict:%zu\r\n"
|
||||||
"mem_replication_backlog:%zu\r\n"
|
"mem_replication_backlog:%zu\r\n"
|
||||||
"mem_clients_slaves:%zu\r\n"
|
"mem_clients_slaves:%zu\r\n"
|
||||||
|
|
10
src/server.h
10
src/server.h
|
@ -654,6 +654,9 @@ typedef struct multiCmd {
|
||||||
typedef struct multiState {
|
typedef struct multiState {
|
||||||
multiCmd *commands; /* Array of MULTI commands */
|
multiCmd *commands; /* Array of MULTI commands */
|
||||||
int count; /* Total number of MULTI commands */
|
int count; /* Total number of MULTI commands */
|
||||||
|
int cmd_flags; /* The accumulated command flags OR-ed together.
|
||||||
|
So if at least a command has a given flag, it
|
||||||
|
will be set in this field. */
|
||||||
int minreplicas; /* MINREPLICAS for synchronous replication */
|
int minreplicas; /* MINREPLICAS for synchronous replication */
|
||||||
time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
|
time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
|
||||||
} multiState;
|
} multiState;
|
||||||
|
@ -864,11 +867,11 @@ struct redisMemOverhead {
|
||||||
float dataset_perc;
|
float dataset_perc;
|
||||||
float peak_perc;
|
float peak_perc;
|
||||||
float total_frag;
|
float total_frag;
|
||||||
size_t total_frag_bytes;
|
ssize_t total_frag_bytes;
|
||||||
float allocator_frag;
|
float allocator_frag;
|
||||||
size_t allocator_frag_bytes;
|
ssize_t allocator_frag_bytes;
|
||||||
float allocator_rss;
|
float allocator_rss;
|
||||||
size_t allocator_rss_bytes;
|
ssize_t allocator_rss_bytes;
|
||||||
float rss_extra;
|
float rss_extra;
|
||||||
size_t rss_extra_bytes;
|
size_t rss_extra_bytes;
|
||||||
size_t num_dbs;
|
size_t num_dbs;
|
||||||
|
@ -1699,6 +1702,7 @@ int zslLexValueLteMax(sds value, zlexrangespec *spec);
|
||||||
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level);
|
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level);
|
||||||
size_t freeMemoryGetNotCountedMemory();
|
size_t freeMemoryGetNotCountedMemory();
|
||||||
int freeMemoryIfNeeded(void);
|
int freeMemoryIfNeeded(void);
|
||||||
|
int freeMemoryIfNeededAndSafe(void);
|
||||||
int processCommand(client *c);
|
int processCommand(client *c);
|
||||||
void setupSignalHandlers(void);
|
void setupSignalHandlers(void);
|
||||||
struct redisCommand *lookupCommand(sds name);
|
struct redisCommand *lookupCommand(sds name);
|
||||||
|
|
18
src/util.c
18
src/util.c
|
@ -48,7 +48,7 @@
|
||||||
int stringmatchlen(const char *pattern, int patternLen,
|
int stringmatchlen(const char *pattern, int patternLen,
|
||||||
const char *string, int stringLen, int nocase)
|
const char *string, int stringLen, int nocase)
|
||||||
{
|
{
|
||||||
while(patternLen) {
|
while(patternLen && stringLen) {
|
||||||
switch(pattern[0]) {
|
switch(pattern[0]) {
|
||||||
case '*':
|
case '*':
|
||||||
while (pattern[1] == '*') {
|
while (pattern[1] == '*') {
|
||||||
|
@ -171,6 +171,22 @@ int stringmatch(const char *pattern, const char *string, int nocase) {
|
||||||
return stringmatchlen(pattern,strlen(pattern),string,strlen(string),nocase);
|
return stringmatchlen(pattern,strlen(pattern),string,strlen(string),nocase);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Fuzz stringmatchlen() trying to crash it with bad input. */
|
||||||
|
int stringmatchlen_fuzz_test(void) {
|
||||||
|
char str[32];
|
||||||
|
char pat[32];
|
||||||
|
int cycles = 10000000;
|
||||||
|
int total_matches = 0;
|
||||||
|
while(cycles--) {
|
||||||
|
int strlen = rand() % sizeof(str);
|
||||||
|
int patlen = rand() % sizeof(pat);
|
||||||
|
for (int j = 0; j < strlen; j++) str[j] = rand() % 128;
|
||||||
|
for (int j = 0; j < patlen; j++) pat[j] = rand() % 128;
|
||||||
|
total_matches += stringmatchlen(pat, patlen, str, strlen, 0);
|
||||||
|
}
|
||||||
|
return total_matches;
|
||||||
|
}
|
||||||
|
|
||||||
/* Convert a string representing an amount of memory into the number of
|
/* Convert a string representing an amount of memory into the number of
|
||||||
* bytes, so for instance memtoll("1Gb") will return 1073741824 that is
|
* bytes, so for instance memtoll("1Gb") will return 1073741824 that is
|
||||||
* (1024*1024*1024).
|
* (1024*1024*1024).
|
||||||
|
|
|
@ -40,6 +40,7 @@
|
||||||
|
|
||||||
int stringmatchlen(const char *p, int plen, const char *s, int slen, int nocase);
|
int stringmatchlen(const char *p, int plen, const char *s, int slen, int nocase);
|
||||||
int stringmatch(const char *p, const char *s, int nocase);
|
int stringmatch(const char *p, const char *s, int nocase);
|
||||||
|
int stringmatchlen_fuzz_test(void);
|
||||||
long long memtoll(const char *p, int *err);
|
long long memtoll(const char *p, int *err);
|
||||||
uint32_t digits10(uint64_t v);
|
uint32_t digits10(uint64_t v);
|
||||||
uint32_t sdigits10(int64_t v);
|
uint32_t sdigits10(int64_t v);
|
||||||
|
|
|
@ -275,7 +275,6 @@ start_server {tags {"repl"}} {
|
||||||
start_server {} {
|
start_server {} {
|
||||||
test "Master stream is correctly processed while the replica has a script in -BUSY state" {
|
test "Master stream is correctly processed while the replica has a script in -BUSY state" {
|
||||||
set slave [srv 0 client]
|
set slave [srv 0 client]
|
||||||
puts [srv 0 port]
|
|
||||||
$slave config set lua-time-limit 500
|
$slave config set lua-time-limit 500
|
||||||
$slave slaveof $master_host $master_port
|
$slave slaveof $master_host $master_port
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue