Redis Stream 消息流处理教程

Redis Stream 是 Redis 5.0 引入的数据结构,专为消息流处理而设计。它结合了发布/订阅模式和消息队列的优点,支持消息持久化、消费者组、消息确认等企业级特性。对于不需要 RabbitMQKafka 重量级方案的中小型应用,Redis Stream 是一个轻量且高效的选择。

一、Redis Stream 核心概念

  • Stream:有序的消息日志,每条消息有唯一的 ID(时间戳-序号格式)。
  • Consumer Group:消费者组,组内的消费者共同消费消息,每条消息只会被组内一个消费者处理。
  • Consumer:消费者,属于某个消费者组。
  • ACK:消息确认,消费者处理完消息后发送确认,Redis 才将消息标记为已处理。
  • PEL:Pending Entries List,已发送但未确认的消息列表。

二、基础操作

2.1 添加消息

# XADD 添加消息到 Stream
# * 表示自动生成 ID
XADD orders * user_id 1001 product "iPhone 15" amount 7999 status "created"
XADD orders * user_id 1002 product "MacBook Air" amount 8999 status "created"
XADD orders * user_id 1003 product "AirPods Pro" amount 1899 status "created"

# 使用自定义 ID
XADD orders 1700000000000-0 user_id 1004 product "iPad" amount 3999 status "created"

# 限制 Stream 长度(保留最近 10000 条)
XADD orders MAXLEN ~ 10000 * user_id 1005 product "Watch" amount 2999 status "created"

2.2 读取消息

# 读取全部消息
XRANGE orders - +

# 读取指定范围
XRANGE orders 1700000000000 +

# 读取最近 5 条
XREVRANGE orders + - COUNT 5

# 获取 Stream 信息
XLEN orders
XINFO STREAM orders

2.3 阻塞读取

# 从最新消息开始阻塞读取($ 表示只读新消息)
XREAD BLOCK 5000 COUNT 10 STREAMS orders $

# 从指定 ID 之后读取
XREAD BLOCK 0 COUNT 1 STREAMS orders 1700000000000-0

# 同时监听多个 Stream
XREAD BLOCK 5000 COUNT 10 STREAMS orders notifications $ $

三、消费者组

消费者组是 Redis Stream 最强大的特性,实现了消息的负载均衡和可靠投递。

3.1 创建消费者组

# 从 Stream 开头开始消费
XGROUP CREATE orders order-processors 0

# 只消费新消息
XGROUP CREATE orders order-processors $ MKSTREAM

# 查看消费者组信息
XINFO GROUPS orders

3.2 消费者读取消息

# consumer-1 从消费者组读取消息
# > 表示读取未分配的新消息
XREADGROUP GROUP order-processors consumer-1 COUNT 1 BLOCK 5000 STREAMS orders >

# consumer-2 同样读取
XREADGROUP GROUP order-processors consumer-2 COUNT 1 BLOCK 5000 STREAMS orders >

# 读取自己的 pending 消息(已分配但未确认)
XREADGROUP GROUP order-processors consumer-1 COUNT 10 STREAMS orders 0

3.3 消息确认

# 处理完成后确认消息
XACK orders order-processors 1700000000001-0

# 批量确认
XACK orders order-processors 1700000000001-0 1700000000002-0 1700000000003-0

# 查看 pending 消息
XPENDING orders order-processors - + 10

# 查看特定消费者的 pending 消息
XPENDING orders order-processors - + 10 consumer-1

3.4 消息转移(处理消费者故障)

# 将超过 60 秒未确认的消息转移给其他消费者
XCLAIM orders order-processors consumer-2 60000 1700000000001-0

# 自动转移(Redis 6.2+)
XAUTOCLAIM orders order-processors consumer-2 60000 0-0 COUNT 10

四、实际应用:订单处理系统

4.1 Python 生产者

import redis
import json
import time

r = redis.Redis(host='localhost', port=6379, password='YourPassword123!')

def create_order(user_id, product, amount):
    order_data = {
        'user_id': str(user_id),
        'product': product,
        'amount': str(amount),
        'status': 'created',
        'timestamp': str(int(time.time()))
    }
    msg_id = r.xadd('orders', order_data, maxlen=100000)
    print(f"Order created: {msg_id}")
    return msg_id

