mirror of https://github.com/redis/redis.git
Merge remote-tracking branch 'upstream/unstable' into expiretypofix
This commit is contained in:
commit
1da37aa089
|
@ -3,11 +3,8 @@ name: CI
|
||||||
on: [push, pull_request]
|
on: [push, pull_request]
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
build-ubuntu:
|
test-ubuntu-latest:
|
||||||
strategy:
|
runs-on: ubuntu-latest
|
||||||
matrix:
|
|
||||||
platform: [ubuntu-latest, ubuntu-16.04]
|
|
||||||
runs-on: ${{ matrix.platform }}
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v1
|
- uses: actions/checkout@v1
|
||||||
- name: make
|
- name: make
|
||||||
|
@ -15,13 +12,17 @@ jobs:
|
||||||
- name: test
|
- name: test
|
||||||
run: |
|
run: |
|
||||||
sudo apt-get install tcl8.5
|
sudo apt-get install tcl8.5
|
||||||
make test
|
./runtest --clients 2 --verbose
|
||||||
|
|
||||||
build-macos-latest:
|
build-ubuntu-old:
|
||||||
strategy:
|
runs-on: ubuntu-16.04
|
||||||
matrix:
|
steps:
|
||||||
platform: [macos-latest, macOS-10.14]
|
- uses: actions/checkout@v1
|
||||||
runs-on: ${{ matrix.platform }}
|
- name: make
|
||||||
|
run: make
|
||||||
|
|
||||||
|
build-macos-latest:
|
||||||
|
runs-on: macos-latest
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v1
|
- uses: actions/checkout@v1
|
||||||
- name: make
|
- name: make
|
||||||
|
|
|
@ -1830,6 +1830,7 @@ void aclCommand(client *c) {
|
||||||
case ACL_DENIED_CMD: reasonstr="command"; break;
|
case ACL_DENIED_CMD: reasonstr="command"; break;
|
||||||
case ACL_DENIED_KEY: reasonstr="key"; break;
|
case ACL_DENIED_KEY: reasonstr="key"; break;
|
||||||
case ACL_DENIED_AUTH: reasonstr="auth"; break;
|
case ACL_DENIED_AUTH: reasonstr="auth"; break;
|
||||||
|
default: reasonstr="unknown";
|
||||||
}
|
}
|
||||||
addReplyBulkCString(c,reasonstr);
|
addReplyBulkCString(c,reasonstr);
|
||||||
|
|
||||||
|
|
8
src/ae.c
8
src/ae.c
|
@ -135,6 +135,14 @@ void aeDeleteEventLoop(aeEventLoop *eventLoop) {
|
||||||
aeApiFree(eventLoop);
|
aeApiFree(eventLoop);
|
||||||
zfree(eventLoop->events);
|
zfree(eventLoop->events);
|
||||||
zfree(eventLoop->fired);
|
zfree(eventLoop->fired);
|
||||||
|
|
||||||
|
/* Free the time events list. */
|
||||||
|
aeTimeEvent *next_te, *te = eventLoop->timeEventHead;
|
||||||
|
while (te) {
|
||||||
|
next_te = te->next;
|
||||||
|
zfree(te);
|
||||||
|
te = next_te;
|
||||||
|
}
|
||||||
zfree(eventLoop);
|
zfree(eventLoop);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1798,14 +1798,15 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
|
||||||
serverLog(LL_VERBOSE,
|
serverLog(LL_VERBOSE,
|
||||||
"Background AOF rewrite signal handler took %lldus", ustime()-now);
|
"Background AOF rewrite signal handler took %lldus", ustime()-now);
|
||||||
} else if (!bysignal && exitcode != 0) {
|
} else if (!bysignal && exitcode != 0) {
|
||||||
|
server.aof_lastbgrewrite_status = C_ERR;
|
||||||
|
|
||||||
|
serverLog(LL_WARNING,
|
||||||
|
"Background AOF rewrite terminated with error");
|
||||||
|
} else {
|
||||||
/* SIGUSR1 is whitelisted, so we have a way to kill a child without
|
/* SIGUSR1 is whitelisted, so we have a way to kill a child without
|
||||||
* tirggering an error condition. */
|
* tirggering an error condition. */
|
||||||
if (bysignal != SIGUSR1)
|
if (bysignal != SIGUSR1)
|
||||||
server.aof_lastbgrewrite_status = C_ERR;
|
server.aof_lastbgrewrite_status = C_ERR;
|
||||||
serverLog(LL_WARNING,
|
|
||||||
"Background AOF rewrite terminated with error");
|
|
||||||
} else {
|
|
||||||
server.aof_lastbgrewrite_status = C_ERR;
|
|
||||||
|
|
||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING,
|
||||||
"Background AOF rewrite terminated by signal %d", bysignal);
|
"Background AOF rewrite terminated by signal %d", bysignal);
|
||||||
|
|
18
src/config.c
18
src/config.c
|
@ -108,12 +108,12 @@ clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
|
||||||
/* Generic config infrastructure function pointers
|
/* Generic config infrastructure function pointers
|
||||||
* int is_valid_fn(val, err)
|
* int is_valid_fn(val, err)
|
||||||
* Return 1 when val is valid, and 0 when invalid.
|
* Return 1 when val is valid, and 0 when invalid.
|
||||||
* Optionslly set err to a static error string.
|
* Optionally set err to a static error string.
|
||||||
* int update_fn(val, prev, err)
|
* int update_fn(val, prev, err)
|
||||||
* This function is called only for CONFIG SET command (not at config file parsing)
|
* This function is called only for CONFIG SET command (not at config file parsing)
|
||||||
* It is called after the actual config is applied,
|
* It is called after the actual config is applied,
|
||||||
* Return 1 for success, and 0 for failure.
|
* Return 1 for success, and 0 for failure.
|
||||||
* Optionslly set err to a static error string.
|
* Optionally set err to a static error string.
|
||||||
* On failure the config change will be reverted.
|
* On failure the config change will be reverted.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
@ -518,7 +518,8 @@ void loadServerConfigFromString(char *config) {
|
||||||
return;
|
return;
|
||||||
|
|
||||||
loaderr:
|
loaderr:
|
||||||
fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
|
fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR (Redis %s) ***\n",
|
||||||
|
REDIS_VERSION);
|
||||||
fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
|
fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
|
||||||
fprintf(stderr, ">>> '%s'\n", lines[i]);
|
fprintf(stderr, ">>> '%s'\n", lines[i]);
|
||||||
fprintf(stderr, "%s\n", err);
|
fprintf(stderr, "%s\n", err);
|
||||||
|
@ -729,7 +730,7 @@ void configSetCommand(client *c) {
|
||||||
* config_set_memory_field(name,var) */
|
* config_set_memory_field(name,var) */
|
||||||
} config_set_memory_field(
|
} config_set_memory_field(
|
||||||
"client-query-buffer-limit",server.client_max_querybuf_len) {
|
"client-query-buffer-limit",server.client_max_querybuf_len) {
|
||||||
/* Everyhing else is an error... */
|
/* Everything else is an error... */
|
||||||
} config_set_else {
|
} config_set_else {
|
||||||
addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s",
|
addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s",
|
||||||
(char*)c->argv[2]->ptr);
|
(char*)c->argv[2]->ptr);
|
||||||
|
@ -1666,16 +1667,15 @@ static int enumConfigSet(typeData data, sds value, int update, char **err) {
|
||||||
sds enumerr = sdsnew("argument must be one of the following: ");
|
sds enumerr = sdsnew("argument must be one of the following: ");
|
||||||
configEnum *enumNode = data.enumd.enum_value;
|
configEnum *enumNode = data.enumd.enum_value;
|
||||||
while(enumNode->name != NULL) {
|
while(enumNode->name != NULL) {
|
||||||
enumerr = sdscatlen(enumerr, enumNode->name, strlen(enumNode->name));
|
enumerr = sdscatlen(enumerr, enumNode->name,
|
||||||
|
strlen(enumNode->name));
|
||||||
enumerr = sdscatlen(enumerr, ", ", 2);
|
enumerr = sdscatlen(enumerr, ", ", 2);
|
||||||
enumNode++;
|
enumNode++;
|
||||||
}
|
}
|
||||||
|
sdsrange(enumerr,0,-3); /* Remove final ", ". */
|
||||||
|
|
||||||
enumerr[sdslen(enumerr) - 2] = '\0';
|
|
||||||
|
|
||||||
/* Make sure we don't overrun the fixed buffer */
|
|
||||||
enumerr[LOADBUF_SIZE - 1] = '\0';
|
|
||||||
strncpy(loadbuf, enumerr, LOADBUF_SIZE);
|
strncpy(loadbuf, enumerr, LOADBUF_SIZE);
|
||||||
|
loadbuf[LOADBUF_SIZE - 1] = '\0';
|
||||||
|
|
||||||
sdsfree(enumerr);
|
sdsfree(enumerr);
|
||||||
*err = loadbuf;
|
*err = loadbuf;
|
||||||
|
|
41
src/db.c
41
src/db.c
|
@ -1305,6 +1305,8 @@ int expireIfNeeded(redisDb *db, robj *key) {
|
||||||
/* -----------------------------------------------------------------------------
|
/* -----------------------------------------------------------------------------
|
||||||
* API to get key arguments from commands
|
* API to get key arguments from commands
|
||||||
* ---------------------------------------------------------------------------*/
|
* ---------------------------------------------------------------------------*/
|
||||||
|
#define MAX_KEYS_BUFFER 65536
|
||||||
|
static int getKeysTempBuffer[MAX_KEYS_BUFFER];
|
||||||
|
|
||||||
/* The base case is to use the keys position as given in the command table
|
/* The base case is to use the keys position as given in the command table
|
||||||
* (firstkey, lastkey, step). */
|
* (firstkey, lastkey, step). */
|
||||||
|
@ -1319,7 +1321,12 @@ int *getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, in
|
||||||
|
|
||||||
last = cmd->lastkey;
|
last = cmd->lastkey;
|
||||||
if (last < 0) last = argc+last;
|
if (last < 0) last = argc+last;
|
||||||
keys = zmalloc(sizeof(int)*((last - cmd->firstkey)+1));
|
|
||||||
|
int count = ((last - cmd->firstkey)+1);
|
||||||
|
keys = getKeysTempBuffer;
|
||||||
|
if (count > MAX_KEYS_BUFFER)
|
||||||
|
keys = zmalloc(sizeof(int)*count);
|
||||||
|
|
||||||
for (j = cmd->firstkey; j <= last; j += cmd->keystep) {
|
for (j = cmd->firstkey; j <= last; j += cmd->keystep) {
|
||||||
if (j >= argc) {
|
if (j >= argc) {
|
||||||
/* Modules commands, and standard commands with a not fixed number
|
/* Modules commands, and standard commands with a not fixed number
|
||||||
|
@ -1329,7 +1336,7 @@ int *getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, in
|
||||||
* return no keys and expect the command implementation to report
|
* return no keys and expect the command implementation to report
|
||||||
* an arity or syntax error. */
|
* an arity or syntax error. */
|
||||||
if (cmd->flags & CMD_MODULE || cmd->arity < 0) {
|
if (cmd->flags & CMD_MODULE || cmd->arity < 0) {
|
||||||
zfree(keys);
|
getKeysFreeResult(keys);
|
||||||
*numkeys = 0;
|
*numkeys = 0;
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1365,7 +1372,8 @@ int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *nu
|
||||||
|
|
||||||
/* Free the result of getKeysFromCommand. */
|
/* Free the result of getKeysFromCommand. */
|
||||||
void getKeysFreeResult(int *result) {
|
void getKeysFreeResult(int *result) {
|
||||||
zfree(result);
|
if (result != getKeysTempBuffer)
|
||||||
|
zfree(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Helper function to extract keys from following commands:
|
/* Helper function to extract keys from following commands:
|
||||||
|
@ -1386,7 +1394,9 @@ int *zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *nu
|
||||||
/* Keys in z{union,inter}store come from two places:
|
/* Keys in z{union,inter}store come from two places:
|
||||||
* argv[1] = storage key,
|
* argv[1] = storage key,
|
||||||
* argv[3...n] = keys to intersect */
|
* argv[3...n] = keys to intersect */
|
||||||
keys = zmalloc(sizeof(int)*(num+1));
|
keys = getKeysTempBuffer;
|
||||||
|
if (num+1>MAX_KEYS_BUFFER)
|
||||||
|
keys = zmalloc(sizeof(int)*(num+1));
|
||||||
|
|
||||||
/* Add all key positions for argv[3...n] to keys[] */
|
/* Add all key positions for argv[3...n] to keys[] */
|
||||||
for (i = 0; i < num; i++) keys[i] = 3+i;
|
for (i = 0; i < num; i++) keys[i] = 3+i;
|
||||||
|
@ -1412,7 +1422,10 @@ int *evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
keys = zmalloc(sizeof(int)*num);
|
keys = getKeysTempBuffer;
|
||||||
|
if (num>MAX_KEYS_BUFFER)
|
||||||
|
keys = zmalloc(sizeof(int)*num);
|
||||||
|
|
||||||
*numkeys = num;
|
*numkeys = num;
|
||||||
|
|
||||||
/* Add all key positions for argv[3...n] to keys[] */
|
/* Add all key positions for argv[3...n] to keys[] */
|
||||||
|
@ -1433,7 +1446,7 @@ int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys)
|
||||||
UNUSED(cmd);
|
UNUSED(cmd);
|
||||||
|
|
||||||
num = 0;
|
num = 0;
|
||||||
keys = zmalloc(sizeof(int)*2); /* Alloc 2 places for the worst case. */
|
keys = getKeysTempBuffer; /* Alloc 2 places for the worst case. */
|
||||||
|
|
||||||
keys[num++] = 1; /* <sort-key> is always present. */
|
keys[num++] = 1; /* <sort-key> is always present. */
|
||||||
|
|
||||||
|
@ -1491,7 +1504,10 @@ int *migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkey
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
keys = zmalloc(sizeof(int)*num);
|
keys = getKeysTempBuffer;
|
||||||
|
if (num>MAX_KEYS_BUFFER)
|
||||||
|
keys = zmalloc(sizeof(int)*num);
|
||||||
|
|
||||||
for (i = 0; i < num; i++) keys[i] = first+i;
|
for (i = 0; i < num; i++) keys[i] = first+i;
|
||||||
*numkeys = num;
|
*numkeys = num;
|
||||||
return keys;
|
return keys;
|
||||||
|
@ -1524,7 +1540,9 @@ int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numk
|
||||||
* argv[1] = key,
|
* argv[1] = key,
|
||||||
* argv[5...n] = stored key if present
|
* argv[5...n] = stored key if present
|
||||||
*/
|
*/
|
||||||
keys = zmalloc(sizeof(int) * num);
|
keys = getKeysTempBuffer;
|
||||||
|
if (num>MAX_KEYS_BUFFER)
|
||||||
|
keys = zmalloc(sizeof(int) * num);
|
||||||
|
|
||||||
/* Add all key positions to keys[] */
|
/* Add all key positions to keys[] */
|
||||||
keys[0] = 1;
|
keys[0] = 1;
|
||||||
|
@ -1542,7 +1560,7 @@ int *memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys
|
||||||
UNUSED(cmd);
|
UNUSED(cmd);
|
||||||
|
|
||||||
if (argc >= 3 && !strcasecmp(argv[1]->ptr,"usage")) {
|
if (argc >= 3 && !strcasecmp(argv[1]->ptr,"usage")) {
|
||||||
keys = zmalloc(sizeof(int) * 1);
|
keys = getKeysTempBuffer;
|
||||||
keys[0] = 2;
|
keys[0] = 2;
|
||||||
*numkeys = 1;
|
*numkeys = 1;
|
||||||
return keys;
|
return keys;
|
||||||
|
@ -1589,7 +1607,10 @@ int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys)
|
||||||
num /= 2; /* We have half the keys as there are arguments because
|
num /= 2; /* We have half the keys as there are arguments because
|
||||||
there are also the IDs, one per key. */
|
there are also the IDs, one per key. */
|
||||||
|
|
||||||
keys = zmalloc(sizeof(int) * num);
|
keys = getKeysTempBuffer;
|
||||||
|
if (num>MAX_KEYS_BUFFER)
|
||||||
|
keys = zmalloc(sizeof(int) * num);
|
||||||
|
|
||||||
for (i = streams_pos+1; i < argc-num; i++) keys[i-streams_pos-1] = i;
|
for (i = streams_pos+1; i < argc-num; i++) keys[i-streams_pos-1] = i;
|
||||||
*numkeys = num;
|
*numkeys = num;
|
||||||
return keys;
|
return keys;
|
||||||
|
|
|
@ -685,9 +685,12 @@ NULL
|
||||||
sds stats = sdsempty();
|
sds stats = sdsempty();
|
||||||
char buf[4096];
|
char buf[4096];
|
||||||
|
|
||||||
if (getLongFromObjectOrReply(c, c->argv[2], &dbid, NULL) != C_OK)
|
if (getLongFromObjectOrReply(c, c->argv[2], &dbid, NULL) != C_OK) {
|
||||||
|
sdsfree(stats);
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
if (dbid < 0 || dbid >= server.dbnum) {
|
if (dbid < 0 || dbid >= server.dbnum) {
|
||||||
|
sdsfree(stats);
|
||||||
addReplyError(c,"Out of range database");
|
addReplyError(c,"Out of range database");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -871,6 +871,10 @@ unsigned long dictScan(dict *d,
|
||||||
|
|
||||||
if (dictSize(d) == 0) return 0;
|
if (dictSize(d) == 0) return 0;
|
||||||
|
|
||||||
|
/* Having a safe iterator means no rehashing can happen, see _dictRehashStep.
|
||||||
|
* This is needed in case the scan callback tries to do dictFind or alike. */
|
||||||
|
d->iterators++;
|
||||||
|
|
||||||
if (!dictIsRehashing(d)) {
|
if (!dictIsRehashing(d)) {
|
||||||
t0 = &(d->ht[0]);
|
t0 = &(d->ht[0]);
|
||||||
m0 = t0->sizemask;
|
m0 = t0->sizemask;
|
||||||
|
@ -937,6 +941,9 @@ unsigned long dictScan(dict *d,
|
||||||
} while (v & (m0 ^ m1));
|
} while (v & (m0 ^ m1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* undo the ++ at the top */
|
||||||
|
d->iterators--;
|
||||||
|
|
||||||
return v;
|
return v;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
120
src/module.c
120
src/module.c
|
@ -1042,6 +1042,17 @@ RedisModuleString *RM_CreateStringFromLongLong(RedisModuleCtx *ctx, long long ll
|
||||||
return RM_CreateString(ctx,buf,len);
|
return RM_CreateString(ctx,buf,len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Like RedisModule_CreatString(), but creates a string starting from a double
|
||||||
|
* integer instead of taking a buffer and its length.
|
||||||
|
*
|
||||||
|
* The returned string must be released with RedisModule_FreeString() or by
|
||||||
|
* enabling automatic memory management. */
|
||||||
|
RedisModuleString *RM_CreateStringFromDouble(RedisModuleCtx *ctx, double d) {
|
||||||
|
char buf[128];
|
||||||
|
size_t len = d2string(buf,sizeof(buf),d);
|
||||||
|
return RM_CreateString(ctx,buf,len);
|
||||||
|
}
|
||||||
|
|
||||||
/* Like RedisModule_CreatString(), but creates a string starting from a long
|
/* Like RedisModule_CreatString(), but creates a string starting from a long
|
||||||
* double.
|
* double.
|
||||||
*
|
*
|
||||||
|
@ -3529,6 +3540,8 @@ void moduleTypeNameByID(char *name, uint64_t moduleid) {
|
||||||
* // Optional fields
|
* // Optional fields
|
||||||
* .digest = myType_DigestCallBack,
|
* .digest = myType_DigestCallBack,
|
||||||
* .mem_usage = myType_MemUsageCallBack,
|
* .mem_usage = myType_MemUsageCallBack,
|
||||||
|
* .aux_load = myType_AuxRDBLoadCallBack,
|
||||||
|
* .aux_save = myType_AuxRDBSaveCallBack,
|
||||||
* }
|
* }
|
||||||
*
|
*
|
||||||
* * **rdb_load**: A callback function pointer that loads data from RDB files.
|
* * **rdb_load**: A callback function pointer that loads data from RDB files.
|
||||||
|
@ -3536,6 +3549,10 @@ void moduleTypeNameByID(char *name, uint64_t moduleid) {
|
||||||
* * **aof_rewrite**: A callback function pointer that rewrites data as commands.
|
* * **aof_rewrite**: A callback function pointer that rewrites data as commands.
|
||||||
* * **digest**: A callback function pointer that is used for `DEBUG DIGEST`.
|
* * **digest**: A callback function pointer that is used for `DEBUG DIGEST`.
|
||||||
* * **free**: A callback function pointer that can free a type value.
|
* * **free**: A callback function pointer that can free a type value.
|
||||||
|
* * **aux_save**: A callback function pointer that saves out of keyspace data to RDB files.
|
||||||
|
* 'when' argument is either REDISMODULE_AUX_BEFORE_RDB or REDISMODULE_AUX_AFTER_RDB.
|
||||||
|
* * **aux_load**: A callback function pointer that loads out of keyspace data from RDB files.
|
||||||
|
* Similar to aux_save, returns REDISMODULE_OK on success, and ERR otherwise.
|
||||||
*
|
*
|
||||||
* The **digest* and **mem_usage** methods should currently be omitted since
|
* The **digest* and **mem_usage** methods should currently be omitted since
|
||||||
* they are not yet implemented inside the Redis modules core.
|
* they are not yet implemented inside the Redis modules core.
|
||||||
|
@ -4290,12 +4307,15 @@ void unblockClientFromModule(client *c) {
|
||||||
* We must call moduleUnblockClient in order to free privdata and
|
* We must call moduleUnblockClient in order to free privdata and
|
||||||
* RedisModuleBlockedClient.
|
* RedisModuleBlockedClient.
|
||||||
*
|
*
|
||||||
* Note that clients implementing threads and working with private data,
|
* Note that we only do that for clients that are blocked on keys, for which
|
||||||
* should make sure to stop the threads or protect the private data
|
* the contract is that the module should not call RM_UnblockClient under
|
||||||
* in some other way in the disconnection and timeout callback, because
|
* normal circumstances.
|
||||||
* here we are going to free the private data associated with the
|
* Clients implementing threads and working with private data should be
|
||||||
* blocked client. */
|
* aware that calling RM_UnblockClient for every blocked client is their
|
||||||
if (!bc->unblocked)
|
* responsibility, and if they fail to do so memory may leak. Ideally they
|
||||||
|
* should implement the disconnect and timeout callbacks and call
|
||||||
|
* RM_UnblockClient, but any other way is also acceptable. */
|
||||||
|
if (bc->blocked_on_keys && !bc->unblocked)
|
||||||
moduleUnblockClient(c);
|
moduleUnblockClient(c);
|
||||||
|
|
||||||
bc->client = NULL;
|
bc->client = NULL;
|
||||||
|
@ -4409,6 +4429,10 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
|
||||||
*
|
*
|
||||||
* free_privdata: called in order to free the private data that is passed
|
* free_privdata: called in order to free the private data that is passed
|
||||||
* by RedisModule_UnblockClient() call.
|
* by RedisModule_UnblockClient() call.
|
||||||
|
*
|
||||||
|
* Note: RedisModule_UnblockClient should be called for every blocked client,
|
||||||
|
* even if client was killed, timed-out or disconnected. Failing to do so
|
||||||
|
* will result in memory leaks.
|
||||||
*/
|
*/
|
||||||
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
|
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
|
||||||
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL);
|
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL);
|
||||||
|
@ -4463,7 +4487,15 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
|
||||||
* freed using the free_privdata callback provided by the user.
|
* freed using the free_privdata callback provided by the user.
|
||||||
*
|
*
|
||||||
* However the reply callback will be able to access the argument vector of
|
* However the reply callback will be able to access the argument vector of
|
||||||
* the command, so the private data is often not needed. */
|
* the command, so the private data is often not needed.
|
||||||
|
*
|
||||||
|
* Note: Under normal circumstances RedisModule_UnblockClient should not be
|
||||||
|
* called for clients that are blocked on keys (Either the key will
|
||||||
|
* become ready or a timeout will occur). If for some reason you do want
|
||||||
|
* to call RedisModule_UnblockClient it is possible: Client will be
|
||||||
|
* handled as if it were timed-out (You must implement the timeout
|
||||||
|
* callback in that case).
|
||||||
|
*/
|
||||||
RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
|
RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
|
||||||
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata);
|
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata);
|
||||||
}
|
}
|
||||||
|
@ -4725,9 +4757,9 @@ int RM_BlockedClientDisconnected(RedisModuleCtx *ctx) {
|
||||||
*
|
*
|
||||||
* To call non-reply APIs, the thread safe context must be prepared with:
|
* To call non-reply APIs, the thread safe context must be prepared with:
|
||||||
*
|
*
|
||||||
* RedisModule_ThreadSafeCallStart(ctx);
|
* RedisModule_ThreadSafeContextLock(ctx);
|
||||||
* ... make your call here ...
|
* ... make your call here ...
|
||||||
* RedisModule_ThreadSafeCallStop(ctx);
|
* RedisModule_ThreadSafeContextUnlock(ctx);
|
||||||
*
|
*
|
||||||
* This is not needed when using `RedisModule_Reply*` functions, assuming
|
* This is not needed when using `RedisModule_Reply*` functions, assuming
|
||||||
* that a blocked client was used when the context was created, otherwise
|
* that a blocked client was used when the context was created, otherwise
|
||||||
|
@ -6494,24 +6526,32 @@ void RM_ScanCursorDestroy(RedisModuleScanCursor *cursor) {
|
||||||
zfree(cursor);
|
zfree(cursor);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Scan api that allows a module to scan all the keys and value in the selected db.
|
/* Scan API that allows a module to scan all the keys and value in
|
||||||
|
* the selected db.
|
||||||
*
|
*
|
||||||
* Callback for scan implementation.
|
* Callback for scan implementation.
|
||||||
* void scan_callback(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata);
|
* void scan_callback(RedisModuleCtx *ctx, RedisModuleString *keyname,
|
||||||
* - ctx - the redis module context provided to for the scan.
|
* RedisModuleKey *key, void *privdata);
|
||||||
* - keyname - owned by the caller and need to be retained if used after this function.
|
* ctx - the redis module context provided to for the scan.
|
||||||
* - key - holds info on the key and value, it is provided as best effort, in some cases it might
|
* keyname - owned by the caller and need to be retained if used after this
|
||||||
* be NULL, in which case the user should (can) use RedisModule_OpenKey (and CloseKey too).
|
* function.
|
||||||
* when it is provided, it is owned by the caller and will be free when the callback returns.
|
*
|
||||||
* - privdata - the user data provided to RedisModule_Scan.
|
* key - holds info on the key and value, it is provided as best effort, in
|
||||||
|
* some cases it might be NULL, in which case the user should (can) use
|
||||||
|
* RedisModule_OpenKey (and CloseKey too).
|
||||||
|
* when it is provided, it is owned by the caller and will be free when the
|
||||||
|
* callback returns.
|
||||||
|
*
|
||||||
|
* privdata - the user data provided to RedisModule_Scan.
|
||||||
*
|
*
|
||||||
* The way it should be used:
|
* The way it should be used:
|
||||||
* RedisModuleCursor *c = RedisModule_ScanCursorCreate();
|
* RedisModuleCursor *c = RedisModule_ScanCursorCreate();
|
||||||
* while(RedisModule_Scan(ctx, c, callback, privateData));
|
* while(RedisModule_Scan(ctx, c, callback, privateData));
|
||||||
* RedisModule_ScanCursorDestroy(c);
|
* RedisModule_ScanCursorDestroy(c);
|
||||||
*
|
*
|
||||||
* It is also possible to use this API from another thread while the lock is acquired durring
|
* It is also possible to use this API from another thread while the lock
|
||||||
* the actuall call to RM_Scan:
|
* is acquired durring the actuall call to RM_Scan:
|
||||||
|
*
|
||||||
* RedisModuleCursor *c = RedisModule_ScanCursorCreate();
|
* RedisModuleCursor *c = RedisModule_ScanCursorCreate();
|
||||||
* RedisModule_ThreadSafeContextLock(ctx);
|
* RedisModule_ThreadSafeContextLock(ctx);
|
||||||
* while(RedisModule_Scan(ctx, c, callback, privateData)){
|
* while(RedisModule_Scan(ctx, c, callback, privateData)){
|
||||||
|
@ -6521,9 +6561,26 @@ void RM_ScanCursorDestroy(RedisModuleScanCursor *cursor) {
|
||||||
* }
|
* }
|
||||||
* RedisModule_ScanCursorDestroy(c);
|
* RedisModule_ScanCursorDestroy(c);
|
||||||
*
|
*
|
||||||
* The function will return 1 if there are more elements to scan and 0 otherwise,
|
* The function will return 1 if there are more elements to scan and
|
||||||
* possibly setting errno if the call failed.
|
* 0 otherwise, possibly setting errno if the call failed.
|
||||||
* It is also possible to restart and existing cursor using RM_CursorRestart. */
|
*
|
||||||
|
* It is also possible to restart and existing cursor using RM_CursorRestart.
|
||||||
|
*
|
||||||
|
* IMPORTANT: This API is very similar to the Redis SCAN command from the
|
||||||
|
* point of view of the guarantees it provides. This means that the API
|
||||||
|
* may report duplicated keys, but guarantees to report at least one time
|
||||||
|
* every key that was there from the start to the end of the scanning process.
|
||||||
|
*
|
||||||
|
* NOTE: If you do database changes within the callback, you should be aware
|
||||||
|
* that the internal state of the database may change. For instance it is safe
|
||||||
|
* to delete or modify the current key, but may not be safe to delete any
|
||||||
|
* other key.
|
||||||
|
* Moreover playing with the Redis keyspace while iterating may have the
|
||||||
|
* effect of returning more duplicates. A safe pattern is to store the keys
|
||||||
|
* names you want to modify elsewhere, and perform the actions on the keys
|
||||||
|
* later when the iteration is complete. Howerver this can cost a lot of
|
||||||
|
* memory, so it may make sense to just operate on the current key when
|
||||||
|
* possible during the iteration, given that this is safe. */
|
||||||
int RM_Scan(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata) {
|
int RM_Scan(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata) {
|
||||||
if (cursor->done) {
|
if (cursor->done) {
|
||||||
errno = ENOENT;
|
errno = ENOENT;
|
||||||
|
@ -6601,9 +6658,17 @@ static void moduleScanKeyCallback(void *privdata, const dictEntry *de) {
|
||||||
* RedisModule_CloseKey(key);
|
* RedisModule_CloseKey(key);
|
||||||
* RedisModule_ScanCursorDestroy(c);
|
* RedisModule_ScanCursorDestroy(c);
|
||||||
*
|
*
|
||||||
* The function will return 1 if there are more elements to scan and 0 otherwise,
|
* The function will return 1 if there are more elements to scan and 0 otherwise,
|
||||||
* possibly setting errno if the call failed.
|
* possibly setting errno if the call failed.
|
||||||
* It is also possible to restart and existing cursor using RM_CursorRestart. */
|
* It is also possible to restart and existing cursor using RM_CursorRestart.
|
||||||
|
*
|
||||||
|
* NOTE: Certain operations are unsafe while iterating the object. For instance
|
||||||
|
* while the API guarantees to return at least one time all the elements that
|
||||||
|
* are present in the data structure consistently from the start to the end
|
||||||
|
* of the iteration (see HSCAN and similar commands documentation), the more
|
||||||
|
* you play with the elements, the more duplicates you may get. In general
|
||||||
|
* deleting the current element of the data structure is safe, while removing
|
||||||
|
* the key you are iterating is not safe. */
|
||||||
int RM_ScanKey(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata) {
|
int RM_ScanKey(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata) {
|
||||||
if (key == NULL || key->value == NULL) {
|
if (key == NULL || key->value == NULL) {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
|
@ -6709,7 +6774,7 @@ int RM_Fork(RedisModuleForkDoneHandler cb, void *user_data) {
|
||||||
server.module_child_pid = childpid;
|
server.module_child_pid = childpid;
|
||||||
moduleForkInfo.done_handler = cb;
|
moduleForkInfo.done_handler = cb;
|
||||||
moduleForkInfo.done_handler_user_data = user_data;
|
moduleForkInfo.done_handler_user_data = user_data;
|
||||||
serverLog(LL_NOTICE, "Module fork started pid: %d ", childpid);
|
serverLog(LL_VERBOSE, "Module fork started pid: %d ", childpid);
|
||||||
}
|
}
|
||||||
return childpid;
|
return childpid;
|
||||||
}
|
}
|
||||||
|
@ -6732,7 +6797,7 @@ int TerminateModuleForkChild(int child_pid, int wait) {
|
||||||
server.module_child_pid != child_pid) return C_ERR;
|
server.module_child_pid != child_pid) return C_ERR;
|
||||||
|
|
||||||
int statloc;
|
int statloc;
|
||||||
serverLog(LL_NOTICE,"Killing running module fork child: %ld",
|
serverLog(LL_VERBOSE,"Killing running module fork child: %ld",
|
||||||
(long) server.module_child_pid);
|
(long) server.module_child_pid);
|
||||||
if (kill(server.module_child_pid,SIGUSR1) != -1 && wait) {
|
if (kill(server.module_child_pid,SIGUSR1) != -1 && wait) {
|
||||||
while(wait4(server.module_child_pid,&statloc,0,NULL) !=
|
while(wait4(server.module_child_pid,&statloc,0,NULL) !=
|
||||||
|
@ -7669,6 +7734,7 @@ void moduleRegisterCoreAPI(void) {
|
||||||
REGISTER_API(CreateStringFromCallReply);
|
REGISTER_API(CreateStringFromCallReply);
|
||||||
REGISTER_API(CreateString);
|
REGISTER_API(CreateString);
|
||||||
REGISTER_API(CreateStringFromLongLong);
|
REGISTER_API(CreateStringFromLongLong);
|
||||||
|
REGISTER_API(CreateStringFromDouble);
|
||||||
REGISTER_API(CreateStringFromLongDouble);
|
REGISTER_API(CreateStringFromLongDouble);
|
||||||
REGISTER_API(CreateStringFromString);
|
REGISTER_API(CreateStringFromString);
|
||||||
REGISTER_API(CreateStringPrintf);
|
REGISTER_API(CreateStringPrintf);
|
||||||
|
|
|
@ -1365,6 +1365,12 @@ void resetClient(client *c) {
|
||||||
if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
|
if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
|
||||||
c->flags &= ~CLIENT_ASKING;
|
c->flags &= ~CLIENT_ASKING;
|
||||||
|
|
||||||
|
/* We do the same for the CACHING command as well. It also affects
|
||||||
|
* the next command or transaction executed, in a way very similar
|
||||||
|
* to ASKING. */
|
||||||
|
if (!(c->flags & CLIENT_MULTI) && prevcmd != clientCommand)
|
||||||
|
c->flags &= ~CLIENT_TRACKING_CACHING;
|
||||||
|
|
||||||
/* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
|
/* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
|
||||||
* to the next command will be sent, but set the flag if the command
|
* to the next command will be sent, but set the flag if the command
|
||||||
* we just processed was "CLIENT REPLY SKIP". */
|
* we just processed was "CLIENT REPLY SKIP". */
|
||||||
|
@ -2044,7 +2050,7 @@ void clientCommand(client *c) {
|
||||||
"REPLY (on|off|skip) -- Control the replies sent to the current connection.",
|
"REPLY (on|off|skip) -- Control the replies sent to the current connection.",
|
||||||
"SETNAME <name> -- Assign the name <name> to the current connection.",
|
"SETNAME <name> -- Assign the name <name> to the current connection.",
|
||||||
"UNBLOCK <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.",
|
"UNBLOCK <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.",
|
||||||
"TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first] [PREFIX second] ... -- Enable client keys tracking for client side caching.",
|
"TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first] [PREFIX second] [OPTIN] [OPTOUT]... -- Enable client keys tracking for client side caching.",
|
||||||
"GETREDIR -- Return the client ID we are redirecting to when tracking is enabled.",
|
"GETREDIR -- Return the client ID we are redirecting to when tracking is enabled.",
|
||||||
NULL
|
NULL
|
||||||
};
|
};
|
||||||
|
@ -2221,9 +2227,9 @@ NULL
|
||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) {
|
||||||
/* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
|
/* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
|
||||||
* [PREFIX second] ... */
|
* [PREFIX second] [OPTIN] [OPTOUT] ... */
|
||||||
long long redir = 0;
|
long long redir = 0;
|
||||||
int bcast = 0;
|
uint64_t options = 0;
|
||||||
robj **prefix = NULL;
|
robj **prefix = NULL;
|
||||||
size_t numprefix = 0;
|
size_t numprefix = 0;
|
||||||
|
|
||||||
|
@ -2256,7 +2262,11 @@ NULL
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else if (!strcasecmp(c->argv[j]->ptr,"bcast")) {
|
} else if (!strcasecmp(c->argv[j]->ptr,"bcast")) {
|
||||||
bcast = 1;
|
options |= CLIENT_TRACKING_BCAST;
|
||||||
|
} else if (!strcasecmp(c->argv[j]->ptr,"optin")) {
|
||||||
|
options |= CLIENT_TRACKING_OPTIN;
|
||||||
|
} else if (!strcasecmp(c->argv[j]->ptr,"optout")) {
|
||||||
|
options |= CLIENT_TRACKING_OPTOUT;
|
||||||
} else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) {
|
} else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) {
|
||||||
j++;
|
j++;
|
||||||
prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1));
|
prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1));
|
||||||
|
@ -2272,7 +2282,7 @@ NULL
|
||||||
if (!strcasecmp(c->argv[2]->ptr,"on")) {
|
if (!strcasecmp(c->argv[2]->ptr,"on")) {
|
||||||
/* Before enabling tracking, make sure options are compatible
|
/* Before enabling tracking, make sure options are compatible
|
||||||
* among each other and with the current state of the client. */
|
* among each other and with the current state of the client. */
|
||||||
if (!bcast && numprefix) {
|
if (!(options & CLIENT_TRACKING_BCAST) && numprefix) {
|
||||||
addReplyError(c,
|
addReplyError(c,
|
||||||
"PREFIX option requires BCAST mode to be enabled");
|
"PREFIX option requires BCAST mode to be enabled");
|
||||||
zfree(prefix);
|
zfree(prefix);
|
||||||
|
@ -2281,7 +2291,8 @@ NULL
|
||||||
|
|
||||||
if (c->flags & CLIENT_TRACKING) {
|
if (c->flags & CLIENT_TRACKING) {
|
||||||
int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST);
|
int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST);
|
||||||
if (oldbcast != bcast) {
|
int newbcast = !!(options & CLIENT_TRACKING_BCAST);
|
||||||
|
if (oldbcast != newbcast) {
|
||||||
addReplyError(c,
|
addReplyError(c,
|
||||||
"You can't switch BCAST mode on/off before disabling "
|
"You can't switch BCAST mode on/off before disabling "
|
||||||
"tracking for this client, and then re-enabling it with "
|
"tracking for this client, and then re-enabling it with "
|
||||||
|
@ -2290,7 +2301,17 @@ NULL
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
enableTracking(c,redir,bcast,prefix,numprefix);
|
|
||||||
|
if (options & CLIENT_TRACKING_BCAST &&
|
||||||
|
options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT))
|
||||||
|
{
|
||||||
|
addReplyError(c,
|
||||||
|
"OPTIN and OPTOUT are not compatible with BCAST");
|
||||||
|
zfree(prefix);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
enableTracking(c,redir,options,prefix,numprefix);
|
||||||
} else if (!strcasecmp(c->argv[2]->ptr,"off")) {
|
} else if (!strcasecmp(c->argv[2]->ptr,"off")) {
|
||||||
disableTracking(c);
|
disableTracking(c);
|
||||||
} else {
|
} else {
|
||||||
|
@ -2300,6 +2321,36 @@ NULL
|
||||||
}
|
}
|
||||||
zfree(prefix);
|
zfree(prefix);
|
||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
|
} else if (!strcasecmp(c->argv[1]->ptr,"caching") && c->argc >= 3) {
|
||||||
|
if (!(c->flags & CLIENT_TRACKING)) {
|
||||||
|
addReplyError(c,"CLIENT CACHING can be called only when the "
|
||||||
|
"client is in tracking mode with OPTIN or "
|
||||||
|
"OPTOUT mode enabled");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *opt = c->argv[2]->ptr;
|
||||||
|
if (!strcasecmp(opt,"yes")) {
|
||||||
|
if (c->flags & CLIENT_TRACKING_OPTIN) {
|
||||||
|
c->flags |= CLIENT_TRACKING_CACHING;
|
||||||
|
} else {
|
||||||
|
addReplyError(c,"CLIENT CACHING YES is only valid when tracking is enabled in OPTIN mode.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} else if (!strcasecmp(opt,"no")) {
|
||||||
|
if (c->flags & CLIENT_TRACKING_OPTOUT) {
|
||||||
|
c->flags |= CLIENT_TRACKING_CACHING;
|
||||||
|
} else {
|
||||||
|
addReplyError(c,"CLIENT CACHING NO is only valid when tracking is enabled in OPTOUT mode.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
addReply(c,shared.syntaxerr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Common reply for when we succeeded. */
|
||||||
|
addReply(c,shared.ok);
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) {
|
||||||
/* CLIENT GETREDIR */
|
/* CLIENT GETREDIR */
|
||||||
if (c->flags & CLIENT_TRACKING) {
|
if (c->flags & CLIENT_TRACKING) {
|
||||||
|
|
|
@ -2195,7 +2195,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
||||||
io.ver = 2;
|
io.ver = 2;
|
||||||
/* Call the rdb_load method of the module providing the 10 bit
|
/* Call the rdb_load method of the module providing the 10 bit
|
||||||
* encoding version in the lower 10 bits of the module ID. */
|
* encoding version in the lower 10 bits of the module ID. */
|
||||||
if (mt->aux_load(&io,moduleid&1023, when) || io.error) {
|
if (mt->aux_load(&io,moduleid&1023, when) != REDISMODULE_OK || io.error) {
|
||||||
moduleTypeNameByID(name,moduleid);
|
moduleTypeNameByID(name,moduleid);
|
||||||
serverLog(LL_WARNING,"The RDB file contains module AUX data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name);
|
serverLog(LL_WARNING,"The RDB file contains module AUX data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name);
|
||||||
exit(1);
|
exit(1);
|
||||||
|
|
|
@ -7737,7 +7737,7 @@ static void LRUTestMode(void) {
|
||||||
* to fill the target instance easily. */
|
* to fill the target instance easily. */
|
||||||
start_cycle = mstime();
|
start_cycle = mstime();
|
||||||
long long hits = 0, misses = 0;
|
long long hits = 0, misses = 0;
|
||||||
while(mstime() - start_cycle < 1000) {
|
while(mstime() - start_cycle < LRU_CYCLE_PERIOD) {
|
||||||
/* Write cycle. */
|
/* Write cycle. */
|
||||||
for (j = 0; j < LRU_CYCLE_PIPELINE_SIZE; j++) {
|
for (j = 0; j < LRU_CYCLE_PIPELINE_SIZE; j++) {
|
||||||
char val[6];
|
char val[6];
|
||||||
|
|
|
@ -467,6 +467,7 @@ size_t REDISMODULE_API_FUNC(RedisModule_CallReplyLength)(RedisModuleCallReply *r
|
||||||
RedisModuleCallReply *REDISMODULE_API_FUNC(RedisModule_CallReplyArrayElement)(RedisModuleCallReply *reply, size_t idx);
|
RedisModuleCallReply *REDISMODULE_API_FUNC(RedisModule_CallReplyArrayElement)(RedisModuleCallReply *reply, size_t idx);
|
||||||
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateString)(RedisModuleCtx *ctx, const char *ptr, size_t len);
|
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateString)(RedisModuleCtx *ctx, const char *ptr, size_t len);
|
||||||
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromLongLong)(RedisModuleCtx *ctx, long long ll);
|
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromLongLong)(RedisModuleCtx *ctx, long long ll);
|
||||||
|
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromDouble)(RedisModuleCtx *ctx, double d);
|
||||||
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromLongDouble)(RedisModuleCtx *ctx, long double ld, int humanfriendly);
|
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromLongDouble)(RedisModuleCtx *ctx, long double ld, int humanfriendly);
|
||||||
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromString)(RedisModuleCtx *ctx, const RedisModuleString *str);
|
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromString)(RedisModuleCtx *ctx, const RedisModuleString *str);
|
||||||
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringPrintf)(RedisModuleCtx *ctx, const char *fmt, ...);
|
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringPrintf)(RedisModuleCtx *ctx, const char *fmt, ...);
|
||||||
|
@ -726,6 +727,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
|
||||||
REDISMODULE_GET_API(CreateStringFromCallReply);
|
REDISMODULE_GET_API(CreateStringFromCallReply);
|
||||||
REDISMODULE_GET_API(CreateString);
|
REDISMODULE_GET_API(CreateString);
|
||||||
REDISMODULE_GET_API(CreateStringFromLongLong);
|
REDISMODULE_GET_API(CreateStringFromLongLong);
|
||||||
|
REDISMODULE_GET_API(CreateStringFromDouble);
|
||||||
REDISMODULE_GET_API(CreateStringFromLongDouble);
|
REDISMODULE_GET_API(CreateStringFromLongDouble);
|
||||||
REDISMODULE_GET_API(CreateStringFromString);
|
REDISMODULE_GET_API(CreateStringFromString);
|
||||||
REDISMODULE_GET_API(CreateStringPrintf);
|
REDISMODULE_GET_API(CreateStringPrintf);
|
||||||
|
|
|
@ -1352,7 +1352,7 @@ void disklessLoadRestoreBackups(redisDb *backup, int restore, int empty_db_flags
|
||||||
/* Asynchronously read the SYNC payload we receive from a master */
|
/* Asynchronously read the SYNC payload we receive from a master */
|
||||||
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
|
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
|
||||||
void readSyncBulkPayload(connection *conn) {
|
void readSyncBulkPayload(connection *conn) {
|
||||||
char buf[4096];
|
char buf[PROTO_IOBUF_LEN];
|
||||||
ssize_t nread, readlen, nwritten;
|
ssize_t nread, readlen, nwritten;
|
||||||
int use_diskless_load = useDisklessLoad();
|
int use_diskless_load = useDisklessLoad();
|
||||||
redisDb *diskless_load_backup = NULL;
|
redisDb *diskless_load_backup = NULL;
|
||||||
|
|
|
@ -36,7 +36,12 @@
|
||||||
* the include of your alternate allocator if needed (not needed in order
|
* the include of your alternate allocator if needed (not needed in order
|
||||||
* to use the default libc allocator). */
|
* to use the default libc allocator). */
|
||||||
|
|
||||||
|
#ifndef __SDS_ALLOC_H__
|
||||||
|
#define __SDS_ALLOC_H__
|
||||||
|
|
||||||
#include "zmalloc.h"
|
#include "zmalloc.h"
|
||||||
#define s_malloc zmalloc
|
#define s_malloc zmalloc
|
||||||
#define s_realloc zrealloc
|
#define s_realloc zrealloc
|
||||||
#define s_free zfree
|
#define s_free zfree
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
|
@ -3773,6 +3773,7 @@ void addReplyCommand(client *c, struct redisCommand *cmd) {
|
||||||
flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_SLOWLOG, "skip_slowlog");
|
flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_SLOWLOG, "skip_slowlog");
|
||||||
flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking");
|
flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking");
|
||||||
flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast");
|
flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast");
|
||||||
|
flagcount += addReplyCommandFlag(c,cmd,CMD_NO_AUTH, "no_auth");
|
||||||
if ((cmd->getkeys_proc && !(cmd->flags & CMD_MODULE)) ||
|
if ((cmd->getkeys_proc && !(cmd->flags & CMD_MODULE)) ||
|
||||||
cmd->flags & CMD_MODULE_GETKEYS)
|
cmd->flags & CMD_MODULE_GETKEYS)
|
||||||
{
|
{
|
||||||
|
|
|
@ -248,6 +248,10 @@ typedef long long ustime_t; /* microsecond time type. */
|
||||||
perform client side caching. */
|
perform client side caching. */
|
||||||
#define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */
|
#define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */
|
||||||
#define CLIENT_TRACKING_BCAST (1ULL<<33) /* Tracking in BCAST mode. */
|
#define CLIENT_TRACKING_BCAST (1ULL<<33) /* Tracking in BCAST mode. */
|
||||||
|
#define CLIENT_TRACKING_OPTIN (1ULL<<34) /* Tracking in opt-in mode. */
|
||||||
|
#define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. */
|
||||||
|
#define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given,
|
||||||
|
depending on optin/optout mode. */
|
||||||
|
|
||||||
/* Client block type (btype field in client structure)
|
/* Client block type (btype field in client structure)
|
||||||
* if CLIENT_BLOCKED flag is set. */
|
* if CLIENT_BLOCKED flag is set. */
|
||||||
|
@ -1651,7 +1655,7 @@ void addReplyStatusFormat(client *c, const char *fmt, ...);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* Client side caching (tracking mode) */
|
/* Client side caching (tracking mode) */
|
||||||
void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix);
|
void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix);
|
||||||
void disableTracking(client *c);
|
void disableTracking(client *c);
|
||||||
void trackingRememberKeys(client *c);
|
void trackingRememberKeys(client *c);
|
||||||
void trackingInvalidateKey(robj *keyobj);
|
void trackingInvalidateKey(robj *keyobj);
|
||||||
|
|
|
@ -415,7 +415,7 @@ void spopWithCountCommand(client *c) {
|
||||||
|
|
||||||
/* Make sure a key with the name inputted exists, and that it's type is
|
/* Make sure a key with the name inputted exists, and that it's type is
|
||||||
* indeed a set. Otherwise, return nil */
|
* indeed a set. Otherwise, return nil */
|
||||||
if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]))
|
if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.emptyset[c->resp]))
|
||||||
== NULL || checkType(c,set,OBJ_SET)) return;
|
== NULL || checkType(c,set,OBJ_SET)) return;
|
||||||
|
|
||||||
/* If count is zero, serve an empty set ASAP to avoid special
|
/* If count is zero, serve an empty set ASAP to avoid special
|
||||||
|
|
|
@ -848,7 +848,7 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam
|
||||||
argv[11] = createStringObject("JUSTID",6);
|
argv[11] = createStringObject("JUSTID",6);
|
||||||
argv[12] = createStringObject("LASTID",6);
|
argv[12] = createStringObject("LASTID",6);
|
||||||
argv[13] = createObjectFromStreamID(&group->last_id);
|
argv[13] = createObjectFromStreamID(&group->last_id);
|
||||||
propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
|
alsoPropagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
|
||||||
decrRefCount(argv[0]);
|
decrRefCount(argv[0]);
|
||||||
decrRefCount(argv[3]);
|
decrRefCount(argv[3]);
|
||||||
decrRefCount(argv[4]);
|
decrRefCount(argv[4]);
|
||||||
|
@ -875,7 +875,7 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna
|
||||||
argv[2] = key;
|
argv[2] = key;
|
||||||
argv[3] = groupname;
|
argv[3] = groupname;
|
||||||
argv[4] = createObjectFromStreamID(&group->last_id);
|
argv[4] = createObjectFromStreamID(&group->last_id);
|
||||||
propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
|
alsoPropagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
|
||||||
decrRefCount(argv[0]);
|
decrRefCount(argv[0]);
|
||||||
decrRefCount(argv[1]);
|
decrRefCount(argv[1]);
|
||||||
decrRefCount(argv[4]);
|
decrRefCount(argv[4]);
|
||||||
|
@ -1068,9 +1068,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
|
||||||
* by the user by other means. In that case we signal it emitting
|
* by the user by other means. In that case we signal it emitting
|
||||||
* the ID but then a NULL entry for the fields. */
|
* the ID but then a NULL entry for the fields. */
|
||||||
addReplyArrayLen(c,2);
|
addReplyArrayLen(c,2);
|
||||||
streamID id;
|
addReplyStreamID(c,&thisid);
|
||||||
streamDecodeID(ri.key,&id);
|
|
||||||
addReplyStreamID(c,&id);
|
|
||||||
addReplyNullArray(c);
|
addReplyNullArray(c);
|
||||||
} else {
|
} else {
|
||||||
streamNACK *nack = ri.data;
|
streamNACK *nack = ri.data;
|
||||||
|
|
|
@ -93,7 +93,8 @@ void disableTracking(client *c) {
|
||||||
if (c->flags & CLIENT_TRACKING) {
|
if (c->flags & CLIENT_TRACKING) {
|
||||||
server.tracking_clients--;
|
server.tracking_clients--;
|
||||||
c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR|
|
c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR|
|
||||||
CLIENT_TRACKING_BCAST);
|
CLIENT_TRACKING_BCAST|CLIENT_TRACKING_OPTIN|
|
||||||
|
CLIENT_TRACKING_OPTOUT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,10 +125,11 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
|
||||||
* eventually get freed, we'll send a message to the original client to
|
* eventually get freed, we'll send a message to the original client to
|
||||||
* inform it of the condition. Multiple clients can redirect the invalidation
|
* inform it of the condition. Multiple clients can redirect the invalidation
|
||||||
* messages to the same client ID. */
|
* messages to the same client ID. */
|
||||||
void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) {
|
void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix) {
|
||||||
if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
|
if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
|
||||||
c->flags |= CLIENT_TRACKING;
|
c->flags |= CLIENT_TRACKING;
|
||||||
c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST);
|
c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST|
|
||||||
|
CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT);
|
||||||
c->client_tracking_redirection = redirect_to;
|
c->client_tracking_redirection = redirect_to;
|
||||||
if (TrackingTable == NULL) {
|
if (TrackingTable == NULL) {
|
||||||
TrackingTable = raxNew();
|
TrackingTable = raxNew();
|
||||||
|
@ -135,7 +137,7 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s
|
||||||
TrackingChannelName = createStringObject("__redis__:invalidate",20);
|
TrackingChannelName = createStringObject("__redis__:invalidate",20);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (bcast) {
|
if (options & CLIENT_TRACKING_BCAST) {
|
||||||
c->flags |= CLIENT_TRACKING_BCAST;
|
c->flags |= CLIENT_TRACKING_BCAST;
|
||||||
if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
|
if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
|
||||||
for (size_t j = 0; j < numprefix; j++) {
|
for (size_t j = 0; j < numprefix; j++) {
|
||||||
|
@ -143,14 +145,23 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s
|
||||||
enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix));
|
enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
c->flags |= options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function is called after the execution of a readonly command in the
|
/* This function is called after the execution of a readonly command in the
|
||||||
* case the client 'c' has keys tracking enabled. It will populate the
|
* case the client 'c' has keys tracking enabled and the tracking is not
|
||||||
* tracking invalidation table according to the keys the user fetched, so that
|
* in BCAST mode. It will populate the tracking invalidation table according
|
||||||
* Redis will know what are the clients that should receive an invalidation
|
* to the keys the user fetched, so that Redis will know what are the clients
|
||||||
* message with certain groups of keys are modified. */
|
* that should receive an invalidation message with certain groups of keys
|
||||||
|
* are modified. */
|
||||||
void trackingRememberKeys(client *c) {
|
void trackingRememberKeys(client *c) {
|
||||||
|
/* Return if we are in optin/out mode and the right CACHING command
|
||||||
|
* was/wasn't given in order to modify the default behavior. */
|
||||||
|
uint64_t optin = c->flags & CLIENT_TRACKING_OPTIN;
|
||||||
|
uint64_t optout = c->flags & CLIENT_TRACKING_OPTOUT;
|
||||||
|
uint64_t caching_given = c->flags & CLIENT_TRACKING_CACHING;
|
||||||
|
if ((optin && !caching_given) || (optout && caching_given)) return;
|
||||||
|
|
||||||
int numkeys;
|
int numkeys;
|
||||||
int *keys = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys);
|
int *keys = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys);
|
||||||
if (keys == NULL) return;
|
if (keys == NULL) return;
|
||||||
|
|
|
@ -114,6 +114,27 @@ start_server {} {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# wait for all the slaves to be in sync with the master
|
||||||
|
set master_ofs [status $R($master_id) master_repl_offset]
|
||||||
|
wait_for_condition 500 100 {
|
||||||
|
$master_ofs == [status $R(0) master_repl_offset] &&
|
||||||
|
$master_ofs == [status $R(1) master_repl_offset] &&
|
||||||
|
$master_ofs == [status $R(2) master_repl_offset] &&
|
||||||
|
$master_ofs == [status $R(3) master_repl_offset] &&
|
||||||
|
$master_ofs == [status $R(4) master_repl_offset]
|
||||||
|
} else {
|
||||||
|
if {$debug_msg} {
|
||||||
|
for {set j 0} {$j < 5} {incr j} {
|
||||||
|
puts "$j: sync_full: [status $R($j) sync_full]"
|
||||||
|
puts "$j: id1 : [status $R($j) master_replid]:[status $R($j) master_repl_offset]"
|
||||||
|
puts "$j: id2 : [status $R($j) master_replid2]:[status $R($j) second_repl_offset]"
|
||||||
|
puts "$j: backlog : firstbyte=[status $R($j) repl_backlog_first_byte_offset] len=[status $R($j) repl_backlog_histlen]"
|
||||||
|
puts "---"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fail "Slaves are not in sync with the master after too long time."
|
||||||
|
}
|
||||||
|
|
||||||
# Put down the old master so that it cannot generate more
|
# Put down the old master so that it cannot generate more
|
||||||
# replication stream, this way in the next master switch, the time at
|
# replication stream, this way in the next master switch, the time at
|
||||||
# which we move slaves away is not important, each will have full
|
# which we move slaves away is not important, each will have full
|
||||||
|
|
|
@ -42,7 +42,7 @@ int fork_create(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||||
|
|
||||||
/* child */
|
/* child */
|
||||||
RedisModule_Log(ctx, "notice", "fork child started");
|
RedisModule_Log(ctx, "notice", "fork child started");
|
||||||
usleep(200000);
|
usleep(500000);
|
||||||
RedisModule_Log(ctx, "notice", "fork child exiting");
|
RedisModule_Log(ctx, "notice", "fork child exiting");
|
||||||
RedisModule_ExitFromChild(code_to_exit_with);
|
RedisModule_ExitFromChild(code_to_exit_with);
|
||||||
/* unreachable */
|
/* unreachable */
|
||||||
|
|
|
@ -141,6 +141,18 @@ proc tags {tags code} {
|
||||||
uplevel 1 $code
|
uplevel 1 $code
|
||||||
set ::tags [lrange $::tags 0 end-[llength $tags]]
|
set ::tags [lrange $::tags 0 end-[llength $tags]]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Write the configuration in the dictionary 'config' in the specified
|
||||||
|
# file name.
|
||||||
|
proc create_server_config_file {filename config} {
|
||||||
|
set fp [open $filename w+]
|
||||||
|
foreach directive [dict keys $config] {
|
||||||
|
puts -nonewline $fp "$directive "
|
||||||
|
puts $fp [dict get $config $directive]
|
||||||
|
}
|
||||||
|
close $fp
|
||||||
|
}
|
||||||
|
|
||||||
proc start_server {options {code undefined}} {
|
proc start_server {options {code undefined}} {
|
||||||
# If we are running against an external server, we just push the
|
# If we are running against an external server, we just push the
|
||||||
# host/port pair in the stack the first time
|
# host/port pair in the stack the first time
|
||||||
|
@ -222,68 +234,91 @@ proc start_server {options {code undefined}} {
|
||||||
|
|
||||||
# write new configuration to temporary file
|
# write new configuration to temporary file
|
||||||
set config_file [tmpfile redis.conf]
|
set config_file [tmpfile redis.conf]
|
||||||
set fp [open $config_file w+]
|
create_server_config_file $config_file $config
|
||||||
foreach directive [dict keys $config] {
|
|
||||||
puts -nonewline $fp "$directive "
|
|
||||||
puts $fp [dict get $config $directive]
|
|
||||||
}
|
|
||||||
close $fp
|
|
||||||
|
|
||||||
set stdout [format "%s/%s" [dict get $config "dir"] "stdout"]
|
set stdout [format "%s/%s" [dict get $config "dir"] "stdout"]
|
||||||
set stderr [format "%s/%s" [dict get $config "dir"] "stderr"]
|
set stderr [format "%s/%s" [dict get $config "dir"] "stderr"]
|
||||||
|
|
||||||
send_data_packet $::test_server_fd "server-spawning" "port $::port"
|
# We need a loop here to retry with different ports.
|
||||||
|
set server_started 0
|
||||||
if {$::valgrind} {
|
while {$server_started == 0} {
|
||||||
set pid [exec valgrind --track-origins=yes --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/redis-server $config_file > $stdout 2> $stderr &]
|
if {$::verbose} {
|
||||||
} elseif ($::stack_logging) {
|
puts -nonewline "=== ($tags) Starting server ${::host}:${::port} "
|
||||||
set pid [exec /usr/bin/env MallocStackLogging=1 MallocLogFile=/tmp/malloc_log.txt src/redis-server $config_file > $stdout 2> $stderr &]
|
|
||||||
} else {
|
|
||||||
set pid [exec src/redis-server $config_file > $stdout 2> $stderr &]
|
|
||||||
}
|
|
||||||
|
|
||||||
# Tell the test server about this new instance.
|
|
||||||
send_data_packet $::test_server_fd server-spawned $pid
|
|
||||||
|
|
||||||
# check that the server actually started
|
|
||||||
# ugly but tries to be as fast as possible...
|
|
||||||
if {$::valgrind} {set retrynum 1000} else {set retrynum 100}
|
|
||||||
|
|
||||||
if {$::verbose} {
|
|
||||||
puts -nonewline "=== ($tags) Starting server ${::host}:${::port} "
|
|
||||||
}
|
|
||||||
|
|
||||||
if {$code ne "undefined"} {
|
|
||||||
set serverisup [server_is_up $::host $::port $retrynum]
|
|
||||||
} else {
|
|
||||||
set serverisup 1
|
|
||||||
}
|
|
||||||
|
|
||||||
if {$::verbose} {
|
|
||||||
puts ""
|
|
||||||
}
|
|
||||||
|
|
||||||
if {!$serverisup} {
|
|
||||||
set err {}
|
|
||||||
append err [exec cat $stdout] "\n" [exec cat $stderr]
|
|
||||||
start_server_error $config_file $err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
# Wait for actual startup
|
|
||||||
set checkperiod 100; # Milliseconds
|
|
||||||
set maxiter [expr {120*1000/100}] ; # Wait up to 2 minutes.
|
|
||||||
while {![info exists _pid]} {
|
|
||||||
regexp {PID:\s(\d+)} [exec cat $stdout] _ _pid
|
|
||||||
after $checkperiod
|
|
||||||
incr maxiter -1
|
|
||||||
if {$maxiter == 0} {
|
|
||||||
start_server_error $config_file "No PID detected in log $stdout"
|
|
||||||
puts "--- LOG CONTENT ---"
|
|
||||||
puts [exec cat $stdout]
|
|
||||||
puts "-------------------"
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
send_data_packet $::test_server_fd "server-spawning" "port $::port"
|
||||||
|
|
||||||
|
if {$::valgrind} {
|
||||||
|
set pid [exec valgrind --track-origins=yes --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/redis-server $config_file > $stdout 2> $stderr &]
|
||||||
|
} elseif ($::stack_logging) {
|
||||||
|
set pid [exec /usr/bin/env MallocStackLogging=1 MallocLogFile=/tmp/malloc_log.txt src/redis-server $config_file > $stdout 2> $stderr &]
|
||||||
|
} else {
|
||||||
|
set pid [exec src/redis-server $config_file > $stdout 2> $stderr &]
|
||||||
|
}
|
||||||
|
|
||||||
|
# Tell the test server about this new instance.
|
||||||
|
send_data_packet $::test_server_fd server-spawned $pid
|
||||||
|
|
||||||
|
# check that the server actually started
|
||||||
|
# ugly but tries to be as fast as possible...
|
||||||
|
if {$::valgrind} {set retrynum 1000} else {set retrynum 100}
|
||||||
|
|
||||||
|
# Wait for actual startup
|
||||||
|
set checkperiod 100; # Milliseconds
|
||||||
|
set maxiter [expr {120*1000/100}] ; # Wait up to 2 minutes.
|
||||||
|
set port_busy 0
|
||||||
|
while {![info exists _pid]} {
|
||||||
|
regexp {PID:\s(\d+)} [exec cat $stdout] _ _pid
|
||||||
|
after $checkperiod
|
||||||
|
incr maxiter -1
|
||||||
|
if {$maxiter == 0} {
|
||||||
|
start_server_error $config_file "No PID detected in log $stdout"
|
||||||
|
puts "--- LOG CONTENT ---"
|
||||||
|
puts [exec cat $stdout]
|
||||||
|
puts "-------------------"
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
# Check if the port is actually busy and the server failed
|
||||||
|
# for this reason.
|
||||||
|
if {[regexp {Could not create server TCP} [exec cat $stdout]]} {
|
||||||
|
set port_busy 1
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
# Sometimes we have to try a different port, even if we checked
|
||||||
|
# for availability. Other test clients may grab the port before we
|
||||||
|
# are able to do it for example.
|
||||||
|
if {$port_busy} {
|
||||||
|
puts "Port $::port was already busy, trying another port..."
|
||||||
|
set ::port [find_available_port [expr {$::port+1}]]
|
||||||
|
if {$::tls} {
|
||||||
|
dict set config "tls-port" $::port
|
||||||
|
} else {
|
||||||
|
dict set config port $::port
|
||||||
|
}
|
||||||
|
create_server_config_file $config_file $config
|
||||||
|
continue; # Try again
|
||||||
|
}
|
||||||
|
|
||||||
|
if {$code ne "undefined"} {
|
||||||
|
set serverisup [server_is_up $::host $::port $retrynum]
|
||||||
|
} else {
|
||||||
|
set serverisup 1
|
||||||
|
}
|
||||||
|
|
||||||
|
if {$::verbose} {
|
||||||
|
puts ""
|
||||||
|
}
|
||||||
|
|
||||||
|
if {!$serverisup} {
|
||||||
|
set err {}
|
||||||
|
append err [exec cat $stdout] "\n" [exec cat $stderr]
|
||||||
|
start_server_error $config_file $err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
set server_started 1
|
||||||
}
|
}
|
||||||
|
|
||||||
# setup properties to be able to initialize a client object
|
# setup properties to be able to initialize a client object
|
||||||
|
|
|
@ -39,6 +39,8 @@ start_server {tags {"memefficiency"}} {
|
||||||
start_server {tags {"defrag"}} {
|
start_server {tags {"defrag"}} {
|
||||||
if {[string match {*jemalloc*} [s mem_allocator]]} {
|
if {[string match {*jemalloc*} [s mem_allocator]]} {
|
||||||
test "Active defrag" {
|
test "Active defrag" {
|
||||||
|
r config set save "" ;# prevent bgsave from interfereing with save below
|
||||||
|
r config set hz 100
|
||||||
r config set activedefrag no
|
r config set activedefrag no
|
||||||
r config set active-defrag-threshold-lower 5
|
r config set active-defrag-threshold-lower 5
|
||||||
r config set active-defrag-cycle-min 65
|
r config set active-defrag-cycle-min 65
|
||||||
|
@ -46,8 +48,8 @@ start_server {tags {"defrag"}} {
|
||||||
r config set active-defrag-ignore-bytes 2mb
|
r config set active-defrag-ignore-bytes 2mb
|
||||||
r config set maxmemory 100mb
|
r config set maxmemory 100mb
|
||||||
r config set maxmemory-policy allkeys-lru
|
r config set maxmemory-policy allkeys-lru
|
||||||
r debug populate 700000 asdf 150
|
r debug populate 700000 asdf1 150
|
||||||
r debug populate 170000 asdf 300
|
r debug populate 170000 asdf2 300
|
||||||
r ping ;# trigger eviction following the previous population
|
r ping ;# trigger eviction following the previous population
|
||||||
after 120 ;# serverCron only updates the info once in 100ms
|
after 120 ;# serverCron only updates the info once in 100ms
|
||||||
set frag [s allocator_frag_ratio]
|
set frag [s allocator_frag_ratio]
|
||||||
|
@ -55,6 +57,11 @@ start_server {tags {"defrag"}} {
|
||||||
puts "frag $frag"
|
puts "frag $frag"
|
||||||
}
|
}
|
||||||
assert {$frag >= 1.4}
|
assert {$frag >= 1.4}
|
||||||
|
|
||||||
|
r config set latency-monitor-threshold 5
|
||||||
|
r latency reset
|
||||||
|
r config set maxmemory 110mb ;# prevent further eviction (not to fail the digest test)
|
||||||
|
set digest [r debug digest]
|
||||||
catch {r config set activedefrag yes} e
|
catch {r config set activedefrag yes} e
|
||||||
if {![string match {DISABLED*} $e]} {
|
if {![string match {DISABLED*} $e]} {
|
||||||
# Wait for the active defrag to start working (decision once a
|
# Wait for the active defrag to start working (decision once a
|
||||||
|
@ -78,19 +85,37 @@ start_server {tags {"defrag"}} {
|
||||||
# Test the the fragmentation is lower.
|
# Test the the fragmentation is lower.
|
||||||
after 120 ;# serverCron only updates the info once in 100ms
|
after 120 ;# serverCron only updates the info once in 100ms
|
||||||
set frag [s allocator_frag_ratio]
|
set frag [s allocator_frag_ratio]
|
||||||
|
set max_latency 0
|
||||||
|
foreach event [r latency latest] {
|
||||||
|
lassign $event eventname time latency max
|
||||||
|
if {$eventname == "active-defrag-cycle"} {
|
||||||
|
set max_latency $max
|
||||||
|
}
|
||||||
|
}
|
||||||
if {$::verbose} {
|
if {$::verbose} {
|
||||||
puts "frag $frag"
|
puts "frag $frag"
|
||||||
|
puts "max latency $max_latency"
|
||||||
|
puts [r latency latest]
|
||||||
|
puts [r latency history active-defrag-cycle]
|
||||||
}
|
}
|
||||||
assert {$frag < 1.1}
|
assert {$frag < 1.1}
|
||||||
|
# due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
|
||||||
|
# we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher
|
||||||
|
assert {$max_latency <= 30}
|
||||||
} else {
|
} else {
|
||||||
set _ ""
|
set _ ""
|
||||||
}
|
}
|
||||||
} {}
|
# verify the data isn't corrupted or changed
|
||||||
|
set newdigest [r debug digest]
|
||||||
|
assert {$digest eq $newdigest}
|
||||||
|
r save ;# saving an rdb iterates over all the data / pointers
|
||||||
|
} {OK}
|
||||||
|
|
||||||
test "Active defrag big keys" {
|
test "Active defrag big keys" {
|
||||||
r flushdb
|
r flushdb
|
||||||
r config resetstat
|
r config resetstat
|
||||||
r config set save "" ;# prevent bgsave from interfereing with save below
|
r config set save "" ;# prevent bgsave from interfereing with save below
|
||||||
|
r config set hz 100
|
||||||
r config set activedefrag no
|
r config set activedefrag no
|
||||||
r config set active-defrag-max-scan-fields 1000
|
r config set active-defrag-max-scan-fields 1000
|
||||||
r config set active-defrag-threshold-lower 5
|
r config set active-defrag-threshold-lower 5
|
||||||
|
@ -142,7 +167,7 @@ start_server {tags {"defrag"}} {
|
||||||
for {set j 0} {$j < 500000} {incr j} {
|
for {set j 0} {$j < 500000} {incr j} {
|
||||||
$rd read ; # Discard replies
|
$rd read ; # Discard replies
|
||||||
}
|
}
|
||||||
assert {[r dbsize] == 500010}
|
assert_equal [r dbsize] 500010
|
||||||
|
|
||||||
# create some fragmentation
|
# create some fragmentation
|
||||||
for {set j 0} {$j < 500000} {incr j 2} {
|
for {set j 0} {$j < 500000} {incr j 2} {
|
||||||
|
@ -151,7 +176,7 @@ start_server {tags {"defrag"}} {
|
||||||
for {set j 0} {$j < 500000} {incr j 2} {
|
for {set j 0} {$j < 500000} {incr j 2} {
|
||||||
$rd read ; # Discard replies
|
$rd read ; # Discard replies
|
||||||
}
|
}
|
||||||
assert {[r dbsize] == 250010}
|
assert_equal [r dbsize] 250010
|
||||||
|
|
||||||
# start defrag
|
# start defrag
|
||||||
after 120 ;# serverCron only updates the info once in 100ms
|
after 120 ;# serverCron only updates the info once in 100ms
|
||||||
|
@ -200,9 +225,9 @@ start_server {tags {"defrag"}} {
|
||||||
puts [r latency history active-defrag-cycle]
|
puts [r latency history active-defrag-cycle]
|
||||||
}
|
}
|
||||||
assert {$frag < 1.1}
|
assert {$frag < 1.1}
|
||||||
# due to high fragmentation, 10hz, and active-defrag-cycle-max set to 75,
|
# due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
|
||||||
# we expect max latency to be not much higher than 75ms
|
# we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher
|
||||||
assert {$max_latency <= 120}
|
assert {$max_latency <= 30}
|
||||||
}
|
}
|
||||||
# verify the data isn't corrupted or changed
|
# verify the data isn't corrupted or changed
|
||||||
set newdigest [r debug digest]
|
set newdigest [r debug digest]
|
||||||
|
@ -292,8 +317,8 @@ start_server {tags {"defrag"}} {
|
||||||
}
|
}
|
||||||
assert {$frag < 1.1}
|
assert {$frag < 1.1}
|
||||||
# due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
|
# due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
|
||||||
# we expect max latency to be not much higher than 7.5ms
|
# we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher
|
||||||
assert {$max_latency <= 12}
|
assert {$max_latency <= 30}
|
||||||
}
|
}
|
||||||
# verify the data isn't corrupted or changed
|
# verify the data isn't corrupted or changed
|
||||||
set newdigest [r debug digest]
|
set newdigest [r debug digest]
|
||||||
|
|
|
@ -20,9 +20,8 @@ start_server {tags {"modules"}} {
|
||||||
|
|
||||||
test {Module fork kill} {
|
test {Module fork kill} {
|
||||||
r fork.create 3
|
r fork.create 3
|
||||||
after 20
|
after 250
|
||||||
r fork.kill
|
r fork.kill
|
||||||
after 100
|
|
||||||
|
|
||||||
assert {[count_log_message "fork child started"] eq "2"}
|
assert {[count_log_message "fork child started"] eq "2"}
|
||||||
assert {[count_log_message "Received SIGUSR1 in child"] eq "1"}
|
assert {[count_log_message "Received SIGUSR1 in child"] eq "1"}
|
||||||
|
|
Loading…
Reference in New Issue