mirror of https://github.com/redis/redis.git
				
				
				
			Merge branch 'unstable' of github.com:/antirez/redis into unstable
This commit is contained in:
		
						commit
						959cdb358b
					
				
							
								
								
									
										43
									
								
								src/multi.c
								
								
								
								
							
							
						
						
									
										43
									
								
								src/multi.c
								
								
								
								
							|  | @ -36,6 +36,7 @@ void initClientMultiState(client *c) { | ||||||
|     c->mstate.commands = NULL; |     c->mstate.commands = NULL; | ||||||
|     c->mstate.count = 0; |     c->mstate.count = 0; | ||||||
|     c->mstate.cmd_flags = 0; |     c->mstate.cmd_flags = 0; | ||||||
|  |     c->mstate.cmd_inv_flags = 0; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| /* Release all the resources associated with MULTI/EXEC state */ | /* Release all the resources associated with MULTI/EXEC state */ | ||||||
|  | @ -76,6 +77,7 @@ void queueMultiCommand(client *c) { | ||||||
|         incrRefCount(mc->argv[j]); |         incrRefCount(mc->argv[j]); | ||||||
|     c->mstate.count++; |     c->mstate.count++; | ||||||
|     c->mstate.cmd_flags |= c->cmd->flags; |     c->mstate.cmd_flags |= c->cmd->flags; | ||||||
|  |     c->mstate.cmd_inv_flags |= ~c->cmd->flags; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| void discardTransaction(client *c) { | void discardTransaction(client *c) { | ||||||
|  | @ -122,6 +124,23 @@ void execCommandPropagateExec(client *c) { | ||||||
|               PROPAGATE_AOF|PROPAGATE_REPL); |               PROPAGATE_AOF|PROPAGATE_REPL); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | /* Aborts a transaction, with a specific error message.
 | ||||||
|  |  * The transaction is always aboarted with -EXECABORT so that the client knows | ||||||
|  |  * the server exited the multi state, but the actual reason for the abort is | ||||||
|  |  * included too. */ | ||||||
|  | void execCommandAbort(client *c, sds error) { | ||||||
|  |     discardTransaction(c); | ||||||
|  | 
 | ||||||
|  |     if (error[0] == '-') error++; | ||||||
|  |     addReplyErrorFormat(c, "-EXECABORT Transaction discarded because of: %s", error); | ||||||
|  | 
 | ||||||
|  |     /* Send EXEC to clients waiting data from MONITOR. We did send a MULTI
 | ||||||
|  |      * already, and didn't send any of the queued commands, now we'll just send | ||||||
|  |      * EXEC so it is clear that the transaction is over. */ | ||||||
|  |     if (listLength(server.monitors) && !server.loading) | ||||||
|  |         replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); | ||||||
|  | } | ||||||
|  | 
 | ||||||
| void execCommand(client *c) { | void execCommand(client *c) { | ||||||
|     int j; |     int j; | ||||||
|     robj **orig_argv; |     robj **orig_argv; | ||||||
|  | @ -135,15 +154,6 @@ void execCommand(client *c) { | ||||||
|         return; |         return; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     /* If we are in -BUSY state, flag the transaction and return the
 |  | ||||||
|      * -BUSY error, like Redis <= 5. This is a temporary fix, may be changed |  | ||||||
|      *  ASAP, see issue #7353 on Github. */ |  | ||||||
|     if (server.lua_timedout) { |  | ||||||
|         flagTransaction(c); |  | ||||||
|         addReply(c, shared.slowscripterr); |  | ||||||
|         return; |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     /* Check if we need to abort the EXEC because:
 |     /* Check if we need to abort the EXEC because:
 | ||||||
|      * 1) Some WATCHed key was touched. |      * 1) Some WATCHed key was touched. | ||||||
|      * 2) There was a previous error while queueing commands. |      * 2) There was a previous error while queueing commands. | ||||||
|  | @ -157,21 +167,6 @@ void execCommand(client *c) { | ||||||
|         goto handle_monitor; |         goto handle_monitor; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     /* If there are write commands inside the transaction, and this is a read
 |  | ||||||
|      * only slave, we want to send an error. This happens when the transaction |  | ||||||
|      * was initiated when the instance was a master or a writable replica and |  | ||||||
|      * then the configuration changed (for example instance was turned into |  | ||||||
|      * a replica). */ |  | ||||||
|     if (!server.loading && server.masterhost && server.repl_slave_ro && |  | ||||||
|         !(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE) |  | ||||||
|     { |  | ||||||
|         addReplyError(c, |  | ||||||
|             "Transaction contains write commands but instance " |  | ||||||
|             "is now a read-only replica. EXEC aborted."); |  | ||||||
|         discardTransaction(c); |  | ||||||
|         goto handle_monitor; |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     /* Exec all the queued commands */ |     /* Exec all the queued commands */ | ||||||
|     unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ |     unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ | ||||||
|     orig_argv = c->argv; |     orig_argv = c->argv; | ||||||
|  |  | ||||||
|  | @ -407,19 +407,23 @@ void addReplyError(client *c, const char *err) { | ||||||
|     addReplyErrorLength(c,err,strlen(err)); |     addReplyErrorLength(c,err,strlen(err)); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | /* See addReplyErrorLength.
 | ||||||
|  |  * Makes sure there are no newlines in the string, otherwise invalid protocol | ||||||
|  |  * is emitted. */ | ||||||
|  | void addReplyErrorSafe(client *c, char *s, size_t len) { | ||||||
|  |     size_t j; | ||||||
|  |     for (j = 0; j < len; j++) { | ||||||
|  |         if (s[j] == '\r' || s[j] == '\n') s[j] = ' '; | ||||||
|  |     } | ||||||
|  |     addReplyErrorLength(c,s,sdslen(s)); | ||||||
|  | } | ||||||
|  | 
 | ||||||
| void addReplyErrorFormat(client *c, const char *fmt, ...) { | void addReplyErrorFormat(client *c, const char *fmt, ...) { | ||||||
|     size_t l, j; |  | ||||||
|     va_list ap; |     va_list ap; | ||||||
|     va_start(ap,fmt); |     va_start(ap,fmt); | ||||||
|     sds s = sdscatvprintf(sdsempty(),fmt,ap); |     sds s = sdscatvprintf(sdsempty(),fmt,ap); | ||||||
|     va_end(ap); |     va_end(ap); | ||||||
|     /* Make sure there are no newlines in the string, otherwise invalid protocol
 |     addReplyErrorSafe(c, s, sdslen(s)); | ||||||
|      * is emitted. */ |  | ||||||
|     l = sdslen(s); |  | ||||||
|     for (j = 0; j < l; j++) { |  | ||||||
|         if (s[j] == '\r' || s[j] == '\n') s[j] = ' '; |  | ||||||
|     } |  | ||||||
|     addReplyErrorLength(c,s,sdslen(s)); |  | ||||||
|     sdsfree(s); |     sdsfree(s); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
							
								
								
									
										93
									
								
								src/server.c
								
								
								
								
							
							
						
						
									
										93
									
								
								src/server.c
								
								
								
								
							|  | @ -3402,6 +3402,34 @@ void call(client *c, int flags) { | ||||||
|     server.stat_numcommands++; |     server.stat_numcommands++; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | /* Used when a command that is ready for execution needs to be rejected, due to
 | ||||||
|  |  * varios pre-execution checks. it returns the appropriate error to the client. | ||||||
|  |  * If there's a transaction is flags it as dirty, and if the command is EXEC, | ||||||
|  |  * it aborts the transaction. */ | ||||||
|  | void rejectCommand(client *c, robj *reply) { | ||||||
|  |     flagTransaction(c); | ||||||
|  |     if (c->cmd && c->cmd->proc == execCommand) { | ||||||
|  |         execCommandAbort(c, reply->ptr); | ||||||
|  |     } else { | ||||||
|  |         /* using addReplyError* rather than addReply so that the error can be logged. */ | ||||||
|  |         addReplyErrorSafe(c, reply->ptr, sdslen(reply->ptr)); | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | void rejectCommandFormat(client *c, const char *fmt, ...) { | ||||||
|  |     flagTransaction(c); | ||||||
|  |     va_list ap; | ||||||
|  |     va_start(ap,fmt); | ||||||
|  |     sds s = sdscatvprintf(sdsempty(),fmt,ap); | ||||||
|  |     va_end(ap); | ||||||
|  |     if (c->cmd && c->cmd->proc == execCommand) { | ||||||
|  |         execCommandAbort(c, s); | ||||||
|  |     } else { | ||||||
|  |         addReplyErrorSafe(c, s, sdslen(s)); | ||||||
|  |     } | ||||||
|  |     sdsfree(s); | ||||||
|  | } | ||||||
|  | 
 | ||||||
| /* If this function gets called we already read a whole
 | /* If this function gets called we already read a whole
 | ||||||
|  * command, arguments are in the client argv/argc fields. |  * command, arguments are in the client argv/argc fields. | ||||||
|  * processCommand() execute the command or prepare the |  * processCommand() execute the command or prepare the | ||||||
|  | @ -3427,23 +3455,30 @@ int processCommand(client *c) { | ||||||
|      * such as wrong arity, bad command name and so forth. */ |      * such as wrong arity, bad command name and so forth. */ | ||||||
|     c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); |     c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); | ||||||
|     if (!c->cmd) { |     if (!c->cmd) { | ||||||
|         flagTransaction(c); |  | ||||||
|         sds args = sdsempty(); |         sds args = sdsempty(); | ||||||
|         int i; |         int i; | ||||||
|         for (i=1; i < c->argc && sdslen(args) < 128; i++) |         for (i=1; i < c->argc && sdslen(args) < 128; i++) | ||||||
|             args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr); |             args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr); | ||||||
|         addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s", |         rejectCommandFormat(c,"unknown command `%s`, with args beginning with: %s", | ||||||
|             (char*)c->argv[0]->ptr, args); |             (char*)c->argv[0]->ptr, args); | ||||||
|         sdsfree(args); |         sdsfree(args); | ||||||
|         return C_OK; |         return C_OK; | ||||||
|     } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || |     } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || | ||||||
|                (c->argc < -c->cmd->arity)) { |                (c->argc < -c->cmd->arity)) { | ||||||
|         flagTransaction(c); |         rejectCommandFormat(c,"wrong number of arguments for '%s' command", | ||||||
|         addReplyErrorFormat(c,"wrong number of arguments for '%s' command", |  | ||||||
|             c->cmd->name); |             c->cmd->name); | ||||||
|         return C_OK; |         return C_OK; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     int is_write_command = (c->cmd->flags & CMD_WRITE) || | ||||||
|  |                            (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE)); | ||||||
|  |     int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) || | ||||||
|  |                              (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM)); | ||||||
|  |     int is_denystale_command = !(c->cmd->flags & CMD_STALE) || | ||||||
|  |                                (c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE)); | ||||||
|  |     int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) || | ||||||
|  |                                  (c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING)); | ||||||
|  | 
 | ||||||
|     /* Check if the user is authenticated. This check is skipped in case
 |     /* Check if the user is authenticated. This check is skipped in case
 | ||||||
|      * the default user is flagged as "nopass" and is active. */ |      * the default user is flagged as "nopass" and is active. */ | ||||||
|     int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) || |     int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) || | ||||||
|  | @ -3453,8 +3488,7 @@ int processCommand(client *c) { | ||||||
|         /* AUTH and HELLO and no auth modules are valid even in
 |         /* AUTH and HELLO and no auth modules are valid even in
 | ||||||
|          * non-authenticated state. */ |          * non-authenticated state. */ | ||||||
|         if (!(c->cmd->flags & CMD_NO_AUTH)) { |         if (!(c->cmd->flags & CMD_NO_AUTH)) { | ||||||
|             flagTransaction(c); |             rejectCommand(c,shared.noautherr); | ||||||
|             addReply(c,shared.noautherr); |  | ||||||
|             return C_OK; |             return C_OK; | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  | @ -3465,13 +3499,12 @@ int processCommand(client *c) { | ||||||
|     int acl_retval = ACLCheckCommandPerm(c,&acl_keypos); |     int acl_retval = ACLCheckCommandPerm(c,&acl_keypos); | ||||||
|     if (acl_retval != ACL_OK) { |     if (acl_retval != ACL_OK) { | ||||||
|         addACLLogEntry(c,acl_retval,acl_keypos,NULL); |         addACLLogEntry(c,acl_retval,acl_keypos,NULL); | ||||||
|         flagTransaction(c); |  | ||||||
|         if (acl_retval == ACL_DENIED_CMD) |         if (acl_retval == ACL_DENIED_CMD) | ||||||
|             addReplyErrorFormat(c, |             rejectCommandFormat(c, | ||||||
|                 "-NOPERM this user has no permissions to run " |                 "-NOPERM this user has no permissions to run " | ||||||
|                 "the '%s' command or its subcommand", c->cmd->name); |                 "the '%s' command or its subcommand", c->cmd->name); | ||||||
|         else |         else | ||||||
|             addReplyErrorFormat(c, |             rejectCommandFormat(c, | ||||||
|                 "-NOPERM this user has no permissions to access " |                 "-NOPERM this user has no permissions to access " | ||||||
|                 "one of the keys used as arguments"); |                 "one of the keys used as arguments"); | ||||||
|         return C_OK; |         return C_OK; | ||||||
|  | @ -3519,13 +3552,11 @@ int processCommand(client *c) { | ||||||
|          * is trying to execute is denied during OOM conditions or the client |          * is trying to execute is denied during OOM conditions or the client | ||||||
|          * is in MULTI/EXEC context? Error. */ |          * is in MULTI/EXEC context? Error. */ | ||||||
|         if (out_of_memory && |         if (out_of_memory && | ||||||
|             (c->cmd->flags & CMD_DENYOOM || |             (is_denyoom_command || | ||||||
|              (c->flags & CLIENT_MULTI && |              (c->flags & CLIENT_MULTI && | ||||||
|               c->cmd->proc != execCommand && |  | ||||||
|               c->cmd->proc != discardCommand))) |               c->cmd->proc != discardCommand))) | ||||||
|         { |         { | ||||||
|             flagTransaction(c); |             rejectCommand(c, shared.oomerr); | ||||||
|             addReply(c, shared.oomerr); |  | ||||||
|             return C_OK; |             return C_OK; | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|  | @ -3546,17 +3577,14 @@ int processCommand(client *c) { | ||||||
|     int deny_write_type = writeCommandsDeniedByDiskError(); |     int deny_write_type = writeCommandsDeniedByDiskError(); | ||||||
|     if (deny_write_type != DISK_ERROR_TYPE_NONE && |     if (deny_write_type != DISK_ERROR_TYPE_NONE && | ||||||
|         server.masterhost == NULL && |         server.masterhost == NULL && | ||||||
|         (c->cmd->flags & CMD_WRITE || |         (is_write_command ||c->cmd->proc == pingCommand)) | ||||||
|          c->cmd->proc == pingCommand)) |  | ||||||
|     { |     { | ||||||
|         flagTransaction(c); |  | ||||||
|         if (deny_write_type == DISK_ERROR_TYPE_RDB) |         if (deny_write_type == DISK_ERROR_TYPE_RDB) | ||||||
|             addReply(c, shared.bgsaveerr); |             rejectCommand(c, shared.bgsaveerr); | ||||||
|         else |         else | ||||||
|             addReplySds(c, |             rejectCommandFormat(c, | ||||||
|                 sdscatprintf(sdsempty(), |  | ||||||
|                 "-MISCONF Errors writing to the AOF file: %s\r\n", |                 "-MISCONF Errors writing to the AOF file: %s\r\n", | ||||||
|                 strerror(server.aof_last_write_errno))); |                 strerror(server.aof_last_write_errno)); | ||||||
|         return C_OK; |         return C_OK; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | @ -3565,11 +3593,10 @@ int processCommand(client *c) { | ||||||
|     if (server.masterhost == NULL && |     if (server.masterhost == NULL && | ||||||
|         server.repl_min_slaves_to_write && |         server.repl_min_slaves_to_write && | ||||||
|         server.repl_min_slaves_max_lag && |         server.repl_min_slaves_max_lag && | ||||||
|         c->cmd->flags & CMD_WRITE && |         is_write_command && | ||||||
|         server.repl_good_slaves_count < server.repl_min_slaves_to_write) |         server.repl_good_slaves_count < server.repl_min_slaves_to_write) | ||||||
|     { |     { | ||||||
|         flagTransaction(c); |         rejectCommand(c, shared.noreplicaserr); | ||||||
|         addReply(c, shared.noreplicaserr); |  | ||||||
|         return C_OK; |         return C_OK; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | @ -3577,10 +3604,9 @@ int processCommand(client *c) { | ||||||
|      * accept write commands if this is our master. */ |      * accept write commands if this is our master. */ | ||||||
|     if (server.masterhost && server.repl_slave_ro && |     if (server.masterhost && server.repl_slave_ro && | ||||||
|         !(c->flags & CLIENT_MASTER) && |         !(c->flags & CLIENT_MASTER) && | ||||||
|         c->cmd->flags & CMD_WRITE) |         is_write_command) | ||||||
|     { |     { | ||||||
|         flagTransaction(c); |         rejectCommand(c, shared.roslaveerr); | ||||||
|         addReply(c, shared.roslaveerr); |  | ||||||
|         return C_OK; |         return C_OK; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | @ -3592,7 +3618,7 @@ int processCommand(client *c) { | ||||||
|         c->cmd->proc != unsubscribeCommand && |         c->cmd->proc != unsubscribeCommand && | ||||||
|         c->cmd->proc != psubscribeCommand && |         c->cmd->proc != psubscribeCommand && | ||||||
|         c->cmd->proc != punsubscribeCommand) { |         c->cmd->proc != punsubscribeCommand) { | ||||||
|         addReplyErrorFormat(c, |         rejectCommandFormat(c, | ||||||
|             "Can't execute '%s': only (P)SUBSCRIBE / " |             "Can't execute '%s': only (P)SUBSCRIBE / " | ||||||
|             "(P)UNSUBSCRIBE / PING / QUIT are allowed in this context", |             "(P)UNSUBSCRIBE / PING / QUIT are allowed in this context", | ||||||
|             c->cmd->name); |             c->cmd->name); | ||||||
|  | @ -3604,17 +3630,16 @@ int processCommand(client *c) { | ||||||
|      * link with master. */ |      * link with master. */ | ||||||
|     if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED && |     if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED && | ||||||
|         server.repl_serve_stale_data == 0 && |         server.repl_serve_stale_data == 0 && | ||||||
|         !(c->cmd->flags & CMD_STALE)) |         is_denystale_command) | ||||||
|     { |     { | ||||||
|         flagTransaction(c); |         rejectCommand(c, shared.masterdownerr); | ||||||
|         addReply(c, shared.masterdownerr); |  | ||||||
|         return C_OK; |         return C_OK; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     /* Loading DB? Return an error if the command has not the
 |     /* Loading DB? Return an error if the command has not the
 | ||||||
|      * CMD_LOADING flag. */ |      * CMD_LOADING flag. */ | ||||||
|     if (server.loading && !(c->cmd->flags & CMD_LOADING)) { |     if (server.loading && is_denyloading_command) { | ||||||
|         addReply(c, shared.loadingerr); |         rejectCommand(c, shared.loadingerr); | ||||||
|         return C_OK; |         return C_OK; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | @ -3629,7 +3654,6 @@ int processCommand(client *c) { | ||||||
|           c->cmd->proc != helloCommand && |           c->cmd->proc != helloCommand && | ||||||
|           c->cmd->proc != replconfCommand && |           c->cmd->proc != replconfCommand && | ||||||
|           c->cmd->proc != multiCommand && |           c->cmd->proc != multiCommand && | ||||||
|           c->cmd->proc != execCommand && |  | ||||||
|           c->cmd->proc != discardCommand && |           c->cmd->proc != discardCommand && | ||||||
|           c->cmd->proc != watchCommand && |           c->cmd->proc != watchCommand && | ||||||
|           c->cmd->proc != unwatchCommand && |           c->cmd->proc != unwatchCommand && | ||||||
|  | @ -3640,8 +3664,7 @@ int processCommand(client *c) { | ||||||
|           c->argc == 2 && |           c->argc == 2 && | ||||||
|           tolower(((char*)c->argv[1]->ptr)[0]) == 'k')) |           tolower(((char*)c->argv[1]->ptr)[0]) == 'k')) | ||||||
|     { |     { | ||||||
|         flagTransaction(c); |         rejectCommand(c, shared.slowscripterr); | ||||||
|         addReply(c, shared.slowscripterr); |  | ||||||
|         return C_OK; |         return C_OK; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -666,6 +666,9 @@ typedef struct multiState { | ||||||
|     int cmd_flags;          /* The accumulated command flags OR-ed together.
 |     int cmd_flags;          /* The accumulated command flags OR-ed together.
 | ||||||
|                                So if at least a command has a given flag, it |                                So if at least a command has a given flag, it | ||||||
|                                will be set in this field. */ |                                will be set in this field. */ | ||||||
|  |     int cmd_inv_flags;      /* Same as cmd_flags, OR-ing the ~flags. so that it
 | ||||||
|  |                                is possible to know if all the commands have a | ||||||
|  |                                certain flag. */ | ||||||
|     int minreplicas;        /* MINREPLICAS for synchronous replication */ |     int minreplicas;        /* MINREPLICAS for synchronous replication */ | ||||||
|     time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */ |     time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */ | ||||||
| } multiState; | } multiState; | ||||||
|  | @ -1626,6 +1629,7 @@ void addReplyBulkLongLong(client *c, long long ll); | ||||||
| void addReply(client *c, robj *obj); | void addReply(client *c, robj *obj); | ||||||
| void addReplySds(client *c, sds s); | void addReplySds(client *c, sds s); | ||||||
| void addReplyBulkSds(client *c, sds s); | void addReplyBulkSds(client *c, sds s); | ||||||
|  | void addReplyErrorSafe(client *c, char *s, size_t len); | ||||||
| void addReplyError(client *c, const char *err); | void addReplyError(client *c, const char *err); | ||||||
| void addReplyStatus(client *c, const char *status); | void addReplyStatus(client *c, const char *status); | ||||||
| void addReplyDouble(client *c, double d); | void addReplyDouble(client *c, double d); | ||||||
|  | @ -1724,6 +1728,7 @@ void touchWatchedKey(redisDb *db, robj *key); | ||||||
| void touchWatchedKeysOnFlush(int dbid); | void touchWatchedKeysOnFlush(int dbid); | ||||||
| void discardTransaction(client *c); | void discardTransaction(client *c); | ||||||
| void flagTransaction(client *c); | void flagTransaction(client *c); | ||||||
|  | void execCommandAbort(client *c, sds error); | ||||||
| void execCommandPropagateMulti(client *c); | void execCommandPropagateMulti(client *c); | ||||||
| void execCommandPropagateExec(client *c); | void execCommandPropagateExec(client *c); | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -196,6 +196,21 @@ proc redis_deferring_client {args} { | ||||||
|     return $client |     return $client | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | proc redis_client {args} { | ||||||
|  |     set level 0 | ||||||
|  |     if {[llength $args] > 0 && [string is integer [lindex $args 0]]} { | ||||||
|  |         set level [lindex $args 0] | ||||||
|  |         set args [lrange $args 1 end] | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     # create client that defers reading reply | ||||||
|  |     set client [redis [srv $level "host"] [srv $level "port"] 0 $::tls] | ||||||
|  | 
 | ||||||
|  |     # select the right db and read the response (OK) | ||||||
|  |     $client select 9 | ||||||
|  |     return $client | ||||||
|  | } | ||||||
|  | 
 | ||||||
| # Provide easy access to INFO properties. Same semantic as "proc r". | # Provide easy access to INFO properties. Same semantic as "proc r". | ||||||
| proc s {args} { | proc s {args} { | ||||||
|     set level 0 |     set level 0 | ||||||
|  |  | ||||||
|  | @ -325,74 +325,145 @@ start_server {tags {"multi"}} { | ||||||
|         # check that if MULTI arrives during timeout, it is either refused, or |         # check that if MULTI arrives during timeout, it is either refused, or | ||||||
|         # allowed to pass, and we don't end up executing half of the transaction |         # allowed to pass, and we don't end up executing half of the transaction | ||||||
|         set rd1 [redis_deferring_client] |         set rd1 [redis_deferring_client] | ||||||
|         set rd2 [redis_deferring_client] |         set r2 [redis_client] | ||||||
|         r config set lua-time-limit 10 |         r config set lua-time-limit 10 | ||||||
|         r set xx 1 |         r set xx 1 | ||||||
|         $rd1 eval {while true do end} 0 |         $rd1 eval {while true do end} 0 | ||||||
|         after 200 |         after 200 | ||||||
|         catch { $rd2 multi; $rd2 read } e |         catch { $r2 multi; } e | ||||||
|         catch { $rd2 incr xx; $rd2 read } e |         catch { $r2 incr xx; } e | ||||||
|         r script kill |         r script kill | ||||||
|         after 200 ; # Give some time to Lua to call the hook again... |         after 200 ; # Give some time to Lua to call the hook again... | ||||||
|         catch { $rd2 incr xx; $rd2 read } e |         catch { $r2 incr xx; } e | ||||||
|         catch { $rd2 exec; $rd2 read } e |         catch { $r2 exec; } e | ||||||
|  |         assert_match {EXECABORT*previous errors*} $e | ||||||
|         set xx [r get xx] |         set xx [r get xx] | ||||||
|         # make sure that either the whole transcation passed or none of it (we actually expect none) |         # make sure that either the whole transcation passed or none of it (we actually expect none) | ||||||
|         assert { $xx == 1 || $xx == 3} |         assert { $xx == 1 || $xx == 3} | ||||||
|         # check that the connection is no longer in multi state |         # check that the connection is no longer in multi state | ||||||
|         $rd2 ping asdf |         set pong [$r2 ping asdf] | ||||||
|         set pong [$rd2 read] |  | ||||||
|         assert_equal $pong "asdf" |         assert_equal $pong "asdf" | ||||||
|  |         $rd1 close; $r2 close | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     test {EXEC and script timeout} { |     test {EXEC and script timeout} { | ||||||
|         # check that if EXEC arrives during timeout, we don't end up executing |         # check that if EXEC arrives during timeout, we don't end up executing | ||||||
|         # half of the transaction, and also that we exit the multi state |         # half of the transaction, and also that we exit the multi state | ||||||
|         set rd1 [redis_deferring_client] |         set rd1 [redis_deferring_client] | ||||||
|         set rd2 [redis_deferring_client] |         set r2 [redis_client] | ||||||
|         r config set lua-time-limit 10 |         r config set lua-time-limit 10 | ||||||
|         r set xx 1 |         r set xx 1 | ||||||
|         catch { $rd2 multi; $rd2 read } e |         catch { $r2 multi; } e | ||||||
|         catch { $rd2 incr xx; $rd2 read } e |         catch { $r2 incr xx; } e | ||||||
|         $rd1 eval {while true do end} 0 |         $rd1 eval {while true do end} 0 | ||||||
|         after 200 |         after 200 | ||||||
|         catch { $rd2 incr xx; $rd2 read } e |         catch { $r2 incr xx; } e | ||||||
|         catch { $rd2 exec; $rd2 read } e |         catch { $r2 exec; } e | ||||||
|  |         assert_match {EXECABORT*BUSY*} $e | ||||||
|         r script kill |         r script kill | ||||||
|         after 200 ; # Give some time to Lua to call the hook again... |         after 200 ; # Give some time to Lua to call the hook again... | ||||||
|         set xx [r get xx] |         set xx [r get xx] | ||||||
|         # make sure that either the whole transcation passed or none of it (we actually expect none) |         # make sure that either the whole transcation passed or none of it (we actually expect none) | ||||||
|         assert { $xx == 1 || $xx == 3} |         assert { $xx == 1 || $xx == 3} | ||||||
|         # Discard the transaction since EXEC likely got -BUSY error |  | ||||||
|         # so the client is still in MULTI state. |  | ||||||
|         catch { $rd2 discard ;$rd2 read } e |  | ||||||
|         # check that the connection is no longer in multi state |         # check that the connection is no longer in multi state | ||||||
|         $rd2 ping asdf |         set pong [$r2 ping asdf] | ||||||
|         set pong [$rd2 read] |  | ||||||
|         assert_equal $pong "asdf" |         assert_equal $pong "asdf" | ||||||
|  |         $rd1 close; $r2 close | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     test {MULTI-EXEC body and script timeout} { |     test {MULTI-EXEC body and script timeout} { | ||||||
|         # check that we don't run an imcomplete transaction due to some commands |         # check that we don't run an imcomplete transaction due to some commands | ||||||
|         # arriving during busy script |         # arriving during busy script | ||||||
|         set rd1 [redis_deferring_client] |         set rd1 [redis_deferring_client] | ||||||
|         set rd2 [redis_deferring_client] |         set r2 [redis_client] | ||||||
|         r config set lua-time-limit 10 |         r config set lua-time-limit 10 | ||||||
|         r set xx 1 |         r set xx 1 | ||||||
|         catch { $rd2 multi; $rd2 read } e |         catch { $r2 multi; } e | ||||||
|         catch { $rd2 incr xx; $rd2 read } e |         catch { $r2 incr xx; } e | ||||||
|         $rd1 eval {while true do end} 0 |         $rd1 eval {while true do end} 0 | ||||||
|         after 200 |         after 200 | ||||||
|         catch { $rd2 incr xx; $rd2 read } e |         catch { $r2 incr xx; } e | ||||||
|         r script kill |         r script kill | ||||||
|         after 200 ; # Give some time to Lua to call the hook again... |         after 200 ; # Give some time to Lua to call the hook again... | ||||||
|         catch { $rd2 exec; $rd2 read } e |         catch { $r2 exec; } e | ||||||
|  |         assert_match {EXECABORT*previous errors*} $e | ||||||
|         set xx [r get xx] |         set xx [r get xx] | ||||||
|         # make sure that either the whole transcation passed or none of it (we actually expect none) |         # make sure that either the whole transcation passed or none of it (we actually expect none) | ||||||
|         assert { $xx == 1 || $xx == 3} |         assert { $xx == 1 || $xx == 3} | ||||||
|         # check that the connection is no longer in multi state |         # check that the connection is no longer in multi state | ||||||
|         $rd2 ping asdf |         set pong [$r2 ping asdf] | ||||||
|         set pong [$rd2 read] |  | ||||||
|         assert_equal $pong "asdf" |         assert_equal $pong "asdf" | ||||||
|  |         $rd1 close; $r2 close | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     test {just EXEC and script timeout} { | ||||||
|  |         # check that if EXEC arrives during timeout, we don't end up executing | ||||||
|  |         # actual commands during busy script, and also that we exit the multi state | ||||||
|  |         set rd1 [redis_deferring_client] | ||||||
|  |         set r2 [redis_client] | ||||||
|  |         r config set lua-time-limit 10 | ||||||
|  |         r set xx 1 | ||||||
|  |         catch { $r2 multi; } e | ||||||
|  |         catch { $r2 incr xx; } e | ||||||
|  |         $rd1 eval {while true do end} 0 | ||||||
|  |         after 200 | ||||||
|  |         catch { $r2 exec; } e | ||||||
|  |         assert_match {EXECABORT*BUSY*} $e | ||||||
|  |         r script kill | ||||||
|  |         after 200 ; # Give some time to Lua to call the hook again... | ||||||
|  |         set xx [r get xx] | ||||||
|  |         # make we didn't execute the transaction | ||||||
|  |         assert { $xx == 1} | ||||||
|  |         # check that the connection is no longer in multi state | ||||||
|  |         set pong [$r2 ping asdf] | ||||||
|  |         assert_equal $pong "asdf" | ||||||
|  |         $rd1 close; $r2 close | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     test {exec with write commands and state change} { | ||||||
|  |         # check that exec that contains write commands fails if server state changed since they were queued | ||||||
|  |         set r1 [redis_client] | ||||||
|  |         r set xx 1 | ||||||
|  |         r multi | ||||||
|  |         r incr xx | ||||||
|  |         $r1 config set min-replicas-to-write 2 | ||||||
|  |         catch {r exec} e | ||||||
|  |         assert_match {*EXECABORT*NOREPLICAS*} $e | ||||||
|  |         set xx [r get xx] | ||||||
|  |         # make sure that the INCR wasn't executed | ||||||
|  |         assert { $xx == 1} | ||||||
|  |         $r1 config set min-replicas-to-write 0 | ||||||
|  |         $r1 close; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     test {exec with read commands and stale replica state change} { | ||||||
|  |         # check that exec that contains read commands fails if server state changed since they were queued | ||||||
|  |         r config set replica-serve-stale-data no | ||||||
|  |         set r1 [redis_client] | ||||||
|  |         r set xx 1 | ||||||
|  | 
 | ||||||
|  |         # check that GET is disallowed on stale replica, even if the replica becomes stale only after queuing. | ||||||
|  |         r multi | ||||||
|  |         r get xx | ||||||
|  |         $r1 replicaof localhsot 0 | ||||||
|  |         catch {r exec} e | ||||||
|  |         assert_match {*EXECABORT*MASTERDOWN*} $e | ||||||
|  | 
 | ||||||
|  |         # check that PING is allowed | ||||||
|  |         r multi | ||||||
|  |         r ping | ||||||
|  |         $r1 replicaof localhsot 0 | ||||||
|  |         set pong [r exec] | ||||||
|  |         assert {$pong == "PONG"} | ||||||
|  | 
 | ||||||
|  |         # check that when replica is not stale, GET is allowed | ||||||
|  |         # while we're at it, let's check that multi is allowed on stale replica too | ||||||
|  |         r multi | ||||||
|  |         $r1 replicaof no one | ||||||
|  |         r get xx | ||||||
|  |         set xx [r exec] | ||||||
|  |         # make sure that the INCR was executed | ||||||
|  |         assert { $xx == 1 } | ||||||
|  |         $r1 close; | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue