Improve multithreaded performance with memory prefetching (#14017)

This PR is based on: https://github.com/valkey-io/valkey/pull/861

> ### Memory Access Amortization
> (Designed and implemented by [dan
touitou](https://github.com/touitou-dan))
> 
> Memory Access Amortization (MAA) is a technique designed to optimize
the performance of dynamic data structures by reducing the impact of
memory access latency. It is applicable when multiple operations need to
be executed concurrently. The principle behind it is that for certain
dynamic data structures, executing operations in a batch is more
efficient than executing each one separately.
> 
> Rather than executing operations sequentially, this approach
interleaves the execution of all operations. This is done in such a way
that whenever a memory access is required during an operation, the
program prefetches the necessary memory and transitions to another
operation. This ensures that when one operation is blocked awaiting
memory access, other memory accesses are executed in parallel, thereby
reducing the average access latency.
> 
> We applied this method in the development of dictPrefetch, which takes
as parameters a vector of keys and dictionaries. It ensures that all
memory addresses required to execute dictionary operations for these
keys are loaded into the L1-L3 caches when executing commands.
Essentially, dictPrefetch is an interleaved execution of dictFind for
all the keys.

### Implementation of Redis
When the main thread processes clients with ready-to-execute commands
(i.e., clients for which the IO thread has parsed the commands), a batch
of up to 16 commands is created. Initially, the command's argv, which
were allocated by the IO thread, is prefetched to the main thread's L1
cache. Subsequently, all the dict entries and values required for the
commands are prefetched from the dictionary before the command
execution.

#### Memory prefetching for main hash table
As shown in the picture, after https://github.com/redis/redis/pull/13806
, we unify key value and the dict uses no_value optimization, so the
memory prefetching has 4 steps:

1. prefetch the bucket of the hash table
2. prefetch the entry associated with the given key's hash
3. prefetch the kv object of the entry
4. prefetch the value data of the kv object

we also need to handle the case that the dict entry is the pointer of kv
object, just skip step 3.

MAA can improves single-threaded memory access efficiency by
interleaving the execution of multiple independent operations, allowing
memory-level parallelism and better CPU utilization. Its key point is
batch-wise interleaved execution. Split a batch of independent
operations (such as multiple key lookups) into multiple state machines,
and interleave their progress within a single thread to hide the memory
access latency of individual requests.

The difference between serial execution and interleaved execution:
**naive serial execution**
```
key1: step1 → wait → step2 → wait → done
key2: step1 → wait → step2 → wait → done
```
**interleaved execution**
```
key1: step1   → step2   → done
key2:   step1 → step2   → done
key3:     step1 → step2 → done
         ↑ While waiting for key1’s memory, progress key2/key3
```

#### New configuration
This PR involves a new configuration `prefetch-batch-max-size`, but we
think it is a low level optimization, so we hide this config:
When multiple commands are parsed by the I/O threads and ready for
execution, we take advantage of knowing the next set of commands and
prefetch their required dictionary entries in a batch. This reduces
memory access costs. The optimal batch size depends on the specific
workflow of the user. The default batch size is 16, which can be
modified using the 'prefetch-batch-max-size' config.
When the config is set to 0, prefetching is disabled.

---------

Co-authored-by: Uri Yagelnik <uriy@amazon.com>
Co-authored-by: Ozan Tezcan <ozantezcan@gmail.com>
This commit is contained in:
Yuan Wang 2025-06-05 08:57:43 +08:00 committed by GitHub
parent b7c6755b1b
commit 70a079db5e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 706 additions and 14 deletions

View File

@ -375,7 +375,7 @@ endif
REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX)
REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX)
REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX)
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX)

View File

@ -3168,6 +3168,7 @@ standardConfig static_configs[] = {
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL),
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_slave_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* Slave max data age factor. */
createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL),

View File

@ -105,10 +105,10 @@
/* Test for __builtin_prefetch()
* Supported in LLVM since 2.9: https://releases.llvm.org/2.9/docs/ReleaseNotes.html
* Supported in GCC since 3.1 but we use 4.9 given it's too old: https://gcc.gnu.org/gcc-3.1/changes.html. */
* Supported in GCC since 3.1 but we use 4.8 given it's too old: https://gcc.gnu.org/gcc-3.1/changes.html. */
#if defined(__clang__) && (__clang_major__ > 2 || (__clang_major__ == 2 && __clang_minor__ >= 9))
#define HAS_BUILTIN_PREFETCH 1
#elif defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 9))
#elif defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8))
#define HAS_BUILTIN_PREFETCH 1
#else
#define HAS_BUILTIN_PREFETCH 0

View File

