Kafka 流处理平台搭建教程

Apache Kafka 是一个分布式流处理平台,以其超高的吞吐量和水平扩展能力著称,广泛用于日志收集、事件驱动架构和实时数据管道。从 Kafka 3.3 开始引入了 KRaft 模式,不再依赖 ZooKeeper,大幅简化了部署和运维。本文将在搬瓦工 VPS 上使用 KRaft 模式搭建 Kafka。

一、安装 Kafka(KRaft 模式)

1.1 安装 Java 运行时

apt update
apt install openjdk-17-jre-headless -y
java -version

1.2 下载并安装 Kafka

cd /opt
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar -xzf kafka_2.13-3.7.0.tgz
ln -s kafka_2.13-3.7.0 kafka

# 创建数据目录
mkdir -p /data/kafka-logs

1.3 生成集群 ID 并格式化

cd /opt/kafka
KAFKA_CLUSTER_ID=$(/opt/kafka/bin/kafka-storage.sh random-uuid)
echo $KAFKA_CLUSTER_ID

/opt/kafka/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /opt/kafka/config/kraft/server.properties

1.4 配置 KRaft 模式

编辑 /opt/kafka/config/kraft/server.properties

process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://your-server-ip:9092
controller.listener.names=CONTROLLER
log.dirs=/data/kafka-logs

# 性能调优
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# 日志保留策略
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

# 分区默认数
num.partitions=3
default.replication.factor=1

1.5 启动 Kafka

# 前台启动(测试用)
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties

# 后台启动
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties

创建 systemd 服务文件 /etc/systemd/system/kafka.service

[Unit]
Description=Apache Kafka
After=network.target

[Service]
Type=simple
User=root
Environment="KAFKA_HEAP_OPTS=-Xmx512m -Xms512m"
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure
RestartSec=10

[Install]
WantedBy=multi-user.target
systemctl daemon-reload
systemctl enable kafka
systemctl start kafka

二、Docker Compose 部署

更简单的部署方式,需要先安装 Docker

version: '3.8'
services:
  kafka:
    image: apache/kafka:3.7.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://your-server-ip:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_LOG_RETENTION_HOURS: 168
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    volumes:
      - kafka_data:/var/lib/kafka/data
    restart: unless-stopped

volumes:
  kafka_data:

三、Topic 管理

# 创建 Topic
/opt/kafka/bin/kafka-topics.sh --create --topic orders --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092

# 查看 Topic 列表
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# 查看 Topic 详情
/opt/kafka/bin/kafka-topics.sh --describe --topic orders --bootstrap-server localhost:9092

# 修改分区数(只能增加不能减少)
/opt/kafka/bin/kafka-topics.sh --alter --topic orders --partitions 6 --bootstrap-server localhost:9092

# 删除 Topic
/opt/kafka/bin/kafka-topics.sh --delete --topic orders --bootstrap-server localhost:9092

四、生产者与消费者

4.1 命令行测试

# 启动控制台生产者
/opt/kafka/bin/kafka-console-producer.sh --topic orders --bootstrap-server localhost:9092

# 启动控制台消费者(从最新消息开始)
/opt/kafka/bin/kafka-console-consumer.sh --topic orders --bootstrap-server localhost:9092

# 从头开始消费
/opt/kafka/bin/kafka-console-consumer.sh --topic orders --from-beginning --bootstrap-server localhost:9092

# 指定消费者组
/opt/kafka/bin/kafka-console-consumer.sh --topic orders --group order-service --bootstrap-server localhost:9092

4.2 Python 客户端

# 安装 kafka-python
pip install kafka-python

# 生产者
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

for i in range(100):
    producer.send('orders', {'order_id': i, 'amount': 99.99})
producer.flush()
print("Messages sent")

# 消费者
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='order-processor',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    print(f"Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")

五、消费者组管理

# 查看消费者组列表
/opt/kafka/bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

# 查看消费者组详情(包括 lag)
/opt/kafka/bin/kafka-consumer-groups.sh --describe --group order-service --bootstrap-server localhost:9092

# 重置消费位移
/opt/kafka/bin/kafka-consumer-groups.sh --group order-service --topic orders --reset-offsets --to-earliest --execute --bootstrap-server localhost:9092

六、性能调优

# JVM 堆内存(搬瓦工 1GB 方案建议 384m,2GB 方案建议 768m)
export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"

# 生产者批量配置
batch.size=16384
linger.ms=5
compression.type=lz4
buffer.memory=33554432

# 消费者配置
fetch.min.bytes=1024
fetch.max.wait.ms=500
max.poll.records=500

七、监控

# 查看 Broker 日志
tail -f /opt/kafka/logs/server.log

# 查看 Topic 消息总量
/opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic orders --bootstrap-server localhost:9092

# JMX 监控(启用 JMX)
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

八、常见问题

  • 内存不足:Kafka 默认 JVM 堆为 1GB。搬瓦工低配方案需要调低 KAFKA_HEAP_OPTS。
  • 磁盘写满:调整 log.retention.hours 和 log.retention.bytes 控制数据保留。
  • 消费者 Lag 持续增大:增加分区数和消费者实例,或优化消息处理逻辑。
  • 消息顺序:Kafka 只保证分区内有序。需要全局有序时使用单分区或相同 key 路由到同一分区。

总结

Apache Kafka 是处理大规模实时数据流的首选平台。KRaft 模式去除了 ZooKeeper 依赖,让单节点部署变得简单。在搬瓦工 VPS 上适合用于日志收集、事件驱动和微服务通信。如果你的消息量较小,RabbitMQRedis Stream 可能更合适。选购搬瓦工 VPS 请查看 全部方案,使用优惠码 NODESEEK2026 享受 6.77% 折扣,通过 bwh81.net 进入官网。

关于本站

搬瓦工VPS中文网(bwgvps.com)是非官方中文信息站,整理搬瓦工的方案、优惠和教程。我们不销售主机,不提供技术服务。

新手必读
搬瓦工优惠码

NODESEEK2026(优惠 6.77%)

购买时填入即可抵扣。