transaction

第四章 Redis事务处理(Redis Transactions)

1 引言

事务处理(Transactions)是数据库系统的一个基本功能。在关系型数据库系统(Retional Database Systems)中,事务处理功能为开发人员提供了原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。但是,NoSQL数据库中的事务处理与关系型数据库中的事务处理是不同的。

在NoSQL数据库的早期,NoSQL数据库是不支持事务处理的。这可能是因为在早期的设计开发过程中,事务处理特性并不是那么重要;也可能是在NoSQL数据库的实际应用中,有许多应用场景尚不需要事务处理特性。但是,逐渐的,一些NoSQL数据库逐步推出了各自不同的事务特性。Redis作为一个NoSQL数据库的代表,是支持事务特性的。我们将在本章介绍Redis的事务特性,以及内部实现的细节。

2 事务处理的基本用法

2.1 Redis事务特性

Redis通过MULTI、EXEC、DISCARD和WATCH四条命令来向开发人员提供事务处理的功能。通过使用这四条命令,开发人员能够将一组命令"包含"在一个事务中一起执行。与此同时,Redis能够保证:

  1. 包含在一个事务中的命令将会依次执行。并且,如果Redis在同一时间接收到了另一个客户端的命令的话,Redis不会将它们混合在一起执行。这个特性被称为事务隔离(Isolation)。
  2. Redis要么执行事务中所有的命令,要么不执行任何一条命令。这个特性被称为事务的原子性(Atomicity)。

Redis为什么不支持回滚(Rollback)呢?Redis开发人员给出了两个理由。这两个理由是否合理是一个仁者见仁、智者见智的问题,需要结合实际应用场景来分析。

  1. Redis命令的语法简单,不易出错。在Redis应用程序的开发过程中,这种错误很容易发现、排除。
  2. Redis是一个高效、小巧的数据库,因此,Redis不需要支持回滚。

2.2 Redis事务处理的基本用法

当客户端需要发起一个事务时,首先向Redis服务器发送MULTI命令。该命令的意思是通知Redis服务器启动一个新的事务。随后,客户端向服务器发送一组"包含"在该事务中的命令。在这些命令中,客户端可以创建一个新的键/值对,修改或者删除数据。在发送这些命令时,Redis服务器只会将这些命令存放在一个队列(Queue)中,并向客户端返回OK。使用队列保存命令的目的是为了保存命令执行的顺序。

当客户端将所有的命令都发送给服务器之后,客户端可以发送EXEC命令,要求Redis服务器依次执行该事务中的所有命令。在发送EXEC命令之前,Redis服务器仅会保存事务中的命令,而不会执行这些命令。当然,客户端也可以随时向Redis服务器发送DISCARD命令,用以清空当前事务的命令队列,即丢弃之前所有发送给服务器的命令。

例如:下面是一个简单的、使用事务特性来创建一个键值对,并在事务中查询该键的值。我们可以看到,在执行完MULTI命令之后,当客户端继续发送事务中的命令时,Redis服务器会返回结果"QUEUED",以表明这些命令已存入命令队列中。在执行EXEC命令后,Redis服务器会返回命令的执行结果。第一个"OK"对应的是创建新键值对的执行结果;第二个"Little Waterdrop"对应的是第二条查询命令的执行结果。

> MULTI
OK
> SET name "Little Waterdrop"
QUEUED
> GET name
QUEUED
> EXEC
1) OK
2) "Little Waterdrop"

WATCH命令为事务处理提供了一个条件监控(Condition Monitoring)的功能。在启动事务之前,开发人员可以指定监视某一个键的值。如果在运行事务之前(运行EXEC命令之前),该键的值被其他客户端修改的话,那么,Redis服务器会取消这次事务的执行。因此,WATCH命令常常被用于监控事务中是否处理的是正确的值。我们将使用下面的例子来解释WATCH命令的用法。

在下面的例子中,我们首先创建了一个新的键值对,counter的值为1。然后,我们模拟了Redis客户端的逻辑。即,我们首先获取当前counter的值,然后通过某些逻辑处理,需要将counter的值增加1,然后,再写入Redis中。如果在运行这段代码的时候,没有另个一客户端同时在修改counter的值的话,下面的代码能够正常的工作。但是,如果同时有两个客户端在执行这段代码的话,虽然两个客户端都将counter自增1,但是,最后写入的值是2,而不是3。这种经常被称为竞态条件(Race Condition)。

> SET counter 1
OK
> GET counter
"1"
// 在客户端中,通过某些计算,需要将counter的值增加1
> MULTI
OK
> SET counter 2
QUEUED
> EXEC
1) OK

为了避免上述因竞态条件而产生的错误,Redis引入了WATCH命令。如下面的代码所示,在获取counter的值之前,如果开发人员使用WATCH命令监视counter键的值的话,如果在执行EXEC命令之前,其他客户端修改了counter值的话,Redis服务器不会执行这条事务,服务器返回nil。在此时,客户端需要重新处理一遍,以期望在下一次的处理过程中,不会受其他客户端的"打扰"。

> SET counter 1
OK
> WATCH counter
OK
> GET counter
"1"
// 在客户端中,通过某些计算,需要将counter的值增加1
> MULTI
OK
> SET counter 2
QUEUED
> EXEC
(nil)

WATCH命令的运用和执行过程比较复杂,我们将其总结在以下四点中。

  1. 如果客户端按照WATCH-SET-MULTI-EXEC逻辑顺序执行,即,创建监控点,随后在创建事务之前修改监控的键的值,然后创建并执行事务的话,该事务将不会被执行,因为在使用WATCH之后,只能在事务内部修改监控的键的值。
  2. 如果客户端按照WATCH-MULTI-SET-EXEC的逻辑顺序执行的话,如果不被其他客户端"打扰",Redis服务器会执行该事务。
  3. 如果有两个客户端A和B,客户端A先执行WATCH,然后,客户端B修改该监控的键的值的话,随后,客户端A创建的事务将不会被执行,因为被客户端B打扰了。
  4. 如果有两个客户端A和B,客户端A先执行WATCH,然后,客户端B运行一个事务,在该事务中修改了客户端A监控的键的值的话,随后,客户端A创建的事务将不会被执行,因为被客户端B打扰了。

在执行事务之前,客户端可以多次执行WATCH命令以监控多个键的值。同时,Redis还支持UNWATCH命令,以清空之前积累的所有监控的键。

3 事务处理的内部实现

3.1 命令处理函数

MULTI、EXEC、DISCARD、WATCH、和UNWATCH命令名称和其处理函数的映射定义了server.c文件中的redisCommandTable变量中。

// Redis 5.0.8 版本
// server.c
struct redisCommand redisCommandTable[] = {
    {"multi",multiCommand,1,"sF",0,NULL,0,0,0,0,0},
    {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
    {"discard",discardCommand,1,"sF",0,NULL,0,0,0,0,0},
    {"watch",watchCommand,-2,"sF",0,NULL,1,-1,1,0,0},
    {"unwatch",unwatchCommand,1,"sF",0,NULL,0,0,0,0,0},
}

3.2 MULTI命令

MULTI命令的处理很简单。在做完简单的检查后(如果已处于事务处理之后的话,就不能再次执行MULTI命令了),Redis服务器将客户端的flags中的一个标识位设置为1。该标识位专门用于标识客户端是否已经进入事务处理状态。

// Redis 5.0.8 版本
// multi.c
void multiCommand(client *c) {
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }
    c->flags |= CLIENT_MULTI;
    addReply(c,shared.ok);
}

3.3 命令队列

在执行每条命令之前,Redis服务器会检查该客户端是否已处于事务状态中,而且当前命令不是EXEC、DISCARD、MULTI、或者WATCH命令的话,将会调用queueMultiCommand()函数将当前命令放入命令队列。这个队列实际上就是c->mstate.commands。

// Redis 5.0.8 版本
// server.c
int processCommand(client *c) {
    ...
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    }
    ...
}

// multi.c
void queueMultiCommand(client *c) {
    multiCmd *mc;
    int j;

    // 将当前命令放入命令队列c->mstate.commands
    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(c->mstate.count+1));
    mc = c->mstate.commands+c->mstate.count;
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = zmalloc(sizeof(robj*)*c->argc);
    memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
    for (j = 0; j < c->argc; j++)
        incrRefCount(mc->argv[j]);
    c->mstate.count++;
    c->mstate.cmd_flags |= c->cmd->flags;
}

从上述的代码中,我们可以推测,DISCARD命令的功能就是清空客户端的命令队列(即c->mstate.commands)。正如我们所推测的那样,在DISCARD命令的处理函数中,调用了discardTransaction()函数,用于释放命令队列所占用的内存,并重新初始化这个队列。

// Redis 5.0.8 版本
// multi.c
void discardCommand(client *c) {
    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"DISCARD without MULTI");
        return;
    }
    discardTransaction(c);
    addReply(c,shared.ok);
}

void discardTransaction(client *c) {
    freeClientMultiState(c);
    initClientMultiState(c);
    c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
    unwatchAllKeys(c);
}

3.4 WATCH和UNWATCH命令

从WATCH和UNWATCH命令的处理函数来看,WATCH命令是调用了watchForKey()函数,将监视的键的名字放入c->watched_keys链表中。在放入之前,Redis服务器会检查该键是否已经存在与该链表之中。

从下面的代码可以看出,Redis服务器维护着两种watched_keys。第一种是客户端的watched_keys,它表示的是某一个客户端当前监视的所有的键。第二种是数据库全局的watched_keys,它是所有客户端watched_keys的合集。

// Redis 5.0.8 版本
// multi.c
void watchCommand(client *c) {
    int j;

    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }
    for (j = 1; j < c->argc; j++)
        watchForKey(c,c->argv[j]);
    addReply(c,shared.ok);
}

void watchForKey(client *c, robj *key) {
    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;

    // 检查key是否已经在监视链表中了
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        wk = listNodeValue(ln);
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            return; /* Key already watched */
    }
    
    // 如果当前key不在监视链表中,则将其加入
    clients = dictFetchValue(c->db->watched_keys,key);
    if (!clients) {
        clients = listCreate();
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    listAddNodeTail(clients,c);
    
    // 将当前key加入当前客户端的监视链表中
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,wk);
}

UNWATCH命令则删除该客户端所监视的所有的键。如下面函数unwatchAllKeys()所示,li是当前客户端已监视的所有键的链表,该函数使用ln遍历该链表,并从客户端的watched_keys链表中逐个删除。

// Redis 5.0.8 版本
// multi.c
void unwatchCommand(client *c) {
    unwatchAllKeys(c);
    c->flags &= (~CLIENT_DIRTY_CAS);
    addReply(c,shared.ok);
}

void unwatchAllKeys(client *c) {
    listIter li;
    listNode *ln;

    if (listLength(c->watched_keys) == 0) return;
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        list *clients;
        watchedKey *wk;

        /* Lookup the watched key -> clients list and remove the client
         * from the list */
        wk = listNodeValue(ln);
        clients = dictFetchValue(wk->db->watched_keys, wk->key);
        serverAssertWithInfo(c,NULL,clients != NULL);
        listDelNode(clients,listSearchKey(clients,c));
        /* Kill the entry at all if this was the only client */
        if (listLength(clients) == 0)
            dictDelete(wk->db->watched_keys, wk->key);
        /* Remove this watched key from the client->watched list */
        listDelNode(c->watched_keys,ln);
        decrRefCount(wk->key);
        zfree(wk);
    }
}