@ -363,6 +363,24 @@ int getKeySlot(sds key) {
return calculateKeySlot(key);
}
/* Return the slot of the key in the command. IO threads use this function
* to calculate slot to reduce main-thread load */
int getSlotFromCommand(struct redisCommand *cmd, robj **argv, int argc) {
int slot = -1;
if (!cmd || !server.cluster_enabled) return slot;
/* Get the keys from the command */
getKeysResult result = GETKEYS_RESULT_INIT;
int numkeys = getKeysFromCommand(cmd, argv, argc, &result);
if (numkeys > 0) {
/* Get the slot of the first key */
robj *first = argv[result.keys[0].pos];
slot = keyHashSlot(first->ptr, (int)sdslen(first->ptr));
}
getKeysFreeResult(&result);
return slot;
}
/* This is a special version of dbAdd() that is used only when loading
* keys from the RDB file: the key is passed as an SDS string that is
* copied by the function and freed by the caller.

View File

@ -66,7 +66,6 @@ static void _dictShrinkIfNeeded(dict *d);
static void _dictRehashStepIfNeeded(dict *d, uint64_t visitedIdx);
static signed char _dictNextExp(unsigned long size);
static int _dictInit(dict *d, dictType *type);
static dictEntry *dictGetNext(const dictEntry *de);
static dictEntryLink dictGetNextLink(dictEntry *de);
static void dictSetNext(dictEntry *de, dictEntry *next);
static int dictDefaultCompare(dictCmpCache *cache, const void *key1, const void *key2);
@ -499,6 +498,12 @@ int dictAdd(dict *d, void *key, void *val)
return DICT_OK;
}
int dictCompareKeys(dict *d, const void *key1, const void *key2) {
dictCmpCache cache = {0};
keyCmpFunc cmpFunc = dictGetCmpFunc(d);
return cmpFunc(&cache, key1, key2);
}
/* Low level add or find:
* This function adds the entry but instead of setting a value returns the
* dictEntry structure to the user, that will make sure to fill the value
@ -1007,6 +1012,10 @@ double dictIncrDoubleVal(dictEntry *de, double val) {
return de->v.d += val;
}
int dictEntryIsKey(const dictEntry *de) {
return entryIsKey(de);
}
void *dictGetKey(const dictEntry *de) {
/* if entryIsKey() */
if ((uintptr_t)de & ENTRY_PTR_IS_ODD_KEY) return (void *) de;
@ -1044,7 +1053,7 @@ double *dictGetDoubleValPtr(dictEntry *de) {
/* Returns the 'next' field of the entry or NULL if the entry doesn't have a
* 'next' field. */
static dictEntry *dictGetNext(const dictEntry *de) {
dictEntry *dictGetNext(const dictEntry *de) {
if (entryIsKey(de)) return NULL; /* there's no next */
if (entryIsNoValue(de)) return decodeEntryNoValue(de)->next;
return de->next;

View File

@ -181,11 +181,6 @@ typedef struct {
if ((d)->type->keyDestructor) \
(d)->type->keyDestructor((d), dictGetKey(entry))
#define dictCompareKeys(d, key1, key2) \
(((d)->type->keyCompare) ? \
(d)->type->keyCompare((d), key1, key2) : \
(key1) == (key2))
#define dictMetadata(d) (&(d)->metadata)
#define dictMetadataSize(d) ((d)->type->dictMetadataBytes \
? (d)->type->dictMetadataBytes(d) : 0)
@ -234,6 +229,8 @@ dictEntry * dictFind(dict *d, const void *key);
int dictShrinkIfNeeded(dict *d);
int dictExpandIfNeeded(dict *d);
void *dictGetKey(const dictEntry *de);
int dictEntryIsKey(const dictEntry *de);
int dictCompareKeys(dict *d, const void *key1, const void *key2);
size_t dictMemUsage(const dict *d);
size_t dictEntryMemUsage(int noValueDict);
dictIterator *dictGetIterator(dict *d);
@ -242,6 +239,7 @@ void dictInitIterator(dictIterator *iter, dict *d);
void dictInitSafeIterator(dictIterator *iter, dict *d);
void dictResetIterator(dictIterator *iter);
dictEntry *dictNext(dictIterator *iter);
dictEntry *dictGetNext(const dictEntry *de);
void dictReleaseIterator(dictIterator *iter);
dictEntry *dictGetRandomKey(dict *d);
dictEntry *dictGetFairRandomKey(dict *d);

View File

@ -44,9 +44,9 @@
/* Everything below this line is automatically generated by
* generate-fmtargs.py. Do not manually edit. */
#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, N, ...) N
#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, _121, _122, _123, _124, _125, _126, _127, _128, _129, _130, _131, _132, _133, _134, _135, _136, _137, _138, _139, _140, _141, _142, _143, _144, _145, _146, _147, _148, _149, _150, _151, _152, _153, _154, _155, _156, _157, _158, _159, _160, N, ...) N
#define RSEQ_N() 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0
#define RSEQ_N() 160, 159, 158, 157, 156, 155, 154, 153, 152, 151, 150, 149, 148, 147, 146, 145, 144, 143, 142, 141, 140, 139, 138, 137, 136, 135, 134, 133, 132, 131, 130, 129, 128, 127, 126, 125, 124, 123, 122, 121, 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0
#define COMPACT_FMT_2(fmt, value) fmt
#define COMPACT_FMT_4(fmt, value, ...) fmt COMPACT_FMT_2(__VA_ARGS__)
@ -108,6 +108,26 @@
#define COMPACT_FMT_116(fmt, value, ...) fmt COMPACT_FMT_114(__VA_ARGS__)
#define COMPACT_FMT_118(fmt, value, ...) fmt COMPACT_FMT_116(__VA_ARGS__)
#define COMPACT_FMT_120(fmt, value, ...) fmt COMPACT_FMT_118(__VA_ARGS__)
#define COMPACT_FMT_122(fmt, value, ...) fmt COMPACT_FMT_120(__VA_ARGS__)
#define COMPACT_FMT_124(fmt, value, ...) fmt COMPACT_FMT_122(__VA_ARGS__)
#define COMPACT_FMT_126(fmt, value, ...) fmt COMPACT_FMT_124(__VA_ARGS__)
#define COMPACT_FMT_128(fmt, value, ...) fmt COMPACT_FMT_126(__VA_ARGS__)
#define COMPACT_FMT_130(fmt, value, ...) fmt COMPACT_FMT_128(__VA_ARGS__)
#define COMPACT_FMT_132(fmt, value, ...) fmt COMPACT_FMT_130(__VA_ARGS__)
#define COMPACT_FMT_134(fmt, value, ...) fmt COMPACT_FMT_132(__VA_ARGS__)
#define COMPACT_FMT_136(fmt, value, ...) fmt COMPACT_FMT_134(__VA_ARGS__)
#define COMPACT_FMT_138(fmt, value, ...) fmt COMPACT_FMT_136(__VA_ARGS__)
#define COMPACT_FMT_140(fmt, value, ...) fmt COMPACT_FMT_138(__VA_ARGS__)
#define COMPACT_FMT_142(fmt, value, ...) fmt COMPACT_FMT_140(__VA_ARGS__)
#define COMPACT_FMT_144(fmt, value, ...) fmt COMPACT_FMT_142(__VA_ARGS__)
#define COMPACT_FMT_146(fmt, value, ...) fmt COMPACT_FMT_144(__VA_ARGS__)
#define COMPACT_FMT_148(fmt, value, ...) fmt COMPACT_FMT_146(__VA_ARGS__)
#define COMPACT_FMT_150(fmt, value, ...) fmt COMPACT_FMT_148(__VA_ARGS__)
#define COMPACT_FMT_152(fmt, value, ...) fmt COMPACT_FMT_150(__VA_ARGS__)
#define COMPACT_FMT_154(fmt, value, ...) fmt COMPACT_FMT_152(__VA_ARGS__)
#define COMPACT_FMT_156(fmt, value, ...) fmt COMPACT_FMT_154(__VA_ARGS__)
#define COMPACT_FMT_158(fmt, value, ...) fmt COMPACT_FMT_156(__VA_ARGS__)
#define COMPACT_FMT_160(fmt, value, ...) fmt COMPACT_FMT_158(__VA_ARGS__)
#define COMPACT_VALUES_2(fmt, value) value
#define COMPACT_VALUES_4(fmt, value, ...) value, COMPACT_VALUES_2(__VA_ARGS__)
@ -169,5 +189,25 @@
#define COMPACT_VALUES_116(fmt, value, ...) value, COMPACT_VALUES_114(__VA_ARGS__)
#define COMPACT_VALUES_118(fmt, value, ...) value, COMPACT_VALUES_116(__VA_ARGS__)
#define COMPACT_VALUES_120(fmt, value, ...) value, COMPACT_VALUES_118(__VA_ARGS__)
#define COMPACT_VALUES_122(fmt, value, ...) value, COMPACT_VALUES_120(__VA_ARGS__)
#define COMPACT_VALUES_124(fmt, value, ...) value, COMPACT_VALUES_122(__VA_ARGS__)
#define COMPACT_VALUES_126(fmt, value, ...) value, COMPACT_VALUES_124(__VA_ARGS__)
#define COMPACT_VALUES_128(fmt, value, ...) value, COMPACT_VALUES_126(__VA_ARGS__)
#define COMPACT_VALUES_130(fmt, value, ...) value, COMPACT_VALUES_128(__VA_ARGS__)
#define COMPACT_VALUES_132(fmt, value, ...) value, COMPACT_VALUES_130(__VA_ARGS__)
#define COMPACT_VALUES_134(fmt, value, ...) value, COMPACT_VALUES_132(__VA_ARGS__)
#define COMPACT_VALUES_136(fmt, value, ...) value, COMPACT_VALUES_134(__VA_ARGS__)
#define COMPACT_VALUES_138(fmt, value, ...) value, COMPACT_VALUES_136(__VA_ARGS__)
#define COMPACT_VALUES_140(fmt, value, ...) value, COMPACT_VALUES_138(__VA_ARGS__)
#define COMPACT_VALUES_142(fmt, value, ...) value, COMPACT_VALUES_140(__VA_ARGS__)
#define COMPACT_VALUES_144(fmt, value, ...) value, COMPACT_VALUES_142(__VA_ARGS__)
#define COMPACT_VALUES_146(fmt, value, ...) value, COMPACT_VALUES_144(__VA_ARGS__)
#define COMPACT_VALUES_148(fmt, value, ...) value, COMPACT_VALUES_146(__VA_ARGS__)
#define COMPACT_VALUES_150(fmt, value, ...) value, COMPACT_VALUES_148(__VA_ARGS__)
#define COMPACT_VALUES_152(fmt, value, ...) value, COMPACT_VALUES_150(__VA_ARGS__)
#define COMPACT_VALUES_154(fmt, value, ...) value, COMPACT_VALUES_152(__VA_ARGS__)
#define COMPACT_VALUES_156(fmt, value, ...) value, COMPACT_VALUES_154(__VA_ARGS__)
#define COMPACT_VALUES_158(fmt, value, ...) value, COMPACT_VALUES_156(__VA_ARGS__)
#define COMPACT_VALUES_160(fmt, value, ...) value, COMPACT_VALUES_158(__VA_ARGS__)
#endif

View File

@ -343,6 +343,29 @@ int sendPendingClientsToIOThreads(void) {
return processed;
}
/* Prefetch the commands from the IO thread. The return value is the number
* of clients that have been prefetched. */
int prefetchIOThreadCommands(IOThread *t) {
int len = listLength(mainThreadProcessingClients[t->id]);
int to_prefetch = determinePrefetchCount(len);
if (to_prefetch == 0) return 0;
int clients = 0;
listIter li;
listNode *ln;
listRewind(mainThreadProcessingClients[t->id], &li);
while((ln = listNext(&li)) && clients++ < to_prefetch) {
client *c = listNodeValue(ln);
/* A single command may contain multiple keys. If the batch is full,
* we stop adding clients to it. */
if (addCommandToBatch(c) == C_ERR) break;
}
/* Prefetch the commands in the batch. */
prefetchCommands();
return clients;
}
extern int ProcessingEventsWhileBlocked;
/* Send the pending clients to the IO thread if the number of pending clients
@ -391,8 +414,19 @@ int processClientsFromIOThread(IOThread *t) {
size_t processed = listLength(mainThreadProcessingClients[t->id]);
if (processed == 0) return 0;
int prefetch_clients = 0;
/* We may call processClientsFromIOThread reentrantly, so we need to
* reset the prefetching batch, besides, users may change the config
* of prefetch batch size, so we need to reset the prefetching batch. */
resetCommandsBatch();
listNode *node = NULL;
while (listLength(mainThreadProcessingClients[t->id])) {
/* Prefetch the commands if no clients in the batch. */
if (prefetch_clients <= 0) prefetch_clients = prefetchIOThreadCommands(t);
/* Reset the prefetching batch if we have processed all clients. */
if (--prefetch_clients <= 0) resetCommandsBatch();
/* Each time we pop up only the first client to process to guarantee
* reentrancy safety. */
if (node) zfree(node);
@ -651,6 +685,8 @@ void initThreadedIO(void) {
exit(1);
}
prefetchCommandsBatchInit();
/* Spawn and initialize the I/O threads. */
for (int i = 1; i < server.io_threads_num; i++) {
IOThread *t = &IOThreads[i];

View File

@ -83,7 +83,7 @@ typedef struct {
/**********************************/
/* Get the dictionary pointer based on dict-index. */
static dict *kvstoreGetDict(kvstore *kvs, int didx) {
dict *kvstoreGetDict(kvstore *kvs, int didx) {
return kvs->dicts[didx];
}

View File

@ -97,6 +97,7 @@ dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **exis
dictEntryLink kvstoreDictTwoPhaseUnlinkFind(kvstore *kvs, int didx, const void *key, int *table_index);
void kvstoreDictTwoPhaseUnlinkFree(kvstore *kvs, int didx, dictEntryLink plink, int table_index);
int kvstoreDictDelete(kvstore *kvs, int didx, const void *key);
dict *kvstoreGetDict(kvstore *kvs, int didx);
kvstoreDictMetadata *kvstoreGetDictMetadata(kvstore *kvs, int didx);
kvstoreMetadata *kvstoreGetMetadata(kvstore *kvs);

399
src/memory_prefetch.c Normal file
View File

@ -0,0 +1,399 @@
/*
* This file utilizes prefetching keys and data for multiple commands in a batch,
* to improve performance by amortizing memory access costs across multiple operations.
*
* Copyright (c) 2025-Present, Redis Ltd. and contributors.
* All rights reserved.
*
* Copyright (c) 2024-present, Valkey contributors.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3).
*
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
*/
#include "memory_prefetch.h"
#include "server.h"
#include "dict.h"
typedef enum { HT_IDX_FIRST = 0, HT_IDX_SECOND = 1, HT_IDX_INVALID = -1 } HashTableIndex;
typedef enum {
PREFETCH_BUCKET, /* Initial state, determines which hash table to use and prefetch the table's bucket */
PREFETCH_ENTRY, /* prefetch entries associated with the given key's hash */
PREFETCH_KVOBJ, /* prefetch the kv object of the entry found in the previous step */
PREFETCH_VALDATA, /* prefetch the value data of the kv object found in the previous step */
PREFETCH_DONE /* Indicates that prefetching for this key is complete */
} PrefetchState;
/************************************ State machine diagram for the prefetch operation. ********************************
start
PREFETCH_BUCKET
no more tables -> done
| bucket|found |
|
entry not found - goto next table
PREFETCH_ENTRY |
| Entryfound
|
|
| PREFETCH_KVOBJ |
kvobj not found - goto next entry | |
PREFETCH_VALDATA
|
-
PREFETCH_DONE
**********************************************************************************************************************/
typedef void *(*GetValueDataFunc)(const void *val);
typedef struct KeyPrefetchInfo {
PrefetchState state; /* Current state of the prefetch operation */
HashTableIndex ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */
uint64_t bucket_idx; /* Index of the bucket in the current hash table */
uint64_t key_hash; /* Hash value of the key being prefetched */
dictEntry *current_entry; /* Pointer to the current entry being processed */
kvobj *current_kv; /* Pointer to the kv object being prefetched */
} KeyPrefetchInfo;
/* PrefetchCommandsBatch structure holds the state of the current batch of client commands being processed. */
typedef struct PrefetchCommandsBatch {
size_t cur_idx; /* Index of the current key being processed */
size_t key_count; /* Number of keys in the current batch */
size_t client_count; /* Number of clients in the current batch */
size_t max_prefetch_size; /* Maximum number of keys to prefetch in a batch */
void **keys; /* Array of keys to prefetch in the current batch */
client **clients; /* Array of clients in the current batch */
dict **keys_dicts; /* Main dict for each key */
dict **current_dicts; /* Points to dict to prefetch from */
KeyPrefetchInfo *prefetch_info; /* Prefetch info for each key */
GetValueDataFunc get_value_data_func; /* Function to get the value data */
} PrefetchCommandsBatch;
static PrefetchCommandsBatch *batch = NULL;
void freePrefetchCommandsBatch(void) {
if (batch == NULL) {
return;
}
zfree(batch->clients);
zfree(batch->keys);
zfree(batch->keys_dicts);
zfree(batch->prefetch_info);
zfree(batch);
batch = NULL;
}
void prefetchCommandsBatchInit(void) {
serverAssert(!batch);
/* To avoid prefetching small batches, we set the max size to twice
* the configured size, so if not exceeding twice the limit, we can
* prefetch all of it. See also `determinePrefetchCount` */
size_t max_prefetch_size = server.prefetch_batch_max_size * 2;
if (max_prefetch_size == 0) {
return;
}
batch = zcalloc(sizeof(PrefetchCommandsBatch));
batch->max_prefetch_size = max_prefetch_size;
batch->clients = zcalloc(max_prefetch_size * sizeof(client *));
batch->keys = zcalloc(max_prefetch_size * sizeof(void *));
batch->keys_dicts = zcalloc(max_prefetch_size * sizeof(dict *));
batch->prefetch_info = zcalloc(max_prefetch_size * sizeof(KeyPrefetchInfo));
}
void onMaxBatchSizeChange(void) {
if (batch && batch->client_count > 0) {
/* We need to process the current batch before updating the size */
return;
}
freePrefetchCommandsBatch();
prefetchCommandsBatchInit();
}
/* Prefetch the given pointer and move to the next key in the batch. */
static inline void prefetchAndMoveToNextKey(void *addr) {
redis_prefetch_read(addr);
/* While the prefetch is in progress, we can continue to the next key */
batch->cur_idx = (batch->cur_idx + 1) % batch->key_count;
}
static inline void markKeyAsdone(KeyPrefetchInfo *info) {
info->state = PREFETCH_DONE;
server.stat_total_prefetch_entries++;
}
/* Returns the next KeyPrefetchInfo structure that needs to be processed. */
static KeyPrefetchInfo *getNextPrefetchInfo(void) {
size_t start_idx = batch->cur_idx;
do {
KeyPrefetchInfo *info = &batch->prefetch_info[batch->cur_idx];
if (info->state != PREFETCH_DONE) return info;
batch->cur_idx = (batch->cur_idx + 1) % batch->key_count;
} while (batch->cur_idx != start_idx);
return NULL;
}
static void initBatchInfo(dict **dicts, GetValueDataFunc func) {
batch->current_dicts = dicts;
batch->get_value_data_func = func;
/* Initialize the prefetch info */
for (size_t i = 0; i < batch->key_count; i++) {
KeyPrefetchInfo *info = &batch->prefetch_info[i];
if (!batch->current_dicts[i] || dictSize(batch->current_dicts[i]) == 0) {
info->state = PREFETCH_DONE;
continue;
}
info->ht_idx = HT_IDX_INVALID;
info->current_entry = NULL;
info->current_kv = NULL;
info->state = PREFETCH_BUCKET;
info->key_hash = dictGetHash(batch->current_dicts[i], batch->keys[i]);
}
}
/* Prefetch the bucket of the next hash table index.
* If no tables are left, move to the PREFETCH_DONE state. */
static void prefetchBucket(KeyPrefetchInfo *info) {
size_t i = batch->cur_idx;
/* Determine which hash table to use */
if (info->ht_idx == HT_IDX_INVALID) {
info->ht_idx = HT_IDX_FIRST;
} else if (info->ht_idx == HT_IDX_FIRST && dictIsRehashing(batch->current_dicts[i])) {
info->ht_idx = HT_IDX_SECOND;
} else {
/* No more tables left - mark as done. */
markKeyAsdone(info);
return;
}
/* Prefetch the bucket */
info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(batch->current_dicts[i]->ht_size_exp[info->ht_idx]);
prefetchAndMoveToNextKey(&batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]);
info->current_entry = NULL;
info->state = PREFETCH_ENTRY;
}
/* Prefetch the entry in the bucket and move to the PREFETCH_KVOBJ state.
* If no more entries in the bucket, move to the PREFETCH_BUCKET state to look at the next table. */
static void prefetchEntry(KeyPrefetchInfo *info) {
size_t i = batch->cur_idx;
if (info->current_entry) {
/* We already found an entry in the bucket - move to the next entry */
info->current_entry = dictGetNext(info->current_entry);
} else {
/* Go to the first entry in the bucket */
info->current_entry = batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx];
}
if (info->current_entry) {
prefetchAndMoveToNextKey(info->current_entry);
info->current_kv = NULL;
info->state = PREFETCH_KVOBJ;
} else {
/* No entry found in the bucket - try the bucket in the next table */
info->state = PREFETCH_BUCKET;
}
}
/* Prefetch the kv object in the dict entry, and to the PREFETCH_VALDATA state. */
static inline void prefetchKVOject(KeyPrefetchInfo *info) {
kvobj *kv = dictGetKey(info->current_entry);
int is_kv = dictEntryIsKey(info->current_entry);
info->current_kv = kv;
info->state = PREFETCH_VALDATA;
/* If the entry is a pointer of kv object, we don't need to prefetch it */
if (!is_kv) prefetchAndMoveToNextKey(kv);
}
/* Prefetch the value data of the kv object found in dict entry. */
static void prefetchValueData(KeyPrefetchInfo *info) {
size_t i = batch->cur_idx;
kvobj *kv = info->current_kv;
/* 1. If this is the last element, we assume a hit and don't compare the keys
* 2. This kv object is the target of the lookup. */
if ((!dictGetNext(info->current_entry) && !dictIsRehashing(batch->current_dicts[i])) ||
dictCompareKeys(batch->current_dicts[i], batch->keys[i], kv))
{
if (batch->get_value_data_func) {
void *value_data = batch->get_value_data_func(kv);
if (value_data) prefetchAndMoveToNextKey(value_data);
}
markKeyAsdone(info);
} else {
/* Not found in the current entry, move to the next entry */
info->state = PREFETCH_ENTRY;
}
}
/* Prefetch dictionary data for an array of keys.
*
* This function takes an array of dictionaries and keys, attempting to bring
* data closer to the L1 cache that might be needed for dictionary operations
* on those keys.
*
* The dictFind algorithm:
* 1. Evaluate the hash of the key
* 2. Access the index in the first table
* 3. Walk the entries linked list until the key is found
* If the key hasn't been found and the dictionary is in the middle of rehashing,
* access the index on the second table and repeat step 3
*
* dictPrefetch executes the same algorithm as dictFind, but one step at a time
* for each key. Instead of waiting for data to be read from memory, it prefetches
* the data and then moves on to execute the next prefetch for another key.
*
* dicts - An array of dictionaries to prefetch data from.
* get_val_data_func - A callback function that dictPrefetch can invoke
* to bring the key's value data closer to the L1 cache as well.
*/
static void dictPrefetch(dict **dicts, GetValueDataFunc get_val_data_func) {
initBatchInfo(dicts, get_val_data_func);
KeyPrefetchInfo *info;
while ((info = getNextPrefetchInfo())) {
switch (info->state) {
case PREFETCH_BUCKET: prefetchBucket(info); break;
case PREFETCH_ENTRY: prefetchEntry(info); break;
case PREFETCH_KVOBJ: prefetchKVOject(info); break;
case PREFETCH_VALDATA: prefetchValueData(info); break;
default: serverPanic("Unknown prefetch state %d", info->state);
}
}
}
/* Helper function to get the value pointer of a kv object. */
static void *getObjectValuePtr(const void *value) {
kvobj *kv = (kvobj *)value;
return (kv->type == OBJ_STRING && kv->encoding == OBJ_ENCODING_RAW) ? kv->ptr : NULL;
}
void resetCommandsBatch(void) {
if (batch == NULL) {
/* Handle the case where prefetching becomes enabled from disabled. */
if (server.prefetch_batch_max_size) prefetchCommandsBatchInit();
return;
}
batch->cur_idx = 0;
batch->key_count = 0;
batch->client_count = 0;
/* Handle the case where the max prefetch size has been changed. */
if (batch->max_prefetch_size != (size_t)server.prefetch_batch_max_size * 2) {
onMaxBatchSizeChange();
}
}
/* Prefetching in very small batches tends to be ineffective because the technique
* relies on a small gaptypically a few CPU cyclesbetween issuing the prefetch
* and performing the actual memory access. If the batch is too small, this delay
* cannot be effectively inserted, and the prefetching yields little to no benefit.
*
* To avoid wasting effort, when the remaining data is small (less than twice the
* maximum batch size), we simply prefetch all of it at once. Otherwise, we only
* prefetch a limited portion, capped at the configured maximum. */
int determinePrefetchCount(int len) {
if (!batch) return 0;
/* The batch max size is double of the configured size. */
int config_size = batch->max_prefetch_size / 2;
return len < server.prefetch_batch_max_size ? len : config_size;
}
/* Prefetch command-related data:
* 1. Prefetch the command arguments allocated by the I/O thread to bring them
* closer to the L1 cache.
* 2. Prefetch the keys and values for all commands in the current batch from
* the main dictionaries. */
void prefetchCommands(void) {
if (!batch) return;
/* Prefetch argv's for all clients */
for (size_t i = 0; i < batch->client_count; i++) {
client *c = batch->clients[i];
if (!c || c->argc <= 1) continue;
/* Skip prefetching first argv (cmd name) it was already looked up by
* the I/O thread, and the main thread will not touch argv[0]. */
for (int j = 1; j < c->argc; j++) {
redis_prefetch_read(c->argv[j]);
}
}
/* Prefetch the argv->ptr if required */
for (size_t i = 0; i < batch->client_count; i++) {
client *c = batch->clients[i];
if (!c || c->argc <= 1) continue;
for (int j = 1; j < c->argc; j++) {
if (c->argv[j]->encoding == OBJ_ENCODING_RAW) {
redis_prefetch_read(c->argv[j]->ptr);
}
}
}
/* Get the keys ptrs - we do it here after the key obj was prefetched. */
for (size_t i = 0; i < batch->key_count; i++) {
batch->keys[i] = ((robj *)batch->keys[i])->ptr;
}
/* Prefetch dict keys for all commands.
* Prefetching is beneficial only if there are more than one key. */
if (batch->key_count > 1) {
server.stat_total_prefetch_batches++;
/* Prefetch keys from the main dict */
dictPrefetch(batch->keys_dicts, getObjectValuePtr);
}
}
/* Adds the client's command to the current batch.
*
* Returns C_OK if the command was added successfully, C_ERR otherwise. */
int addCommandToBatch(client *c) {
if (unlikely(!batch)) return C_ERR;
/* If the batch is full, process it.
* We also check the client count to handle cases where
* no keys exist for the clients' commands. */
if (batch->client_count == batch->max_prefetch_size ||
batch->key_count == batch->max_prefetch_size)
{
return C_ERR;
}
batch->clients[batch->client_count++] = c;
if (likely(c->iolookedcmd)) {
/* Get command's keys positions */
getKeysResult result = GETKEYS_RESULT_INIT;
int num_keys = getKeysFromCommand(c->iolookedcmd, c->argv, c->argc, &result);
for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) {
batch->keys[batch->key_count] = c->argv[result.keys[i].pos];
batch->keys_dicts[batch->key_count] =
kvstoreGetDict(c->db->keys, c->slot > 0 ? c->slot : 0);
batch->key_count++;
}
getKeysFreeResult(&result);
}
return C_OK;
}

26
src/memory_prefetch.h Normal file
View File

@ -0,0 +1,26 @@
/*
* Copyright (c) 2025-Present, Redis Ltd.
* All rights reserved.
*
* Copyright (c) 2024-present, Valkey contributors.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3).
*
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
*/
#ifndef MEMORY_PREFETCH_H
#define MEMORY_PREFETCH_H
struct client;
void prefetchCommandsBatchInit(void);
int determinePrefetchCount(int len);
int addCommandToBatch(struct client *c);
void resetCommandsBatch(void);
void prefetchCommands(void);
#endif /* MEMORY_PREFETCH_H */

View File

@ -2877,6 +2877,7 @@ int processInputBuffer(client *c) {
if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) {
c->io_flags |= CLIENT_IO_PENDING_COMMAND;
c->iolookedcmd = lookupCommand(c->argv, c->argc);
c->slot = getSlotFromCommand(c->iolookedcmd, c->argv, c->argc);
enqueuePendingClientsToMainThread(c, 0);
break;
}

View File

@ -2745,6 +2745,8 @@ void resetServerStats(void) {
server.stat_reply_buffer_shrinks = 0;
server.stat_reply_buffer_expands = 0;
server.stat_cluster_incompatible_ops = 0;
server.stat_total_prefetch_batches = 0;
server.stat_total_prefetch_entries = 0;
memset(server.duration_stats, 0, sizeof(durationStats) * EL_DURATION_TYPE_NUM);
server.el_cmd_cnt_max = 0;
lazyfreeResetStats();
@ -6223,6 +6225,8 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"total_writes_processed:%lld\r\n", stat_total_writes_processed,
"io_threaded_reads_processed:%lld\r\n", stat_io_reads_processed,
"io_threaded_writes_processed:%lld\r\n", stat_io_writes_processed,
"io_threaded_total_prefetch_batches:%lld\r\n", server.stat_total_prefetch_batches,
"io_threaded_total_prefetch_entries:%lld\r\n", server.stat_total_prefetch_entries,
"client_query_buffer_limit_disconnections:%lld\r\n", stat_client_qbuf_limit_disconnections,
"client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections,
"reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks,

View File

@ -63,6 +63,7 @@ typedef long long ustime_t; /* microsecond time type. */
#include "rax.h" /* Radix tree */
#include "connection.h" /* Connection abstraction */
#include "eventnotifier.h" /* Event notification */
#include "memory_prefetch.h"
#define REDISMODULE_CORE 1
typedef struct redisObject robj;
@ -1838,6 +1839,7 @@ struct redisServer {
int io_threads_clients_num[IO_THREADS_MAX_NUM]; /* Number of clients assigned to each IO thread. */
int io_threads_do_reads; /* Read and parse from IO threads? */
int io_threads_active; /* Is IO threads currently active? */
int prefetch_batch_max_size;/* Maximum number of keys to prefetch in a single batch */
long long events_processed_while_blocked; /* processEventsWhileBlocked() */
int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */
int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */
@ -1913,6 +1915,8 @@ struct redisServer {
redisAtomic long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */
long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */
long long stat_cluster_incompatible_ops; /* Number of operations that are incompatible with cluster mode */
long long stat_total_prefetch_entries; /* Total number of prefetched dict entries */
long long stat_total_prefetch_batches; /* Total number of prefetched batches */
/* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */
struct {
@ -3695,6 +3699,7 @@ void freeReplicationBacklogRefMemAsync(list *blocks, rax *index);
int getKeysFromCommandWithSpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result);
keyReference *getKeysPrepareResult(getKeysResult *result, int numkeys);
int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int getSlotFromCommand(struct redisCommand *cmd, robj **argv, int argc);
int doesCommandHaveKeys(struct redisCommand *cmd);
int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int doesCommandHaveChannelsWithFlags(struct redisCommand *cmd, int flags);

View File

@ -1,3 +1,17 @@
#
# Copyright (c) 2009-Present, Redis Ltd.
# All rights reserved.
#
# Copyright (c) 2025-present, Valkey contributors.
# All rights reserved.
#
# Licensed under your choice of (a) the Redis Source Available License 2.0
# (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
# GNU Affero General Public License v3 (AGPLv3).
#
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
#
source tests/support/cli.tcl
test {CONFIG SET port number} {
@ -170,3 +184,143 @@ start_server {config "minimal.conf" tags {"external:skip"}} {
}
}
}
start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-debug-command {yes} io-threads 2}} {
set server_pid [s process_id]
# Since each thread may perform memory prefetch independently, this test is
# only run when the number of IO threads is 2 to ensure deterministic results.
if {[r config get io-threads] eq "io-threads 2"} {
test {prefetch works as expected when killing a client from the middle of prefetch commands batch} {
# Create 16 (prefetch batch size) +1 clients
for {set i 0} {$i < 16} {incr i} {
set rd$i [redis_deferring_client]
}
# set a key that will be later be prefetch
r set a 0
# Get the client ID of rd4
$rd4 client id
set rd4_id [$rd4 read]
# Create a batch of commands by suspending the server for a while
# before responding to the first command
pause_process $server_pid
# The first client will kill the fourth client
$rd0 client kill id $rd4_id
# Send set commands for all clients except the first
for {set i 1} {$i < 16} {incr i} {
[set rd$i] set a $i
[set rd$i] flush
}
# Resume the server
resume_process $server_pid
# Read the results
assert_equal {1} [$rd0 read]
catch {$rd4 read} err
assert_match {I/O error reading reply} $err
# verify the prefetch stats are as expected
set info [r info stats]
set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
assert_range $prefetch_entries 2 15; # With slower machines, the number of prefetch entries can be lower
set prefetch_batches [getInfoProperty $info io_threaded_total_prefetch_batches]
assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher
# Verify the final state
$rd15 get a
assert_equal {OK} [$rd15 read]
assert_equal {15} [$rd15 read]
}
test {prefetch works as expected when changing the batch size while executing the commands batch} {
# Create 16 (default prefetch batch size) clients
for {set i 0} {$i < 16} {incr i} {
set rd$i [redis_deferring_client]
}
# Create a batch of commands by suspending the server for a while
# before responding to the first command
pause_process $server_pid
# Send set commands for all clients the 5th client will change the prefetch batch size
for {set i 0} {$i < 16} {incr i} {
if {$i == 4} {
[set rd$i] config set prefetch-batch-max-size 1
}
[set rd$i] set a $i
[set rd$i] flush
}
# Resume the server
resume_process $server_pid
# Read the results
for {set i 0} {$i < 16} {incr i} {
assert_equal {OK} [[set rd$i] read]
[set rd$i] close
}
# assert the configured prefetch batch size was changed
assert {[r config get prefetch-batch-max-size] eq "prefetch-batch-max-size 1"}
}
proc do_prefetch_batch {server_pid batch_size} {
# Create clients
for {set i 0} {$i < $batch_size} {incr i} {
set rd$i [redis_deferring_client]
}
# Suspend the server to batch the commands
pause_process $server_pid
# Send commands from all clients
for {set i 0} {$i < $batch_size} {incr i} {
[set rd$i] set a $i
[set rd$i] flush
}
# Resume the server to process the batch
resume_process $server_pid
# Verify responses
for {set i 0} {$i < $batch_size} {incr i} {
assert_equal {OK} [[set rd$i] read]
[set rd$i] close
}
}
test {no prefetch when the batch size is set to 0} {
# set the batch size to 0
r config set prefetch-batch-max-size 0
# save the current value of prefetch entries
set info [r info stats]
set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
do_prefetch_batch $server_pid 16
# assert the prefetch entries did not change
set info [r info stats]
set new_prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
assert_equal $prefetch_entries $new_prefetch_entries
}
test {Prefetch can resume working when the configuration option is set to a non-zero value} {
# save the current value of prefetch entries
set info [r info stats]
set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
# set the batch size to 0
r config set prefetch-batch-max-size 16
do_prefetch_batch $server_pid 16
# assert the prefetch entries did not change
set info [r info stats]
set new_prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
# With slower machines, the number of prefetch entries can be lower
assert_range $new_prefetch_entries [expr {$prefetch_entries + 2}] [expr {$prefetch_entries + 16}]
}
}
}

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3
# Outputs the generated part of src/fmtargs.h
MAX_ARGS = 120
MAX_ARGS = 160
import os
print("/* Everything below this line is automatically generated by")