Kafka 流处理平台搭建教程
Apache Kafka 是一个分布式流处理平台,以其超高的吞吐量和水平扩展能力著称,广泛用于日志收集、事件驱动架构和实时数据管道。从 Kafka 3.3 开始引入了 KRaft 模式,不再依赖 ZooKeeper,大幅简化了部署和运维。本文将在搬瓦工 VPS 上使用 KRaft 模式搭建 Kafka。
一、安装 Kafka(KRaft 模式)
1.1 安装 Java 运行时
apt update
apt install openjdk-17-jre-headless -y
java -version
1.2 下载并安装 Kafka
cd /opt
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar -xzf kafka_2.13-3.7.0.tgz
ln -s kafka_2.13-3.7.0 kafka
# 创建数据目录
mkdir -p /data/kafka-logs
1.3 生成集群 ID 并格式化
cd /opt/kafka
KAFKA_CLUSTER_ID=$(/opt/kafka/bin/kafka-storage.sh random-uuid)
echo $KAFKA_CLUSTER_ID
/opt/kafka/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /opt/kafka/config/kraft/server.properties
1.4 配置 KRaft 模式
编辑 /opt/kafka/config/kraft/server.properties:
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://your-server-ip:9092
controller.listener.names=CONTROLLER
log.dirs=/data/kafka-logs
# 性能调优
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 日志保留策略
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# 分区默认数
num.partitions=3
default.replication.factor=1
1.5 启动 Kafka
# 前台启动(测试用)
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
# 后台启动
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties
创建 systemd 服务文件 /etc/systemd/system/kafka.service:
[Unit]
Description=Apache Kafka
After=network.target
[Service]
Type=simple
User=root
Environment="KAFKA_HEAP_OPTS=-Xmx512m -Xms512m"
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure
RestartSec=10
[Install]
WantedBy=multi-user.target
systemctl daemon-reload
systemctl enable kafka
systemctl start kafka
二、Docker Compose 部署
更简单的部署方式,需要先安装 Docker:
version: '3.8'
services:
kafka:
image: apache/kafka:3.7.0
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://your-server-ip:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LOG_DIRS: /var/lib/kafka/data
KAFKA_NUM_PARTITIONS: 3
KAFKA_LOG_RETENTION_HOURS: 168
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
volumes:
- kafka_data:/var/lib/kafka/data
restart: unless-stopped
volumes:
kafka_data:
三、Topic 管理
# 创建 Topic
/opt/kafka/bin/kafka-topics.sh --create --topic orders --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
# 查看 Topic 列表
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看 Topic 详情
/opt/kafka/bin/kafka-topics.sh --describe --topic orders --bootstrap-server localhost:9092
# 修改分区数(只能增加不能减少)
/opt/kafka/bin/kafka-topics.sh --alter --topic orders --partitions 6 --bootstrap-server localhost:9092
# 删除 Topic
/opt/kafka/bin/kafka-topics.sh --delete --topic orders --bootstrap-server localhost:9092
四、生产者与消费者
4.1 命令行测试
# 启动控制台生产者
/opt/kafka/bin/kafka-console-producer.sh --topic orders --bootstrap-server localhost:9092
# 启动控制台消费者(从最新消息开始)
/opt/kafka/bin/kafka-console-consumer.sh --topic orders --bootstrap-server localhost:9092
# 从头开始消费
/opt/kafka/bin/kafka-console-consumer.sh --topic orders --from-beginning --bootstrap-server localhost:9092
# 指定消费者组
/opt/kafka/bin/kafka-console-consumer.sh --topic orders --group order-service --bootstrap-server localhost:9092
4.2 Python 客户端
# 安装 kafka-python
pip install kafka-python
# 生产者
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
for i in range(100):
producer.send('orders', {'order_id': i, 'amount': 99.99})
producer.flush()
print("Messages sent")
# 消费者
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'orders',
bootstrap_servers=['localhost:9092'],
group_id='order-processor',
auto_offset_reset='earliest',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
print(f"Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")
五、消费者组管理
# 查看消费者组列表
/opt/kafka/bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
# 查看消费者组详情(包括 lag)
/opt/kafka/bin/kafka-consumer-groups.sh --describe --group order-service --bootstrap-server localhost:9092
# 重置消费位移
/opt/kafka/bin/kafka-consumer-groups.sh --group order-service --topic orders --reset-offsets --to-earliest --execute --bootstrap-server localhost:9092
六、性能调优
# JVM 堆内存(搬瓦工 1GB 方案建议 384m,2GB 方案建议 768m)
export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"
# 生产者批量配置
batch.size=16384
linger.ms=5
compression.type=lz4
buffer.memory=33554432
# 消费者配置
fetch.min.bytes=1024
fetch.max.wait.ms=500
max.poll.records=500
七、监控
# 查看 Broker 日志
tail -f /opt/kafka/logs/server.log
# 查看 Topic 消息总量
/opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic orders --bootstrap-server localhost:9092
# JMX 监控(启用 JMX)
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
八、常见问题
- 内存不足:Kafka 默认 JVM 堆为 1GB。搬瓦工低配方案需要调低 KAFKA_HEAP_OPTS。
- 磁盘写满:调整 log.retention.hours 和 log.retention.bytes 控制数据保留。
- 消费者 Lag 持续增大:增加分区数和消费者实例,或优化消息处理逻辑。
- 消息顺序:Kafka 只保证分区内有序。需要全局有序时使用单分区或相同 key 路由到同一分区。
总结
Apache Kafka 是处理大规模实时数据流的首选平台。KRaft 模式去除了 ZooKeeper 依赖,让单节点部署变得简单。在搬瓦工 VPS 上适合用于日志收集、事件驱动和微服务通信。如果你的消息量较小,RabbitMQ 或 Redis Stream 可能更合适。选购搬瓦工 VPS 请查看 全部方案,使用优惠码 NODESEEK2026 享受 6.77% 折扣,通过 bwh81.net 进入官网。