当一个客户端修改了一个键的值时,会调用signalModifiedKey()函数,以查找是否有客户端正在监视该键。查找的方法很简单,在touchWatchedKey()函数中,Redis服务器会从全局watched_keys链表中查找是否存在该键。如果存在的话,就将对应的客户端置为CLIENT_DIRTY_CAS状态。当该客户端执行EXEC命令时,会检查这个状态(见下一小节)。

// Redis 5.0.8 版本
// db.c
void setKey(redisDb *db, robj *key, robj *val) {
    ...
    signalModifiedKey(db,key);
}

void signalModifiedKey(redisDb *db, robj *key) {
    touchWatchedKey(db,key);
}

// multi.c
void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;

    if (dictSize(db->watched_keys) == 0) return;
    clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;

    /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
    /* Check if we are already watching for this key */
    listRewind(clients,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        c->flags |= CLIENT_DIRTY_CAS;
    }
}

3.5 EXEC命令

在理解了上述内容之后,理解EXEC命令的处理就容易多了。总的来说,EXEC命令首先会检查此时是否存在已监控的键的值是否已被修改(已设置了CLIENT_DIRTY_CAS标识位)。如果发现这种情况,则直接调用discardTransaction()函数,放弃本次事务的处理。如果当前事务未被其他客户端"打扰"的话,那么,Redis服务器会从当前客户端的命令队列(c->mstate.commands)中依次访问并执行命令。

// Redis 5.0.8 版本
// multi.c
void execCommand(client *c) {
    ...
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
        addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                                                  shared.nullmultibulk);
        discardTransaction(c);
        goto handle_monitor;
    }
    ...
    for (j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd = c->mstate.commands[j].cmd;

        /* Propagate a MULTI request once we encounter the first command which
         * is not readonly nor an administrative one.
         * This way we'll deliver the MULTI/..../EXEC block as a whole and
         * both the AOF and the replication link will have the same consistency
         * and atomicity guarantees. */
        if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
            execCommandPropagateMulti(c);
            must_propagate = 1;
        }

        call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);

        /* Commands may alter argc/argv, restore mstate. */
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }
    ...
}

3.6 Redis服务器如何提供事务原子性和事务隔离?

在理解了上述命令的执行过程之后,我们再来简要的总结以下Redis数据库是如何实现事务特性的。

事务原子性(Atomicity):因为Redis是使用单线程模式执行所有命令的。当客户端下发EXEC命令时,Redis服务器才正式开始执行当前的事务。在执行EXEC命令的过程中(execCommand函数),Redis会进行一些检查。如果未能通过检查,则Redis不会执行任何命令。如果通过了检查,则Redis会执行所有的命令。因此,Redis不存在仅执行部分命令的情况。从而提供了事务原子特性。

事务隔离(Isolation):因为Redis是使用单线程模式执行所有的命令的,当Redis开始执行当前的事务时,即使存在其他客户端在同一时间向Redis服务器发送命令,修改数据,但是,在当前事务执行完毕之前,Redis服务器是不会处理新接收的命令的。因为,Redis的所有命令是在一个线程中处理的。所以,当Redis服务器在运行一个事务期间,不会执行任何来自于其他客户端的命令,从而提供了事务隔离特性。

一致性(Consistency):因为Redis是使用单线程模式执行所有的命令的,因为,所有的事务都是顺序执行的。只有当一个事务结束之后,才能启动执行下一个事务。从而提供了事务一致性。

持久性(Durability):当开启AOF持久特性时,Redis会记录所有数据的变化。因此,在事务成功完成之后,即使服务器出现故障,Redis数据库也不会丢失事务处理的结果。

4 小结

本章介绍了Redis事务的基本用法以及内部实现原理。Redis需要开启AOF持久化特性才能像关系型数据库那样提供事务持久性(Durability),因为,Redis是将数据存放在内存中的。然而,Redis事务所具备的原子性、隔离特性和一致性已经为Redis应用程序提供了足够的事务支持。

 

上一章
下一章

注册用户登陆后可留言

Copyright  2019 Little Waterdrop, LLC. All Rights Reserved.