当前位置: 首页 > news >正文

【Project】基于kafka的高可用分布式日志监控与告警系统

底层学习笔记

kafka数据存储

在这里插入图片描述

kafka最高水位线机制

在这里插入图片描述

Kraft模式

在这里插入图片描述

zooker和kraft

在这里插入图片描述

系统部署与使用说明

本系统主要包含 Kafka、Filebeat、Nginx(搭配 Keepalived)以及 Python 消费者程序的配置与部署,用于处理和存储 Nginx 访问日志。以下是详细的配置、部署和使用说明。

1. Kafka 配置与启动

1.1 配置文件

Kafka 采用 KRaft 模式,涉及三个节点(kafka01、kafka02、kafka03),各节点配置文件如下:

  • kafka01/opt/kafka_2.13-3.6.1/config/kraft/server.properties
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@192.168.100.150:9093,2@192.168.100.151:9093,3@192.168.100.152:9093
listeners=PLAINTEXT://kafka01:9092,CONTROLLER://kafka01:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka01:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
  • kafka02/opt/kafka_2.13-3.6.1/config/kraft/server.properties
process.roles=broker,controller
node.id=2
controller.quorum.voters=1@192.168.100.150:9093,2@192.168.100.151:9093,3@192.168.100.152:9093
listeners=PLAINTEXT://kafka02:9092,CONTROLLER://kafka02:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka02:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
  • kafka03/opt/kafka_2.13-3.6.1/config/kraft/server.properties
process.roles=broker,controller
node.id=3
controller.quorum.voters=1@192.168.100.150:9093,2@192.168.100.151:9093,3@192.168.100.152:9093
listeners=PLAINTEXT://kafka03:9092,CONTROLLER://kafka03:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka03:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

1.2 启动与关闭命令

# 启动 Kafka
/opt/kafka_2.13-3.6.1/bin/kafka-server-start.sh -daemon /opt/kafka_2.13-3.6.1/config/kraft/server.properties
# 关闭 Kafka(可选)
#/opt/kafka_2.13-3.6.1/bin/kafka-server-stop.sh

1.3 格式化存储

kafka-storage.sh format  --cluster-id BlQPRL0HSdaaMwvx_dVcAQ  --config /opt/kafka_2.13-3.6.1/config/kraft/server.properties --node-id 1

2. Filebeat 配置

Filebeat 用于收集 Nginx 访问日志并发送到 Kafka,配置文件如下:

filebeat.inputs:
- type: logenabled: truepaths:- /usr/local/nginx1/logs/access.log
output.kafka:hosts: ["kafka01:9092","kafka02:9092","kafka03:9092"]topic: nginxlogkeep_alive: 10s

3. Nginx 配置

3.1 Keepalived 配置

! Configuration File for keepalived
global_defs {router_id LVS_DEVEL
}
vrrp_script chk_nginx {script "/etc/keepalived/check_nginx.sh"interval 10weight -5fall 2rise 1
}
vrrp_instance VI_1 {state MASTERinterface ens33mcast_src_ip 192.168.100.150virtual_router_id 51priority 100advert_int 2authentication {auth_type PASSauth_pass BLUEKING_NGINX_HA}virtual_ipaddress {192.168.100.100/24}track_script {chk_nginx}
}

3.2 检查脚本

#!/bin/bash
process_name="nginx"
process_abs_path="/usr/local/nginx1/sbin/nginx"function is_nginx_running() {process_info=$(ps --no-header -C $process_name -o ppid,pid,args | awk '{printf $1 "|" $2 "|" ; for (i=3; i<=NF; i++) { printf "%s ", $i };printf "\n"}' | grep master)if [[ -z "$process_info" ]]; thenreturn 1elseprocess_pids=($(echo "$process_info" | awk -F'|' '{print $2}'))for _pid in "${process_pids[@]}"; doabs_path=$(readlink -f /proc/$_pid/exe)if [ "$abs_path" == "$(readlink -f "$process_abs_path")" ]; thenreturn 0fidonereturn 2fi
}err=0
for k in $(seq 1 3); dois_nginx_runningif [[ $? != "0" ]]; thenerr=$((err + 1))sleep 1elseerr=0breakfi
doneif [[ $err != "0" ]]; thenexit 1  # 仅返回失败状态,不停止 keepalived
elseexit 0
fi

4. Python 程序

4.1 依赖安装

pip install kafka-python==2.0.2 pymongo==4.13.0 mysql-connector-python==9.3.0 redis==4.5.5 kombu==5.3.1 celery==5.3.1

4.2 代码

https://github.com/Lenoud/nginx-monitor-kafka-python

5. 注意事项

  • 确保各节点的网络连通性,特别是 Kafka 节点之间以及与 MySQL 数据库的连接。
  • 检查各配置文件中的 IP 地址、端口号、用户名和密码等信息是否正确。
  • 在启动 Kafka 之前,确保已完成存储格式化操作。
  • 若需要修改日志存储策略或 Kafka 配置,可根据实际需求调整相应的配置文件。

完成整个系统的部署和配置,实现 Nginx 访问日志的收集、传输和存储。

http://www.lryc.cn/news/580301.html

相关文章:

  • C#扩展方法全解析:给现有类型插上翅膀的魔法
  • CMake基础:条件判断详解
  • 探索 Ubuntu 上 MongoDB 的安装过程
  • [Cyclone] 哈希算法 | SIMD优化哈希计算 | 大数运算 (Int类)
  • 【大模型】到底什么是Function Calling和MCP,以及和ReAct推理的关系是什么?
  • 若 VSCode 添加到文件夹内右键菜单中显示
  • 03_性能优化:让软件呼吸更顺畅
  • ABB焊接机器人智能节气仪
  • App爬虫工具篇-appium配置
  • AWS WebRTC:通过shell分析viewer端日志文件
  • 查看linux中steam游戏的兼容性
  • 权电阻网络DAC实现电压输出型数模转换Multisim电路仿真——硬件工程师笔记
  • C++构造和折构函数详解,超详细!
  • Linux基本命令篇 —— uname命令
  • 第二章-AIGC入门-开启AIGC音频探索之旅:从入门到实践(6/36)
  • 利用 AI 打造的开发者工具集合
  • 一个简单的分布式追踪系统
  • 指针篇(7)- 指针运算笔试题(阿里巴巴)
  • 物联网软件层面的核心技术体系
  • 论文解读:《DeepGray:基于灰度图像和深度学习的恶意软件分类方法》
  • 优象光流模块,基于python的数据读取demo
  • 新能源汽车功率级测试自动化方案:从理论到实践的革命性突破
  • 区块链技术在物联网(IoT)中的核心应用场景
  • SQL Server 进阶语法实战:从动态透视到存储过程的深度应用(第四课)
  • 高档宠物食品对宠物的健康益处有哪些?
  • 【C语言刷题】第十天:加量加餐继续,代码题训练,融会贯通IO模式
  • Webpack构建工具
  • Qt创建线程的方法
  • 学习开发之hashmap
  • RabbitMQ 高级特性之死信队列