[Project] A Kafka-Based Highly Available Distributed Log Monitoring and Alerting System
Study notes on the internals:
Kafka data storage
Kafka high watermark mechanism
KRaft mode
ZooKeeper vs. KRaft
System Deployment and Usage Guide
The system consists of Kafka, Filebeat, Nginx (paired with Keepalived), and a Python consumer program, and is used to process and store Nginx access logs. Detailed configuration, deployment, and usage instructions follow.
1. Kafka Configuration and Startup
1.1 Configuration Files
Kafka runs in KRaft mode across three nodes (kafka01, kafka02, kafka03). The per-node configuration files are listed below; kafka02 and kafka03 differ from kafka01 only in node.id and the listener hostnames:
- kafka01:
/opt/kafka_2.13-3.6.1/config/kraft/server.properties
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@192.168.100.150:9093,2@192.168.100.151:9093,3@192.168.100.152:9093
listeners=PLAINTEXT://kafka01:9092,CONTROLLER://kafka01:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka01:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
- kafka02:
/opt/kafka_2.13-3.6.1/config/kraft/server.properties
process.roles=broker,controller
node.id=2
controller.quorum.voters=1@192.168.100.150:9093,2@192.168.100.151:9093,3@192.168.100.152:9093
listeners=PLAINTEXT://kafka02:9092,CONTROLLER://kafka02:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka02:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
- kafka03:
/opt/kafka_2.13-3.6.1/config/kraft/server.properties
process.roles=broker,controller
node.id=3
controller.quorum.voters=1@192.168.100.150:9093,2@192.168.100.151:9093,3@192.168.100.152:9093
listeners=PLAINTEXT://kafka03:9092,CONTROLLER://kafka03:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka03:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
1.2 Start and Stop Commands
Run the start command on each of the three nodes:
# Start Kafka
/opt/kafka_2.13-3.6.1/bin/kafka-server-start.sh -daemon /opt/kafka_2.13-3.6.1/config/kraft/server.properties
# Stop Kafka (when needed)
#/opt/kafka_2.13-3.6.1/bin/kafka-server-stop.sh
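After starting the brokers on all three nodes, it is worth verifying that each one is reachable before wiring up Filebeat. Below is a minimal sketch using kafka-python (the client library installed in section 4.1); it assumes kafka01–kafka03 resolve to the broker IPs on the machine running the check:

from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable

BROKERS = ["kafka01:9092", "kafka02:9092", "kafka03:9092"]

# Connect to each broker individually; a successful metadata probe
# shows that the broker is up and serving client traffic.
for broker in BROKERS:
    try:
        consumer = KafkaConsumer(bootstrap_servers=[broker])
        print(broker, "OK, topics:", sorted(consumer.topics()))
        consumer.close()
    except NoBrokersAvailable:
        print(broker, "FAILED: broker not reachable")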
1.3 Formatting Storage
Before the first startup, format the KRaft metadata storage on every node using the same cluster ID (a fresh ID can be generated with kafka-storage.sh random-uuid). The node ID is read from server.properties, so the format subcommand takes no node-id flag:
/opt/kafka_2.13-3.6.1/bin/kafka-storage.sh format --cluster-id BlQPRL0HSdaaMwvx_dVcAQ --config /opt/kafka_2.13-3.6.1/config/kraft/server.properties
2. Filebeat Configuration
Filebeat collects the Nginx access log and ships it to Kafka. The configuration file is as follows:
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /usr/local/nginx1/logs/access.log
output.kafka:
  hosts: ["kafka01:9092", "kafka02:9092", "kafka03:9092"]
  topic: nginxlog
  keep_alive: 10s
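Filebeat produces to the topic nginxlog. With num.partitions=1 on the brokers and no explicit topic creation, an auto-created topic gets a single replica and does not survive a broker failure. The sketch below creates the topic with three replicas via kafka-python's admin client; the partition count of 3 is a suggested value, not part of the original setup:

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(
    bootstrap_servers=["kafka01:9092", "kafka02:9092", "kafka03:9092"]
)
# Three replicas so the topic stays writable if one broker fails;
# min.insync.replicas could additionally be set via topic_configs.
topic = NewTopic(name="nginxlog", num_partitions=3, replication_factor=3)
try:
    admin.create_topics([topic])
    print("topic nginxlog created")
except TopicAlreadyExistsError:
    print("topic nginxlog already exists")
finally:
    admin.close()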
3. Nginx Configuration
3.1 Keepalived Configuration
Keepalived provides a floating virtual IP (192.168.100.100) in front of the Nginx instances; the master node's configuration is shown below:
! Configuration File for keepalived
global_defs {
    router_id LVS_DEVEL
}

vrrp_script chk_nginx {
    script "/etc/keepalived/check_nginx.sh"
    interval 10
    weight -5
    fall 2
    rise 1
}

vrrp_instance VI_1 {
    state MASTER
    interface ens33
    mcast_src_ip 192.168.100.150
    virtual_router_id 51
    priority 100
    advert_int 2
    authentication {
        auth_type PASS
        auth_pass BLUEKING_NGINX_HA
    }
    virtual_ipaddress {
        192.168.100.100/24
    }
    track_script {
        chk_nginx
    }
}
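To verify failover, one can poll the virtual IP while stopping Nginx on the master node; requests should keep succeeding after the brief VRRP switchover. A hypothetical probe script (it assumes Nginx serves HTTP on port 80 at the VIP):

import time
import urllib.error
import urllib.request

VIP_URL = "http://192.168.100.100/"  # virtual IP from the keepalived config above

# Poll the VIP once per second and report whether a backend answered.
while True:
    try:
        with urllib.request.urlopen(VIP_URL, timeout=2) as resp:
            print(time.strftime("%H:%M:%S"), "OK", resp.status)
    except (urllib.error.URLError, OSError) as exc:
        print(time.strftime("%H:%M:%S"), "FAIL", exc)
    time.sleep(1)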
3.2 Health-Check Script
The script referenced by vrrp_script above (/etc/keepalived/check_nginx.sh) verifies that an Nginx master process is running from the expected binary:
#!/bin/bash
process_name="nginx"
process_abs_path="/usr/local/nginx1/sbin/nginx"

function is_nginx_running() {
    process_info=$(ps --no-header -C $process_name -o ppid,pid,args | awk '{printf $1 "|" $2 "|"; for (i=3; i<=NF; i++) { printf "%s ", $i }; printf "\n"}' | grep master)
    if [[ -z "$process_info" ]]; then
        return 1
    else
        process_pids=($(echo "$process_info" | awk -F'|' '{print $2}'))
        for _pid in "${process_pids[@]}"; do
            abs_path=$(readlink -f /proc/$_pid/exe)
            if [ "$abs_path" == "$(readlink -f "$process_abs_path")" ]; then
                return 0
            fi
        done
        return 2
    fi
}

err=0
for k in $(seq 1 3); do
    is_nginx_running
    if [[ $? != "0" ]]; then
        err=$((err + 1))
        sleep 1
    else
        err=0
        break
    fi
done

if [[ $err != "0" ]]; then
    exit 1  # Only return a failure status; do not stop keepalived here
else
    exit 0
fi
4. Python Program
4.1 Installing Dependencies
pip install kafka-python==2.0.2 pymongo==4.13.0 mysql-connector-python==9.3.0 redis==4.5.5 kombu==5.3.1 celery==5.3.1
4.2 Code
The full source code is available at:
https://github.com/Lenoud/nginx-monitor-kafka-python
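For orientation, the following is a minimal consumer sketch, not the repository code: it reads the nginxlog topic and stores each record in MongoDB. Filebeat's Kafka output wraps every log line in a JSON document whose message field holds the raw Nginx access-log line; the MongoDB URI, database, and collection names here are placeholders to adapt to your deployment:

import json

from kafka import KafkaConsumer
from pymongo import MongoClient

consumer = KafkaConsumer(
    "nginxlog",
    bootstrap_servers=["kafka01:9092", "kafka02:9092", "kafka03:9092"],
    group_id="nginx-log-consumer",  # placeholder group id
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

mongo = MongoClient("mongodb://localhost:27017")    # placeholder URI
collection = mongo["nginx_monitor"]["access_logs"]  # placeholder names

for record in consumer:
    # Filebeat puts the raw access-log line in the "message" field.
    line = record.value.get("message", "")
    collection.insert_one(
        {"raw": line, "partition": record.partition, "offset": record.offset}
    )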
5. Notes
- Make sure all nodes can reach one another over the network, in particular the Kafka nodes themselves and the connection to the MySQL database.
- Verify that the IP addresses, ports, usernames, and passwords in each configuration file are correct.
- Complete the storage formatting step before starting Kafka.
- To change the log retention policy or other Kafka settings, adjust the corresponding configuration files as needed.
Once the steps above are complete, the system collects, transports, and stores Nginx access logs end to end.