# 模拟创建订单
create_order(1001, 'iPhone 15 Pro', 7999)
create_order(1002, 'MacBook Air M3', 8999)
create_order(1003, 'AirPods Pro 2', 1899)

4.2 Python 消费者

import redis
import time

r = redis.Redis(host='localhost', port=6379, password='YourPassword123!')

# 确保消费者组存在
try:
    r.xgroup_create('orders', 'order-processors', id='0', mkstream=True)
except redis.exceptions.ResponseError:
    pass  # 组已存在

consumer_name = 'worker-1'

while True:
    try:
        # 先处理 pending 消息
        pending = r.xreadgroup(
            'order-processors', consumer_name,
            {'orders': '0'}, count=5
        )
        for stream, messages in pending:
            for msg_id, data in messages:
                if data:
                    process_order(msg_id, data)
                    r.xack('orders', 'order-processors', msg_id)

        # 再读取新消息
        results = r.xreadgroup(
            'order-processors', consumer_name,
            {'orders': '>'}, count=5, block=5000
        )
        if results:
            for stream, messages in results:
                for msg_id, data in messages:
                    process_order(msg_id, data)
                    r.xack('orders', 'order-processors', msg_id)

    except Exception as e:
        print(f"Error: {e}")
        time.sleep(1)

def process_order(msg_id, data):
    print(f"Processing order {msg_id}: {data}")
    # 业务逻辑处理
    time.sleep(0.1)

五、Stream 管理与维护

# 裁剪 Stream(保留最近 N 条)
XTRIM orders MAXLEN 10000

# 精确裁剪(~ 表示近似裁剪,性能更好)
XTRIM orders MAXLEN ~ 10000

# 基于 ID 裁剪(删除指定 ID 之前的消息)
XTRIM orders MINID 1700000000000-0

# 删除特定消息
XDEL orders 1700000000001-0

# 删除消费者
XGROUP DELCONSUMER orders order-processors consumer-3

# 删除消费者组
XGROUP DESTROY orders order-processors

六、监控指标

# Stream 详细信息
XINFO STREAM orders FULL

# 消费者组信息
XINFO GROUPS orders

# 消费者详细信息
XINFO CONSUMERS orders order-processors

# 关键监控指标
# 1. Stream 长度 - 消息积压情况
XLEN orders

# 2. Pending 消息数 - 处理速度
XPENDING orders order-processors

# 3. 消费者空闲时间 - 消费者健康状态
XINFO CONSUMERS orders order-processors

七、与其他消息队列对比

  • Redis Pub/Sub:不持久化,离线消费者会丢失消息。Stream 支持持久化和重放。
  • Redis List(LPUSH/BRPOP):简单队列,不支持消费者组和消息确认。
  • RabbitMQ:功能更丰富(路由、死信队列、TTL),但部署和运维更复杂。
  • Kafka:适合超大规模日志流处理,吞吐量更高但架构更重。

Redis Stream 适合的场景:消息量中等(每秒数千到数万条)、已有 Redis 基础设施、需要简单可靠的消息传递。

八、最佳实践

  • 设置 MAXLEN:始终为 Stream 设置最大长度,防止内存无限增长。
  • 及时 ACK:消费者处理完消息立即确认,减少 PEL 积压。
  • 处理 pending 消息:消费者启动时优先处理自己的 pending 消息。
  • 死信处理:对多次转移仍未成功处理的消息,记录到死信队列人工处理。
  • 合理的消费者数量:消费者数不应超过 Stream 的分区数(消息并行度)。

总结

Redis Stream 是一个功能完善的轻量级消息流解决方案。如果你的搬瓦工 VPS 上已经运行 Redis,无需额外引入消息中间件即可实现可靠的消息传递。对于更复杂的消息路由需求可以考虑 RabbitMQ,超大规模日志流则推荐 Kafka。选购搬瓦工 VPS 请查看 全部方案,使用优惠码 NODESEEK2026 享受 6.77% 折扣,通过 bwh81.net 进入官网。

关于本站

搬瓦工VPS中文网(bwgvps.com)是非官方中文信息站,整理搬瓦工的方案、优惠和教程。我们不销售主机,不提供技术服务。

新手必读
搬瓦工优惠码

NODESEEK2026(优惠 6.77%)

购买时填入即可抵扣。