Redis Stream 消息流处理教程
Redis Stream 是 Redis 5.0 引入的数据结构,专为消息流处理而设计。它结合了发布/订阅模式和消息队列的优点,支持消息持久化、消费者组、消息确认等企业级特性。对于不需要 RabbitMQ 或 Kafka 重量级方案的中小型应用,Redis Stream 是一个轻量且高效的选择。
一、Redis Stream 核心概念
- Stream:有序的消息日志,每条消息有唯一的 ID(时间戳-序号格式)。
- Consumer Group:消费者组,组内的消费者共同消费消息,每条消息只会被组内一个消费者处理。
- Consumer:消费者,属于某个消费者组。
- ACK:消息确认,消费者处理完消息后发送确认,Redis 才将消息标记为已处理。
- PEL:Pending Entries List,已发送但未确认的消息列表。
二、基础操作
2.1 添加消息
# XADD 添加消息到 Stream
# * 表示自动生成 ID
XADD orders * user_id 1001 product "iPhone 15" amount 7999 status "created"
XADD orders * user_id 1002 product "MacBook Air" amount 8999 status "created"
XADD orders * user_id 1003 product "AirPods Pro" amount 1899 status "created"
# 使用自定义 ID
XADD orders 1700000000000-0 user_id 1004 product "iPad" amount 3999 status "created"
# 限制 Stream 长度(保留最近 10000 条)
XADD orders MAXLEN ~ 10000 * user_id 1005 product "Watch" amount 2999 status "created"
2.2 读取消息
# 读取全部消息
XRANGE orders - +
# 读取指定范围
XRANGE orders 1700000000000 +
# 读取最近 5 条
XREVRANGE orders + - COUNT 5
# 获取 Stream 信息
XLEN orders
XINFO STREAM orders
2.3 阻塞读取
# 从最新消息开始阻塞读取($ 表示只读新消息)
XREAD BLOCK 5000 COUNT 10 STREAMS orders $
# 从指定 ID 之后读取
XREAD BLOCK 0 COUNT 1 STREAMS orders 1700000000000-0
# 同时监听多个 Stream
XREAD BLOCK 5000 COUNT 10 STREAMS orders notifications $ $
三、消费者组
消费者组是 Redis Stream 最强大的特性,实现了消息的负载均衡和可靠投递。
3.1 创建消费者组
# 从 Stream 开头开始消费
XGROUP CREATE orders order-processors 0
# 只消费新消息
XGROUP CREATE orders order-processors $ MKSTREAM
# 查看消费者组信息
XINFO GROUPS orders
3.2 消费者读取消息
# consumer-1 从消费者组读取消息
# > 表示读取未分配的新消息
XREADGROUP GROUP order-processors consumer-1 COUNT 1 BLOCK 5000 STREAMS orders >
# consumer-2 同样读取
XREADGROUP GROUP order-processors consumer-2 COUNT 1 BLOCK 5000 STREAMS orders >
# 读取自己的 pending 消息(已分配但未确认)
XREADGROUP GROUP order-processors consumer-1 COUNT 10 STREAMS orders 0
3.3 消息确认
# 处理完成后确认消息
XACK orders order-processors 1700000000001-0
# 批量确认
XACK orders order-processors 1700000000001-0 1700000000002-0 1700000000003-0
# 查看 pending 消息
XPENDING orders order-processors - + 10
# 查看特定消费者的 pending 消息
XPENDING orders order-processors - + 10 consumer-1
3.4 消息转移(处理消费者故障)
# 将超过 60 秒未确认的消息转移给其他消费者
XCLAIM orders order-processors consumer-2 60000 1700000000001-0
# 自动转移(Redis 6.2+)
XAUTOCLAIM orders order-processors consumer-2 60000 0-0 COUNT 10
四、实际应用:订单处理系统
4.1 Python 生产者
import redis
import json
import time
r = redis.Redis(host='localhost', port=6379, password='YourPassword123!')
def create_order(user_id, product, amount):
order_data = {
'user_id': str(user_id),
'product': product,
'amount': str(amount),
'status': 'created',
'timestamp': str(int(time.time()))
}
msg_id = r.xadd('orders', order_data, maxlen=100000)
print(f"Order created: {msg_id}")
return msg_id
# 模拟创建订单
create_order(1001, 'iPhone 15 Pro', 7999)
create_order(1002, 'MacBook Air M3', 8999)
create_order(1003, 'AirPods Pro 2', 1899)
4.2 Python 消费者
import redis
import time
r = redis.Redis(host='localhost', port=6379, password='YourPassword123!')
# 确保消费者组存在
try:
r.xgroup_create('orders', 'order-processors', id='0', mkstream=True)
except redis.exceptions.ResponseError:
pass # 组已存在
consumer_name = 'worker-1'
while True:
try:
# 先处理 pending 消息
pending = r.xreadgroup(
'order-processors', consumer_name,
{'orders': '0'}, count=5
)
for stream, messages in pending:
for msg_id, data in messages:
if data:
process_order(msg_id, data)
r.xack('orders', 'order-processors', msg_id)
# 再读取新消息
results = r.xreadgroup(
'order-processors', consumer_name,
{'orders': '>'}, count=5, block=5000
)
if results:
for stream, messages in results:
for msg_id, data in messages:
process_order(msg_id, data)
r.xack('orders', 'order-processors', msg_id)
except Exception as e:
print(f"Error: {e}")
time.sleep(1)
def process_order(msg_id, data):
print(f"Processing order {msg_id}: {data}")
# 业务逻辑处理
time.sleep(0.1)
五、Stream 管理与维护
# 裁剪 Stream(保留最近 N 条)
XTRIM orders MAXLEN 10000
# 精确裁剪(~ 表示近似裁剪,性能更好)
XTRIM orders MAXLEN ~ 10000
# 基于 ID 裁剪(删除指定 ID 之前的消息)
XTRIM orders MINID 1700000000000-0
# 删除特定消息
XDEL orders 1700000000001-0
# 删除消费者
XGROUP DELCONSUMER orders order-processors consumer-3
# 删除消费者组
XGROUP DESTROY orders order-processors
六、监控指标
# Stream 详细信息
XINFO STREAM orders FULL
# 消费者组信息
XINFO GROUPS orders
# 消费者详细信息
XINFO CONSUMERS orders order-processors
# 关键监控指标
# 1. Stream 长度 - 消息积压情况
XLEN orders
# 2. Pending 消息数 - 处理速度
XPENDING orders order-processors
# 3. 消费者空闲时间 - 消费者健康状态
XINFO CONSUMERS orders order-processors
七、与其他消息队列对比
- Redis Pub/Sub:不持久化,离线消费者会丢失消息。Stream 支持持久化和重放。
- Redis List(LPUSH/BRPOP):简单队列,不支持消费者组和消息确认。
- RabbitMQ:功能更丰富(路由、死信队列、TTL),但部署和运维更复杂。
- Kafka:适合超大规模日志流处理,吞吐量更高但架构更重。
Redis Stream 适合的场景:消息量中等(每秒数千到数万条)、已有 Redis 基础设施、需要简单可靠的消息传递。
八、最佳实践
- 设置 MAXLEN:始终为 Stream 设置最大长度,防止内存无限增长。
- 及时 ACK:消费者处理完消息立即确认,减少 PEL 积压。
- 处理 pending 消息:消费者启动时优先处理自己的 pending 消息。
- 死信处理:对多次转移仍未成功处理的消息,记录到死信队列人工处理。
- 合理的消费者数量:消费者数不应超过 Stream 的分区数(消息并行度)。
总结
Redis Stream 是一个功能完善的轻量级消息流解决方案。如果你的搬瓦工 VPS 上已经运行 Redis,无需额外引入消息中间件即可实现可靠的消息传递。对于更复杂的消息路由需求可以考虑 RabbitMQ,超大规模日志流则推荐 Kafka。选购搬瓦工 VPS 请查看 全部方案,使用优惠码 NODESEEK2026 享受 6.77% 折扣,通过 bwh81.net 进入官网。