Redis服务器负责与客户端建立网络连接,之前的数据结构部分已经看过了,本章主要从下面三个方面讲解。

1.服务器执行命令的过程

2.serverCron函数的执行

3.服务器的初始化

服务器执行命令的过程

一个客户端请求命令的基本过程大致如下:

1.客户端发送请求命令给服务器,比如set key value;

2.服务器端接受命令并处理,在数据库中进行设置操作,并返回ok;

3.客户端接受服务器返回的ok,并将这个回复打印给用户看。

接下来会根据这个大致的流程来讲一些细节的处理:

1.发送命令请求:

    当用户在客户端键入一个命令后,客户端会将这个命令转为相应的协议格式,并通过套接字,将命令请求的协议格式发送给服务器;

2.读取命令请求:

    服务器接受到套接字发送来的协议请求后,将其保存在客户端状态的输入缓冲区中(之前的看过redisServer和client的数据结构可以知道),如下;

Redis 设计与实现(第十四章) -- 服务器

 

    对输入缓冲区的协议格式进行解析,提前命令请求中的命令参数,以及参数的个数,分别将参数和参数个数保存到客户端状态的argv和argc属性中,如下;

Redis 设计与实现(第十四章) -- 服务器

 

    调用命令执行器,执行客户端指定的命令。

命令执行器的执行

查找命令实现

命令执行器首先查找argv[0]的参数,并在命令表中查找所指定的指令,并将指令保存到客户端状态的cmd属性中。

命令表是一个字典,键为一个命令名字,比如set、get、del等,而字典的值则是一个redisCommand的数据结构,每个RedisCommand记录一个命令的实现信息,数据结构如下:

struct redisCommand {
    char *name;  //命令的名字,比如set、get等
    redisCommandProc *proc;  //执行命令的实现函数
    int arity;  //命令参数的个数,用于校验命令请求的格式是否正确
    char *sflags; /* Flags as string representation, one char per flag. ,记录命令的属性,比如是读还是写等*/
    int flags;    /* The actual flags, obtained from the 'sflags' field.,对sflags的分析得出的二进制,服务器自动生成 */
    /* Use a function to determine keys arguments in a command line.
     * Used for Redis Cluster redirect. */
    redisGetKeysProc *getkeys_proc;  //
    /* What keys should be loaded in background when calling this command? */
    int firstkey; /* The first argument that's a key (0 = no keys) */
    int lastkey;  /* The last argument that's a key */
    int keystep;  /* The step between first and last key */
    long long microseconds, calls;  //服务器执行的总时长,和执行了多少次
};

下图分别用set和get展示了RedisCommand的结构:
Redis 设计与实现(第十四章) -- 服务器

set命令表示,name为set;参数arity未-3,表示接受三个或三个以上的参数;命令标识为'wm',表示这是一个写入命令,并且在执行前要先对服务器占用内存检查,因为这个命令可能占用大量内存。(命令的大小写不限制)

查找到命令表后,将redisClient中的cmd指针指向redisCommand对应的结构。

 执行预备操作

服务器通过上述步骤,已经将执行命令所需函数、参数、参数个数都收集到了,在真正执行前,还需要执行一个进行一些预备操作,才能保证命令被正确执行,这些操作包括:

1.判断cmd的执行是否指向null,如果指向null,则说明命令不存在,这时给客户端返回错误信息;

2.根据artiy属性的值,判断argc中的参数个数是否正确,如果不正确,则返回错误;

3.检测客户端是否已经通过了认证,未通过认证的只能指向auth命令,如果未通过认证执行auth以为的命令会返回给客户端错误信息;

4.如果服务器打开了maxmemory命令,在命令执行前,先检查服务器的内存占用情况,并在有需要时进行回收,如果回收失败,则给客户端返回错误信息;

5.如果服务器上一次执行bgsave命令错误,并且服务器打开了stop-write-on-bgsave-error功能,如果此时执行的命令是写命令,那么则会返回错误;

6.如果当前客户端正在用subscribe命令订阅频道,或者正则用psubscribe命令订阅模式,那么服务器只会执行订阅有关的四个命令(subscribe,psubscribe,unsubscribe,punsubscribe),其他命令会被拒绝;

7.如果服务器正在执行数据载入,那么客户端发送的命令必须带有1标识(比如info,shutdown,publish等等)才能被服务器执行,其他命令会被拒绝;

8.如果服务器正在执行lua脚本而超时进入阻塞状态,那么服务器只会执行客户端发来的shutdown nosave和script kill命令,其他命令都会被拒绝;

9.如果客户端正在执行事务,那么服务器只会执行客户端发来的事务命令,exec,discard,multi,watch四个命令,其他命令都会被放入事务队列中;

10.如果服务器打开了监视器功能,那么服务器会将就要将执行的命令及参数发送给监视器。

上述步骤都完成后,就开始执行命令了(这里这讲了单机模式的,集群模式下还会有更多一些步骤)。

下面是commandProc的源码,上述步骤的判断与集群模式的处理都在里面:

/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() execute the command or prepare the
 * server for a bulk read from the client.
 *
 * If 1 is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). */
int processCommand(redisClient *c) {
    /* The QUIT command is handled separately. Normal command procs will
     * go through checking for replication and QUIT will cause trouble
     * when FORCE_REPLICATION is enabled and would be implemented in
     * a regular command proc. */
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= REDIS_CLOSE_AFTER_REPLY;
        return REDIS_ERR;
    }

    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        flagTransaction(c);
        addReplyErrorFormat(c,"unknown command '%s'",
            (char*)c->argv[0]->ptr);
        return REDIS_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return REDIS_OK;
    }

    /* Check if the user is authenticated */
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
    {
        flagTransaction(c);
        addReply(c,shared.noautherr);
        return REDIS_OK;
    }

    /* If cluster is enabled perform the cluster redirection here.
     * However we don't perform the redirection if:
     * 1) The sender of this command is our master.
     * 2) The command has no key arguments. */
    if (server.cluster_enabled &&
        !(c->flags & REDIS_MASTER) &&
        !(c->flags & REDIS_LUA_CLIENT &&
          server.lua_caller->flags & REDIS_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
        int hashslot;

        if (server.cluster->state != REDIS_CLUSTER_OK) {
            flagTransaction(c);
            clusterRedirectClient(c,NULL,0,REDIS_CLUSTER_REDIR_DOWN_STATE);
            return REDIS_OK;
        } else {
            int error_code;
            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
            if (n == NULL || n != server.cluster->myself) {
                flagTransaction(c);
                clusterRedirectClient(c,n,hashslot,error_code);
                return REDIS_OK;
            }
        }
    }

    /* Handle the maxmemory directive.
     *
     * First we try to free some memory if possible (if there are volatile
     * keys in the dataset). If there are not the only thing we can do
     * is returning an error. */
    if (server.maxmemory) {
        int retval = freeMemoryIfNeeded();
        /* freeMemoryIfNeeded may flush slave output buffers. This may result
         * into a slave, that may be the active client, to be freed. */
        if (server.current_client == NULL) return REDIS_ERR;

        /* It was impossible to free enough memory, and the command the client
         * is trying to execute is denied during OOM conditions? Error. */
        if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
            flagTransaction(c);
            addReply(c, shared.oomerr);
            return REDIS_OK;
        }
    }

    /* Don't accept write commands if there are problems persisting on disk
     * and if this is a master instance. */
    if (((server.stop_writes_on_bgsave_err &&
          server.saveparamslen > 0 &&
          server.lastbgsave_status == REDIS_ERR) ||
          server.aof_last_write_status == REDIS_ERR) &&
        server.masterhost == NULL &&
        (c->cmd->flags & REDIS_CMD_WRITE ||
         c->cmd->proc == pingCommand))
    {
        flagTransaction(c);
        if (server.aof_last_write_status == REDIS_OK)
            addReply(c, shared.bgsaveerr);
        else
            addReplySds(c,
                sdscatprintf(sdsempty(),
                "-MISCONF Errors writing to the AOF file: %s\r\n",
                strerror(server.aof_last_write_errno)));
        return REDIS_OK;
    }

    /* Don't accept write commands if there are not enough good slaves and
     * user configured the min-slaves-to-write option. */
    if (server.masterhost == NULL &&
        server.repl_min_slaves_to_write &&
        server.repl_min_slaves_max_lag &&
        c->cmd->flags & REDIS_CMD_WRITE &&
        server.repl_good_slaves_count < server.repl_min_slaves_to_write)
    {
        flagTransaction(c);
        addReply(c, shared.noreplicaserr);
        return REDIS_OK;
    }

    /* Don't accept write commands if this is a read only slave. But
     * accept write commands if this is our master. */
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & REDIS_MASTER) &&
        c->cmd->flags & REDIS_CMD_WRITE)
    {
        addReply(c, shared.roslaveerr);
        return REDIS_OK;
    }

    /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
    if (c->flags & REDIS_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
        return REDIS_OK;
    }

    /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
     * we are a slave with a broken link with master. */
    if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        !(c->cmd->flags & REDIS_CMD_STALE))
    {
        flagTransaction(c);
        addReply(c, shared.masterdownerr);
        return REDIS_OK;
    }

    /* Loading DB? Return an error if the command has not the
     * REDIS_CMD_LOADING flag. */
    if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) {
        addReply(c, shared.loadingerr);
        return REDIS_OK;
    }

    /* Lua script too slow? Only allow a limited number of commands. */
    if (server.lua_timedout &&
          c->cmd->proc != authCommand &&
          c->cmd->proc != replconfCommand &&
        !(c->cmd->proc == shutdownCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
        !(c->cmd->proc == scriptCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
    {
        flagTransaction(c);
        addReply(c, shared.slowscripterr);
        return REDIS_OK;
    }

    /* Exec the command */
    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        call(c,REDIS_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
    }
    return REDIS_OK;
}

processCommand