RabbitMQ 消息队列搭建教程

RabbitMQ 是基于 AMQP 协议的开源消息中间件,以其灵活的路由机制、可靠的消息投递和丰富的管理界面著称。在微服务架构中,RabbitMQ 常用于服务间的异步通信、任务队列和事件驱动架构。本文将在搬瓦工 VPS 上完整演示 RabbitMQ 的安装、配置和核心使用方法。如果你只需要轻量级的消息队列,也可以考虑 Redis Stream

一、安装 RabbitMQ

1.1 使用包管理器安装

# 安装 Erlang(RabbitMQ 的运行时依赖)
apt update
apt install erlang-base erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key erlang-runtime-tools erlang-snmp erlang-ssl erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl -y

# 添加 RabbitMQ 仓库
curl -1sLf 'https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey' | gpg --dearmor -o /usr/share/keyrings/rabbitmq.gpg
echo "deb [signed-by=/usr/share/keyrings/rabbitmq.gpg] https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ $(lsb_release -cs) main" | tee /etc/apt/sources.list.d/rabbitmq.list

# 安装 RabbitMQ
apt update
apt install rabbitmq-server -y

# 启动服务
systemctl start rabbitmq-server
systemctl enable rabbitmq-server

1.2 使用 Docker 安装(推荐)

docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=RabbitPass123! \
  -v rabbitmq_data:/var/lib/rabbitmq \
  --restart unless-stopped \
  rabbitmq:3-management

二、基础配置

2.1 启用管理插件

rabbitmq-plugins enable rabbitmq_management

管理界面地址:http://your-server-ip:15672,默认账号 guest 只能本地访问。

2.2 创建管理员账号

# 创建用户
rabbitmqctl add_user admin RabbitPass123!

# 设置管理员角色
rabbitmqctl set_user_tags admin administrator

# 设置权限(对所有虚拟主机拥有完全权限)
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

# 删除默认 guest 账号
rabbitmqctl delete_user guest

# 查看用户列表
rabbitmqctl list_users

2.3 创建虚拟主机(隔离不同应用)

# 创建虚拟主机
rabbitmqctl add_vhost myapp

# 为用户设置虚拟主机权限
rabbitmqctl set_permissions -p myapp admin ".*" ".*" ".*"

# 创建应用专用账号
rabbitmqctl add_user appuser AppPass123!
rabbitmqctl set_permissions -p myapp appuser ".*" ".*" ".*"

三、消息路由模式

3.1 Direct 交换机(精确路由)

消息根据 routing key 精确匹配路由到绑定的队列。

# 使用 rabbitmqadmin 命令行工具
rabbitmqadmin declare exchange name=order.direct type=direct durable=true
rabbitmqadmin declare queue name=order.create durable=true
rabbitmqadmin declare queue name=order.cancel durable=true
rabbitmqadmin declare binding source=order.direct destination=order.create routing_key=create
rabbitmqadmin declare binding source=order.direct destination=order.cancel routing_key=cancel

# 发送消息
rabbitmqadmin publish exchange=order.direct routing_key=create payload='{"order_id":1001,"amount":7999}'

3.2 Topic 交换机(模式匹配路由)

rabbitmqadmin declare exchange name=logs.topic type=topic durable=true
rabbitmqadmin declare queue name=all.logs durable=true
rabbitmqadmin declare queue name=error.logs durable=true

# * 匹配一个单词,# 匹配零个或多个单词
rabbitmqadmin declare binding source=logs.topic destination=all.logs routing_key="log.#"
rabbitmqadmin declare binding source=logs.topic destination=error.logs routing_key="log.*.error"

# 发送消息
rabbitmqadmin publish exchange=logs.topic routing_key=log.order.error payload='Order processing failed'
rabbitmqadmin publish exchange=logs.topic routing_key=log.user.info payload='User logged in'

3.3 Fanout 交换机(广播)

