mirror of https://github.com/redis/redis.git
				
				
				
			Modules: implement RM_Replicate() from async callbacks.
This commit is contained in:
		
							parent
							
								
									c549513acd
								
							
						
					
					
						commit
						e938bbc543
					
				
							
								
								
									
										33
									
								
								src/module.c
								
								
								
								
							
							
						
						
									
										33
									
								
								src/module.c
								
								
								
								
							| 
						 | 
				
			
			@ -147,10 +147,14 @@ struct RedisModuleCtx {
 | 
			
		|||
    int keys_count;
 | 
			
		||||
 | 
			
		||||
    struct RedisModulePoolAllocBlock *pa_head;
 | 
			
		||||
    redisOpArray saved_oparray;    /* When propagating commands in a callback
 | 
			
		||||
                                      we reallocate the "also propagate" op
 | 
			
		||||
                                      array. Here we save the old one to
 | 
			
		||||
                                      restore it later. */
 | 
			
		||||
};
 | 
			
		||||
typedef struct RedisModuleCtx RedisModuleCtx;
 | 
			
		||||
 | 
			
		||||
#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL}
 | 
			
		||||
#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL, {0}}
 | 
			
		||||
#define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
 | 
			
		||||
#define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
 | 
			
		||||
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
 | 
			
		||||
| 
						 | 
				
			
			@ -538,6 +542,24 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
 | 
			
		|||
    alsoPropagate(server.execCommand,c->db->id,propargv,1,
 | 
			
		||||
        PROPAGATE_AOF|PROPAGATE_REPL);
 | 
			
		||||
    decrRefCount(propargv[0]);
 | 
			
		||||
 | 
			
		||||
    /* If this is not a module command context (but is instead a simple
 | 
			
		||||
     * callback context), we have to handle directly the "also propagate"
 | 
			
		||||
     * array and emit it. In a module command call this will be handled
 | 
			
		||||
     * directly by call(). */
 | 
			
		||||
    if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL) &&
 | 
			
		||||
        server.also_propagate.numops)
 | 
			
		||||
    {
 | 
			
		||||
        for (int j = 0; j < server.also_propagate.numops; j++) {
 | 
			
		||||
            redisOp *rop = &server.also_propagate.ops[j];
 | 
			
		||||
            int target = rop->target;
 | 
			
		||||
            if (target)
 | 
			
		||||
                propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
 | 
			
		||||
        }
 | 
			
		||||
        redisOpArrayFree(&server.also_propagate);
 | 
			
		||||
    }
 | 
			
		||||
    /* Restore the previous oparray in case of nexted use of the API. */
 | 
			
		||||
    server.also_propagate = ctx->saved_oparray;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/* Free the context after the user function was called. */
 | 
			
		||||
| 
						 | 
				
			
			@ -1354,9 +1376,16 @@ void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) {
 | 
			
		|||
    /* If we already emitted MULTI return ASAP. */
 | 
			
		||||
    if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) return;
 | 
			
		||||
    /* If this is a thread safe context, we do not want to wrap commands
 | 
			
		||||
     * executed into MUTLI/EXEC, they are executed as single commands
 | 
			
		||||
     * executed into MULTI/EXEC, they are executed as single commands
 | 
			
		||||
     * from an external client in essence. */
 | 
			
		||||
    if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) return;
 | 
			
		||||
    /* If this is a callback context, and not a module command execution
 | 
			
		||||
     * context, we have to setup the op array for the "also propagate" API
 | 
			
		||||
     * so that RM_Replicate() will work. */
 | 
			
		||||
    if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL)) {
 | 
			
		||||
        ctx->saved_oparray = server.also_propagate;
 | 
			
		||||
        redisOpArrayInit(&server.also_propagate);
 | 
			
		||||
    }
 | 
			
		||||
    execCommandPropagateMulti(ctx->client);
 | 
			
		||||
    ctx->flags |= REDISMODULE_CTX_MULTI_EMITTED;
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1922,6 +1922,8 @@ struct redisCommand *lookupCommandOrOriginal(sds name);
 | 
			
		|||
void call(client *c, int flags);
 | 
			
		||||
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags);
 | 
			
		||||
void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target);
 | 
			
		||||
void redisOpArrayInit(redisOpArray *oa);
 | 
			
		||||
void redisOpArrayFree(redisOpArray *oa);
 | 
			
		||||
void forceCommandPropagation(client *c, int flags);
 | 
			
		||||
void preventCommandPropagation(client *c);
 | 
			
		||||
void preventCommandAOF(client *c);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue