一、添加数据(往名为mystream的Stream中添加了一个条目)
> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0
二、获取一个Stream的条目数量
> XLEN mystream
(integer) 1
三、XRANGE范围查询
# 根据范围查询Stream,两个特殊的ID- 和 +分别表示可能的最小ID和最大ID
> XRANGE mystream - +
1) 1) 1518951480106-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
2) 1) 1518951482479-0
2) 1) "sensor-id"
2) "9999"
3) "temperature"
4) "18.2"
# 查询两毫秒时间内产生的所有条目
> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
# 在最后放一个可选的COUNT选项,只获取前面N个项目
> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
2) 1) "foo"
2) "value_1"
2) 1) 1519073279157-0
2) 1) "foo"
2) "value_2"
> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
2) 1) "foo"
2) "value_3"
2) 1) 1519073281432-0
2) 1) "foo"
2) "value_4"
# XREVRANGE命令与XRANGE相同,但是以相反的顺序返回元素
> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
2) 1) "foo"
2) "value_10"
四、使用XREAD监听新项目
# XREAD的非阻塞形式,获取流 mystream中ID大于0-0的消息
> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1519073278252-0
2) 1) "foo"
2) "value_1"
2) 1) 1519073279157-0
2) 1) "foo"
2) "value_2"
# 通过指定BLOCK参数XREAD变成一个阻塞命令
> XREAD BLOCK 0 STREAMS mystream $
# 通过传入多个key来同时从不同的Stream中读取数据
XREAD COUNT 2 STREAMS mystream otherstream 0 0
# 通过指定BLOCK参数,将XREAD变成一个阻塞命令(同样支持多个steam)
> XREAD BLOCK 0 STREAMS mystream $
五、消费者组
- XGROUP 用于创建,摧毁或者管理消费者组。
- XREADGROUP 用于通过消费者组从一个Stream中读取。
- XACK 是允许消费者将待处理消息标记为已正确处理的命令。
# 创建一个消费者组
> XGROUP CREATE mystream mygroup $
OK
# 在开始从Stream中读取之前,让我们往里面放一些消息:
> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0
# 尝试使用消费者组读取(特殊的ID > : 消息到目前为止从未传递给其他消费者)
> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1526569495631-0
2) 1) "message"
2) "apple"
# 历史待处理的消息(使用特殊ID>获取新消息后,会更新消费者组的最后ID,这些为历史待处理消息)
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1526569495631-0
2) 1) "message"
2) "apple"
# 消息标记为已处理
> XACK mystream mygroup 1526569495631-0
(integer) 1
六、从永久性失败中恢复
# XPENDING简单使用
> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
2) "2"
# XPENDING命令可以传递更多的参数来获取更多信息,完整的命令签名如下
XPENDING <key> <groupname> [<start-id> <end-id> <count> [<conusmer-name>]]
# 待处理消息
> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
2) "Bob"
3) (integer) 74170458
4) (integer) 1
2) 1) 1526569506935-0
2) "Bob"
3) (integer) 74170458
4) (integer) 1
# XCLAIM 改变消息的所有者
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
# XCLAIM提供了最小空闲时间,只有在上述消息的空闲时间大于指定的空闲时间时,操作才会起作用,下面第二个客户端的认领会失败
Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0
# XCLAIM命令也返回了消息数据本身。但这不是强制性的。可以使用JUSTID选项,以便仅返回成功认领的消息的ID
> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
七、Streams 的可观察性
# 报告关于Stream本身的信息
> XINFO STREAM mystream
1) length
2) (integer) 13
3) radix-tree-keys
4) (integer) 1
5) radix-tree-nodes
6) (integer) 2
7) groups
8) (integer) 2
9) first-entry
10) 1) 1524494395530-0
2) 1) "a"
2) "1"
3) "b"
4) "2"
11) last-entry
12) 1) 1526569544280-0
2) 1) "message"
2) "banana"
# 有关消费者组的信息
> XINFO GROUPS mystream
1) 1) name
2) "mygroup"
3) consumers
4) (integer) 2
5) pending
6) (integer) 2
2) 1) name
2) "some-other-group"
3) consumers
4) (integer) 1
5) pending
6) (integer) 0
# 检查特定消费者组的状态
> XINFO CONSUMERS mystream mygroup
1) 1) name
2) "Alice"
3) pending
4) (integer) 1
5) idle
6) (integer) 9104628
2) 1) name
2) "Bob"
3) pending
4) (integer) 1
5) idle
6) (integer) 83841983
七、设置Streams的上限
> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
2) 1) "value"
2) "2"
2) 1) 1526655000369-0
2) 1) "value"
2) "3"
# 如果不需要精确的1000个项目。它可以是1000或者1010或者1030,只要保证至少保存1000个项目就行
XADD mystream MAXLEN ~ 1000 * ... entry fields here ...
八、从Stream中删除单个项目
> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
2) 1) "value"
2) "2"
2) 1) 1526655000369-0
2) 1) "value"
2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
2) 1) "value"
2) "3"