Redis在5.0版本中加入了一个新的数据类型: Streams。 从概念上来看,Streams是一个简单的线性结构。它将所有的记录按照顺序排列在链表中。开发人员可以向Streams添加新的记录,也可以以多种方式查询Streams内的记录。但是,从实现和使用方式上来看,Streams是一个非常复杂的数据结构,原因有三: (1)Streams提供了类似于发布-订阅模式的操作方式。当有新元素加入之后,所有的客户端(或者消费者Consumer)都能收到一份新元素。客户端之间是相互独立的; (2) Streams支持非阻塞(Non-Blocking)和阻塞(Blocking)两种工作方式。在非阻塞模式下,无论是否能从Streams中读取一个元素,该读取操作都会立即返回。在阻塞模式下,如果读取操作不能获取新元素的话,该读取操作会被阻塞,直到有新元素加入Streams为止(或者超时); (3) Streams还支持消费者组(Consumer Group)的概念;由消费者组中的消费者共同读取新元素。
正是因为Streams数据结构的复杂性,我们并没有将其与其他数据类型放在一个章节中。而且,Streams的使用方法与Redis发布-订阅系统有相似之处,所以,我们在介绍了发布-订阅系统之后,我们再来单独讲解Streams,以帮助读者清晰的比较两者的不同。我们将它们之间显著的区别总结如下。
我们将在如下的章节中依次介绍Streams的基本概念、用法和与其相关的内容。
在Redis内部,Streams数据结构包含几个部件。首先,向Streams添加的数据全部保存在一个链表中。链表中每一个节点表示的是一个相互独立的数据。这些数据有着唯一的一个ID,被称为Entry ID。它们在所在的Stream的内部是唯一的,单调递增的。因此,客户端或者消费者能够根据Entry ID的范围查询对应的数据。
当向Stream添加数据时,客户端可自行为新数据设置Entry ID,也可以让Redis服务器为其自动生成。Entry ID由两个部分组成: <millisecondsTime>-<sequenceNumber>;第一个部分是一个时间戳,表示新数据加入Stream的时间;第二个部分是序号,当两个或者多个数据在同一时刻生成时,可以使用序号区别它们。因此,从这两个部分的意义可以看出Entry ID是单调递增的。值得注意的是,当客户端指定Entry ID时,可使用自定义的Entry ID,只需确保Entry ID单调递增即可。例如,客户端可指定第一个元素的Entry ID为0-0,第二个元素的Entry ID为1-0。
Redis还为每一个消费者和消费者组保存一份状态信息。这些信息相互独立,一个消费者状态的变化不会改变其他消费者的状态。在这些信息中,Redis会记录当前消费者最后接收和处理的数据的Entry ID,以便于依次、逐个的向该消费者发送新接收的数据。
Redis也会为消费者组保存一份这样的信息,只是更为复杂的是,在一个消费者组中,Redis还会记录哪些消费者正在处理哪些数据。因为,当一位消费者接收到一份数据后,这份数据不会再次发给同组的其他消费者。与此同时,消费者也能向Redis查询自己正在处理的消息(这是为了能够让从错误状态中恢复的消费者完成正在处理的任务)。
图一 Streams的内部结构。
向Streams添加数据是由命令XADD完成的。例如,下面的命令创建了一个新的Stream,名字为littlewaterdrop_stream。该命令该向新的Streams添加了一个新的元素,该元素包含两个Key/Value对(name/Adam和age/18)。XADD命令中在第二个参数位置使用了星号*的意思是让Redis为新的数据自动生成一个Entry ID。当然,开发人员也可以自行给定一个Entry ID。唯一的限制是新的Entry ID必须大于已使用的任何一个Entry ID。因为在Redis内部,一个Stream的Entry ID是单调递增的。所以,在下面的第二个命令中,我们命令给出了Entry ID以添加第一个元素。最后,我们使用命令XLEN查询littlewaterdrop_stream中存放的元素的个数。目前是两个。
> XADD littlewaterdrop_stream * name Adam age 18
"1591478682515-0"
> XADD littlewaterdrop_stream 1591478682600-0 name Amy age 19
"1591478682600-0"
> XLEN littlewaterdrop_stream
(integer) 2
XRANGE命令可以向Streams中查询Entry ID在某一范围内的数据。例如: 下面的命令查询littlewaterdrop_stream中所有的数据。其中减号-和加号+分别表示该Streams中的最小和最大Entry ID。
> XRANGE littlewaterdrop_stream - +
1) 1) "1591478682515-0"
2) 1) "name"
2) "Adam"
3) "age"
4) "18"
2) 1) "1591478682600-0"
2) 1) "name"
2) "Amy"
3) "age"
4) "19"
开发人员也可以根据具体的Entry ID的值来查询数据。例如:下面的命令查询Entry ID在1591478682515和1591478682600之间的数据。参数COUNT 2是可选参数,表示最多给出2个元素。如果不限制查询个数的话,该命令会查询在此范围内的所有数据。
> XRANGE littlewaterdrop_stream 1591478682515 1591478682600 COUNT 2
1) 1) "1591478682515-0"
2) 1) "name"
2) "Adam"
3) "age"
4) "18"
2) 1) "1591478682600-0"
2) 1) "name"
2) "Amy"
3) "age"
4) "19"
与XRANGE命令使用方法类似,XREVRANGE命令查询某一范围内的数据,并将其已逆序的顺序返回。注意,XRANGE接收的范围参数是start, end; 而XREVRANGE接收的范围参数是end, start;
> XREVRANGE littlewaterdrop_stream 1591478682600 1591478682515
1) 1) "1591478682600-0"
2) 1) "name"
2) "Amy"
3) "age"
4) "19"
2) 1) "1591478682515-0"
2) 1) "name"
2) "Adam"
3) "age"
4) "18"
XREAD命令用于从Streams中读取数据。XREAD命令可用于多种场景之下,所以,我们将从最简单的场景开始,逐步深入。例如,如下的XREAD命令从littlewaterdrop_stream中读取Entry ID大于0的所有数据。该命令是非阻塞模式的XREAD命令,所有,该命令会返回当前littlewaterdrop_stream中的满足条件的所有数据。
> XREAD STREAMS littlewaterdrop_stream 0
1) 1) "littlewaterdrop_stream"
2) 1) 1) "1591478682515-0"
2) 1) "name"
2) "Adam"
3) "age"
4) "18"
2) 1) "1591478682600-0"
2) 1) "name"
2) "Amy"
3) "age"
4) "19"
XREAD命令可接收COUNT参数,表明最多可返回的数据个数。例如,下面的XREAD命令从littlewaterdrop_stream中读取Entry ID大于0的2个数据。
> XREAD COUNT 2 STREAMS littlewaterdrop_stream 0
1) 1) "littlewaterdrop_stream"
2) 1) 1) "1591478682515-0"
2) 1) "name"
2) "Adam"
3) "age"
4) "18"
2) 1) "1591478682600-0"
2) 1) "name"
2) "Amy"
3) "age"
4) "19"
除了非阻塞模式,XREAD命令还支持阻塞模式。开启阻塞模式只需要使用参数BLOCK即可。BLOCK参数之后紧随一个参数,表示最长阻塞的时间,单位为毫秒(MilliSecond)。在使用了BLOCK参数之后,XREAD命令会被阻塞直到要么超时、要么有新数据从Streams中读出。如果在执行XREAD时Streams已有可返回的数据的话,该XREAD命令直接返回,就像是执行非阻塞XREAD命令一样。如下的命令从littlewaterdrop_stream读取新数据。如果当前无新数据,该命令会最长等待100毫秒。这条命令使用了一个特殊的Entry ID。美元符号($)表示当前Streams中所使用最大的Entry ID。因此,该命令仅仅查询littlewaterdrop_stream中的新数据。
> XREAD BLOCK 100 STREAMS littlewaterdrop_stream $
XREAD命令还可以同时从多个Streams中读取数据。例如,下面的命令同时从littlewaterdrop_stream_1和littlewaterdrop_stream_2中读取新数据,最多等待100毫秒。
> XREAD BLOCK 100 STREAMS littlewaterdrop_stream_1 littlewaterdrop_stream_2 $ $
使用XREAD命令已能够实现"多播"功能和"订阅-发布"功能。当有一个新的内容添加入Streams之后,每一个客户端都能收到一份新数据的拷贝。然而,在某些情况下,由于新数据的规模很多,我们希望使用一组客户端来协同处理这些新数据,将数据分配到各个客户端上处理。
Redis为了实现这个场景,设计了一个新的概念消费者组(Consumer Group)。当一个消费者组创建之后,它能够接收Streams上的所有数据,并将其数据分发给该组内的消费者(Consumer)。使用消费者组能确保:
XGROUP命令用于创建消费者组,并将其关联于Streams上。在下面的例子中,该命令在littlewaterdrop_stream上创建了一个消费者组littlewaterdrop_group。最后一个参数美元符号$表示littlewaterdrop_stream中使用的最大的Entry ID。开发人员也可以使用具体的Entry ID。其使用方法与XREAD命令类似。
> XGROUP CREATE littlewaterdrop_stream littlewaterdrop_group $
OK
如果Streams不存在的话,开发人员还可以使用参数MKSTREAM,让Redis自动创建一个Stream。例如:如果littlewaterdrop_stream不存在的话,下面的命令会自动创建一个Stream,命名为littlewaterdrop_stream。
> XGROUP CREATE littlewaterdrop_stream littlewaterdrop_group $ MKSTREAM
OK
XREADGROUP命令用于从消费者组中读取数据。XREADGROUP的参数以及其意义与XREAD非常类似。它们都由BLOCK参数开启阻塞式读取模式。它们都使用COUNT来指定最大读取元素的个数。但是,不同的是,XREADGROUP命令接收GROUP参数,指明从哪个消费者组读取数据。GROUP参数之后紧接着的是消费者组的名称和消费者的名称。在消费者组中,消费者由其名称唯一确定。STREAMS参数之后紧接着的是Stream的名称以及Entry ID。下面的例子中使用了一个特殊的Entry ID:大于号(>)。它表示的意思是,此命令读取一条从未被其他消费者处理过的消息。所以,如果有多个消费者从一个消费者组中读取数据的话,它们将会获得不同的数据。
> XREADGROUP GROUP littlewaterdrop_group consumer1 COUNT 1 STREAMS littlewaterdrop_stream >
开发人员也可以指定一个具体的Entry ID,如下所示。
> XREADGROUP GROUP littlewaterdrop_group consumer1 COUNT 1 STREAMS littlewaterdrop_stream 1591478682515
当接收到一条消息之后,该消息会被标识为PENDING状态。当该消息被成功处理之后,消费者需要使用XACK命令通知Redis服务器该命令已处理完毕,将该消息标识为已处理。
> XACK littlewaterdrop_stream littlewaterdrop_group 1591478682515-0
本章介绍了Redis的Streams的基本概念和用法。Streams的消费者组的概念是从kafka系统"借用"而来,但是Redis的实现方式完全不同。在Redis中,Streams数据结构能方便的支持"多播"方式的数据传播,将一条消息分发给多个接收端。在此基础之上,Redis设计了一个伪接收者:消费者组。消费者组接收所有的消息,但是,在一个消费者组中,消息是发送给不同的接收者处理的。在消费者组内的消息分发机制完全由开发人员(客户端)决定。
注册用户登陆后可留言