rabbitmqadmin declare exchange name=notifications type=fanout durable=true
rabbitmqadmin declare queue name=email.notifications durable=true
rabbitmqadmin declare queue name=sms.notifications durable=true
rabbitmqadmin declare binding source=notifications destination=email.notifications
rabbitmqadmin declare binding source=notifications destination=sms.notifications

四、Python 客户端示例

4.1 生产者

import pika
import json

credentials = pika.PlainCredentials('appuser', 'AppPass123!')
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost', 5672, 'myapp', credentials)
)
channel = connection.channel()

# 声明持久化队列
channel.queue_declare(queue='tasks', durable=True)

# 发送持久化消息
message = json.dumps({'task': 'send_email', 'to': 'user@example.com'})
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body=message,
    properties=pika.BasicProperties(delivery_mode=2)  # 持久化
)
print(f"Sent: {message}")
connection.close()

4.2 消费者

import pika
import json
import time

credentials = pika.PlainCredentials('appuser', 'AppPass123!')
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost', 5672, 'myapp', credentials)
)
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)

# 公平分发(每次只给消费者一条未确认的消息)
channel.basic_qos(prefetch_count=1)

def callback(ch, method, properties, body):
    task = json.loads(body)
    print(f"Processing: {task}")
    time.sleep(1)  # 模拟处理
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print("Done")

channel.basic_consume(queue='tasks', on_message_callback=callback)
print("Waiting for messages...")
channel.start_consuming()

五、高级特性

5.1 死信队列

# 创建死信交换机和队列
rabbitmqadmin declare exchange name=dlx type=direct
rabbitmqadmin declare queue name=dead_letters durable=true
rabbitmqadmin declare binding source=dlx destination=dead_letters routing_key=dead

# 创建带死信配置的业务队列
rabbitmqadmin declare queue name=work_queue durable=true arguments='{"x-dead-letter-exchange":"dlx","x-dead-letter-routing-key":"dead","x-message-ttl":60000}'

5.2 延迟消息

# 安装延迟消息插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 创建延迟交换机
rabbitmqadmin declare exchange name=delayed type=x-delayed-message arguments='{"x-delayed-type":"direct"}'

5.3 消息优先级

rabbitmqadmin declare queue name=priority_queue durable=true arguments='{"x-max-priority":10}'

六、集群配置

# 在节点 2 上加入集群
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

# 查看集群状态
rabbitmqctl cluster_status

# 设置镜像策略(队列在所有节点上有副本)
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}' -p myapp

七、监控与调优

# 查看队列状态
rabbitmqctl list_queues name messages consumers

# 查看连接数
rabbitmqctl list_connections

# 查看通道数
rabbitmqctl list_channels

# 内存配置(/etc/rabbitmq/rabbitmq.conf)
vm_memory_high_watermark.relative = 0.6
disk_free_limit.relative = 1.5

# 查看节点状态
rabbitmqctl status

八、常见问题

  • 消息积压:增加消费者数量或优化处理逻辑。检查 prefetch_count 设置。
  • 内存告警:RabbitMQ 默认在内存超过 40% 时触发流控。调整 vm_memory_high_watermark 或增加内存。
  • 连接数过多:使用连接池复用连接,避免频繁创建销毁。
  • 消息丢失:确保交换机、队列和消息都设置为持久化(durable),并开启消息确认(publisher confirm)。

总结

RabbitMQ 是功能最丰富的开源消息中间件之一,灵活的路由机制和可靠的消息投递使其适用于各种异步通信场景。在搬瓦工 VPS 上推荐使用 Docker 方式部署,简单高效。如果你需要更高吞吐量的日志流处理,可以参考 Kafka 流处理平台。选购搬瓦工 VPS 请查看 全部方案,使用优惠码 NODESEEK2026 享受 6.77% 折扣,通过 bwh81.net 进入官网。

关于本站

搬瓦工VPS中文网(bwgvps.com)是非官方中文信息站,整理搬瓦工的方案、优惠和教程。我们不销售主机,不提供技术服务。

新手必读
搬瓦工优惠码

NODESEEK2026(优惠 6.77%)

购买时填入即可抵扣。