RabbitMQ 消息队列搭建教程
RabbitMQ 是基于 AMQP 协议的开源消息中间件,以其灵活的路由机制、可靠的消息投递和丰富的管理界面著称。在微服务架构中,RabbitMQ 常用于服务间的异步通信、任务队列和事件驱动架构。本文将在搬瓦工 VPS 上完整演示 RabbitMQ 的安装、配置和核心使用方法。如果你只需要轻量级的消息队列,也可以考虑 Redis Stream。
一、安装 RabbitMQ
1.1 使用包管理器安装
# 安装 Erlang(RabbitMQ 的运行时依赖)
apt update
apt install erlang-base erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key erlang-runtime-tools erlang-snmp erlang-ssl erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl -y
# 添加 RabbitMQ 仓库
curl -1sLf 'https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey' | gpg --dearmor -o /usr/share/keyrings/rabbitmq.gpg
echo "deb [signed-by=/usr/share/keyrings/rabbitmq.gpg] https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ $(lsb_release -cs) main" | tee /etc/apt/sources.list.d/rabbitmq.list
# 安装 RabbitMQ
apt update
apt install rabbitmq-server -y
# 启动服务
systemctl start rabbitmq-server
systemctl enable rabbitmq-server
1.2 使用 Docker 安装(推荐)
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=RabbitPass123! \
-v rabbitmq_data:/var/lib/rabbitmq \
--restart unless-stopped \
rabbitmq:3-management
二、基础配置
2.1 启用管理插件
rabbitmq-plugins enable rabbitmq_management
管理界面地址:http://your-server-ip:15672,默认账号 guest 只能本地访问。
2.2 创建管理员账号
# 创建用户
rabbitmqctl add_user admin RabbitPass123!
# 设置管理员角色
rabbitmqctl set_user_tags admin administrator
# 设置权限(对所有虚拟主机拥有完全权限)
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
# 删除默认 guest 账号
rabbitmqctl delete_user guest
# 查看用户列表
rabbitmqctl list_users
2.3 创建虚拟主机(隔离不同应用)
# 创建虚拟主机
rabbitmqctl add_vhost myapp
# 为用户设置虚拟主机权限
rabbitmqctl set_permissions -p myapp admin ".*" ".*" ".*"
# 创建应用专用账号
rabbitmqctl add_user appuser AppPass123!
rabbitmqctl set_permissions -p myapp appuser ".*" ".*" ".*"
三、消息路由模式
3.1 Direct 交换机(精确路由)
消息根据 routing key 精确匹配路由到绑定的队列。
# 使用 rabbitmqadmin 命令行工具
rabbitmqadmin declare exchange name=order.direct type=direct durable=true
rabbitmqadmin declare queue name=order.create durable=true
rabbitmqadmin declare queue name=order.cancel durable=true
rabbitmqadmin declare binding source=order.direct destination=order.create routing_key=create
rabbitmqadmin declare binding source=order.direct destination=order.cancel routing_key=cancel
# 发送消息
rabbitmqadmin publish exchange=order.direct routing_key=create payload='{"order_id":1001,"amount":7999}'
3.2 Topic 交换机(模式匹配路由)
rabbitmqadmin declare exchange name=logs.topic type=topic durable=true
rabbitmqadmin declare queue name=all.logs durable=true
rabbitmqadmin declare queue name=error.logs durable=true
# * 匹配一个单词,# 匹配零个或多个单词
rabbitmqadmin declare binding source=logs.topic destination=all.logs routing_key="log.#"
rabbitmqadmin declare binding source=logs.topic destination=error.logs routing_key="log.*.error"
# 发送消息
rabbitmqadmin publish exchange=logs.topic routing_key=log.order.error payload='Order processing failed'
rabbitmqadmin publish exchange=logs.topic routing_key=log.user.info payload='User logged in'
3.3 Fanout 交换机(广播)
rabbitmqadmin declare exchange name=notifications type=fanout durable=true
rabbitmqadmin declare queue name=email.notifications durable=true
rabbitmqadmin declare queue name=sms.notifications durable=true
rabbitmqadmin declare binding source=notifications destination=email.notifications
rabbitmqadmin declare binding source=notifications destination=sms.notifications
四、Python 客户端示例
4.1 生产者
import pika
import json
credentials = pika.PlainCredentials('appuser', 'AppPass123!')
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost', 5672, 'myapp', credentials)
)
channel = connection.channel()
# 声明持久化队列
channel.queue_declare(queue='tasks', durable=True)
# 发送持久化消息
message = json.dumps({'task': 'send_email', 'to': 'user@example.com'})
channel.basic_publish(
exchange='',
routing_key='tasks',
body=message,
properties=pika.BasicProperties(delivery_mode=2) # 持久化
)
print(f"Sent: {message}")
connection.close()
4.2 消费者
import pika
import json
import time
credentials = pika.PlainCredentials('appuser', 'AppPass123!')
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost', 5672, 'myapp', credentials)
)
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)
# 公平分发(每次只给消费者一条未确认的消息)
channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
task = json.loads(body)
print(f"Processing: {task}")
time.sleep(1) # 模拟处理
ch.basic_ack(delivery_tag=method.delivery_tag)
print("Done")
channel.basic_consume(queue='tasks', on_message_callback=callback)
print("Waiting for messages...")
channel.start_consuming()
五、高级特性
5.1 死信队列
# 创建死信交换机和队列
rabbitmqadmin declare exchange name=dlx type=direct
rabbitmqadmin declare queue name=dead_letters durable=true
rabbitmqadmin declare binding source=dlx destination=dead_letters routing_key=dead
# 创建带死信配置的业务队列
rabbitmqadmin declare queue name=work_queue durable=true arguments='{"x-dead-letter-exchange":"dlx","x-dead-letter-routing-key":"dead","x-message-ttl":60000}'
5.2 延迟消息
# 安装延迟消息插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 创建延迟交换机
rabbitmqadmin declare exchange name=delayed type=x-delayed-message arguments='{"x-delayed-type":"direct"}'
5.3 消息优先级
rabbitmqadmin declare queue name=priority_queue durable=true arguments='{"x-max-priority":10}'
六、集群配置
# 在节点 2 上加入集群
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
# 查看集群状态
rabbitmqctl cluster_status
# 设置镜像策略(队列在所有节点上有副本)
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}' -p myapp
七、监控与调优
# 查看队列状态
rabbitmqctl list_queues name messages consumers
# 查看连接数
rabbitmqctl list_connections
# 查看通道数
rabbitmqctl list_channels
# 内存配置(/etc/rabbitmq/rabbitmq.conf)
vm_memory_high_watermark.relative = 0.6
disk_free_limit.relative = 1.5
# 查看节点状态
rabbitmqctl status
八、常见问题
- 消息积压:增加消费者数量或优化处理逻辑。检查 prefetch_count 设置。
- 内存告警:RabbitMQ 默认在内存超过 40% 时触发流控。调整
vm_memory_high_watermark或增加内存。 - 连接数过多:使用连接池复用连接,避免频繁创建销毁。
- 消息丢失:确保交换机、队列和消息都设置为持久化(durable),并开启消息确认(publisher confirm)。
总结
RabbitMQ 是功能最丰富的开源消息中间件之一,灵活的路由机制和可靠的消息投递使其适用于各种异步通信场景。在搬瓦工 VPS 上推荐使用 Docker 方式部署,简单高效。如果你需要更高吞吐量的日志流处理,可以参考 Kafka 流处理平台。选购搬瓦工 VPS 请查看 全部方案,使用优惠码 NODESEEK2026 享受 6.77% 折扣,通过 bwh81.net 进入官网。