Redis Stream是Redis数据库中新添加的一种数据类型,它可以理解为消息队列,用于在一个或多个消费者之间传递消息。在本文中,我们将详细讲解Redis Stream类型的使用方法,并提供两条示例说明。
什么是Redis Stream?
Redis Stream是一个添加到Redis 5.0版本中的新数据类型。它被设计用于在一个或多个消费者之间传递消息。可以将它理解为一个消息队列,但是和其他消息队列不同的是,Redis Stream支持广播和消费组的概念。
Redis Stream的常用操作
创建Stream
创建一个Stream需要指定Stream的名称和Stream所包含的字段名称及其对应的内容。具体操作如下:
XADD streamName ID fieldName1 value1 fieldName2 value2 ...
其中streamName是Stream的名称,ID是一条消息的唯一标识,fieldName1和value1是消息的一个字段及其对应的内容,fieldName2和value2是消息的另一个字段及其对应的内容。每条消息的ID需要是唯一的,可以是当前时间戳等,但是必须递增,否则过期的消息可能会被再次消费。
示例:
XADD mystream 1000 name alice age 20
上述命令将创建一个名为mystream的Stream,并添加一条ID为1000的消息,其中包含两个字段name和age。
读取消息
我们可以通过消费者组和读取消息的方式来接收Stream中的消息。
消费者组
Redis Stream支持消费者组的概念,多个消费者可以组成一个消费者组。每条消息只被消费者组中的其中一个消费者消费,并且同一条消息不会被同一消费者组中的多个消费者消费。
创建消费者组的命令如下:
XGROUP CREATE streamName groupName id
其中,streamName和id分别是Stream名和消息ID,groupName是指消费者组的名称。
示例:
XGROUP CREATE mystream mygroup 0
读取消息
读取消息需要使用消费者ID,并且读取的消息会自动标记为已经被消费。下面是读取消息的命令:
XREADGROUP GROUP groupName consumerName COUNT count [BLOCK milliseconds] STREAMS streamName [id]
其中groupName是消费者组名称,consumerName是消费者ID,count指读取消息的数量,BLOCK表示读取的命令是否需要阻塞(如果需要阻塞,会在一定的时间内等待新消息的到来),milliseconds是阻塞时间(单位:毫秒),streamName是Stream名称,id是要读取的消息的ID,如果不指定id,则从最小的ID开始读取。
示例:
XREADGROUP GROUP mygroup consumer1 COUNT 1 BLOCK 1000 STREAMS mystream >
上述命令表示组mygroup中的consumer1消费者将从mystream的下一个可用 ID 开始消费一条消息,并且如果没有可用消息,则该命令会在1秒后超时。
Stream其他操作
除了创建Stream和读取消息之外,Redis Stream还支持以下几种操作:
-
XLEN:获取Stream中的消息数量。
XLEN mystream
-
XRANGE:获取指定ID范围中的消息。
XRANGE mystream 1000-2000
-
XTRIM:移除指定范围外的消息。
XTRIM mystream MAXLEN 1000
示例说明
示例1:使用Redis Stream实现一个简单的聊天室
以下是一个使用Redis Stream实现的简单聊天室的示例。每个用户都可以通过Redis客户端将消息发送到指定的Stream,在接收到消息后通过WebSocket实时将消息推送给所有在线用户。
服务器端代码
import asyncio
import aioredis
import websockets
CHANNEL = "chat"
async def consumer_handler():
redis = await aioredis.create_redis("redis://localhost:6379")
async with redis.client() as conn:
while True:
# 从stream中读取一条消息,默认阻塞等待
messages = await conn.xread_group(
"chat-group", "chat-consumer", {CHANNEL: ">"}, count=1, block=0
)
# 遍历消息,广播给所有websocket客户端
for msg in messages:
errmsg = "Message {} from stream {} with fields:"\
.format(msg[1], msg[0].decode())
fields, value = msg[2][0], msg[2][1]
for k, v in zip(fields.decode().split(), value.decode().split()):
errmsg += " {}={}".format(k.decode(), v.decode())
print(errmsg)
await ws.send(value)
async def server(websocket, path):
while True:
# 从客户端获取消息
message = await websocket.recv()
# 将消息加入Stream
redis = await aioredis.create_redis("redis://localhost:6379")
msg_id = await redis.xadd("chat", {"text": message})
await redis.close()
# 将消息发布到消费者组中
await websocket.wait_closed()
await redis.execute("XGROUP", "CREATE", CHANNEL, "chat-group", 0, mkstream=True)
await redis.execute("XGROUP", "SETID", CHANNEL, "chat-group", msg_id)
await redis.execute("XADD", CHANNEL, {"message": message}, id=msg_id)
await redis.execute("XGROUP", "DELCONSUMER", CHANNEL, "chat-group", "chat-consumer")
await redis.execute("XGROUP", "CREATE", CHANNEL, "chat-group", "$", True)
await asyncio.sleep(0.1)
ws_start_server = websockets.serve(server, "localhost", 8765)
async def main():
await asyncio.gather(ws_start_server, consumer_handler())
if __name__ == "__main__":
asyncio.run(main())
客户端代码
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Chat</title>
</head>
<body>
<input type="text" id="input" />
<button id="send">Send</button>
<ul id="messages"></ul>
<script>
var ws = new WebSocket("ws://localhost:8765/");
ws.onmessage = function (event) {
var messages = document.getElementById("messages");
var message = document.createElement("li");
message.innerText = event.data;
messages.appendChild(message);
}
send.onclick = function () {
var input = document.getElementById("input");
var message = input.value;
ws.send(message);
input.value = "";
};
</script>
</body>
</html>
示例2:使用Redis Stream实现排队系统
以下是一个使用Redis Stream实现排队系统的示例。每个用户都可以通过Redis客户端将自己的信息加入到指定的Stream,而管理员则从这个Stream中读取最早的N个待服务的用户信息并通知他们前往服务台。
服务器端代码
import asyncio
import aioredis
MAX_QUEUE_SIZE = 5
async def queue_handler():
redis = await aioredis.create_redis("redis://localhost:6379")
async with redis.client() as conn:
while True:
# 从queue中读取存在的元素数量
queue_size = await conn.xlen("queue")
# 如果队列中没有消息,则继续等待
if queue_size == 0:
await asyncio.sleep(0.1)
continue
# 如果队列中的消息数量小于等于指定的阈值,则直接输出队列中的所有消息
elif queue_size <= MAX_QUEUE_SIZE:
messages = await conn.xread({"queue": ">"})
for msg in messages:
fields, values = msg[2][0], msg[2][1]
msg_txt = "".join(["{}:{}\n".format(k.decode(), v.decode()) for k, v in zip(fields.split(), values.split())])
print(msg_txt)
# 否则,读取前MAX_QUEUE_SIZE个消息,并将这些消息从队列中移除
else:
messages = await conn.xrange("queue", count=MAX_QUEUE_SIZE)
# 将所有消息的MinID保存到一个变量中,方便后续使用
min_id = messages[0][0].decode()
# 将前MAX_QUEUE_SIZE个消息从队列中移除
await conn.execute("XTRIM", "queue", MAXLEN=queue_size - MAX_QUEUE_SIZE)
for msg in messages:
fields, values = msg[2][0], msg[2][1]
msg_txt = "".join(["{}:{}\n".format(k.decode(), v.decode()) for k, v in zip(fields.split(), values.split())])
print(msg_txt)
async def server():
redis = await aioredis.create_redis("redis://localhost:6379")
while True:
# 从客户端获取消息
message = input("What's your full name and phone number?: ")
# 将消息加入Stream
msg_id = await redis.xadd("queue", {"message": message})
# 将消息发布到消费者组中
await redis.execute("XGROUP", "CREATE", "queue", "client-group", 0, mkstream=True)
await redis.execute("XGROUP", "SETID", "queue", "client-group", msg_id)
await redis.execute("XADD", "queue", {"message": message}, id=msg_id)
await redis.execute("XGROUP", "DELCONSUMER", "queue", "client-group", "client-consumer")
await redis.execute("XGROUP", "CREATE", "queue", "client-group", "$", True)
await asyncio.sleep(0.1)
async def main():
await asyncio.gather(queue_handler(), server())
if __name__ == "__main__":
asyncio.run(main())
客户端代码
用户向排队系统添加自己的信息,只需要通过命令行运行以下命令即可。
$ redis-cli XADD queue fullName "Alice Smith" phone "555-1234-5678"
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Redis Stream类型的使用详解 - Python技术站