当前位置: 首页 > article >正文

统计客户端使用情况,使用es存储数据,实现去重以及计数

这篇文件的重点在tshark、filebeat、和logstash。

需求:统计客户使用的客户端版本

实现工具:tshark 1.10.14,filebeat 8.17.0,logstash 8.17.0,elasticsearch 8.17.0,kibana 8.17.0

总体设计:使用tshark抓包,并以格式化的文本形式(字段中间以|分隔)输出到txt文件,txt文件每10M分割一个新文件,通过filebeat采集输出到logstash(由于filebeat功能过于简单,无法实现_id重复时更新操作,以及提取复杂字符串的需求,使用logstash作为中间处理层),最后使用es存储,kibana查看。

1.首先编写脚本实现网络抓取,分割文件,定期清理旧文件的功能,脚本如下

#!/bin/bash# TShark 持续抓包脚本# 配置参数
INTERFACE="any"          # 监听的网卡接口
OUTPUT_DIR="/home/vagrant/tshark"  # 输出目录
FILE_PREFIX="capture"     # 文件名前缀
MAX_FILE_SIZE=1000         # 单个文件最大大小(MB)
MAX_FILE_COUNT=10         # 最大文件数量(超过将删除最旧文件)
CAPTURE_FILTER=""         # 捕获过滤器(如"port 80")
BUFFER_SIZE=20            # 内存缓冲区大小(MB)
SNAPLEN=0                 # 抓包长度(0表示完整数据包)
PROCOTOL=http             # 协议# 创建输出目录
mkdir -p "$OUTPUT_DIR"# 生成带时间戳的文件名
current_file="${OUTPUT_DIR}/${FILE_PREFIX}_$(date +'%Y%m%d_%H%M%S')"# 清理旧文件函数
cleanup_old_files() {echo "[$(date)] 接收到清理信号,开始清理..."# 保留最新的 MAX_FILE_COUNT 个文件ls -t "${OUTPUT_DIR}/${FILE_PREFIX}"_*.txt 2>/dev/null | tail -n +3 | xargs -r rm -f
}# 捕获终止信号
trap 'echo "捕获终止..."; cleanup_old_files; exit 0' INT TERMecho "开始捕获网络流量..."
echo "输出文件: $current_file"# 开始捕获
tshark -i "$INTERFACE" \-T fields  -e ip.src -e ip.dst  -e tcp.dstport -e http.host -e http.user_agent -e http.authorization -e frame.time_epoch -E separator="|" | split -d -b 10M --additional-suffix=.txt - "${OUTPUT_DIR}/${FILE_PREFIX}_$(date +'%Y%m%d_%H%M%S')_" &
TSHARK_PID=$!echo "[$(date)] Tshark已启动 (PID: $TSHARK_PID)"
echo "[$(date)] 开始定期清理,每小时执行一次..."while kill -0 $TSHARK_PID 2>/dev/null; do  # 检查tshark是否仍在运行cleanup_old_filessleep 3600  # 每小时执行一次
doneecho "[$(date)] Tshark已停止,清理脚本退出"

2.使用filebeat采集,主要配置如下:

#提取各字段,删除无用字段,当user_agent为空时,放弃这个事件
processors:- dissect:tokenizer: "%{src_ip}|%{dst_ip}|%{dst_port}|%{host}|%{user_agent}|%{auth}|%{timestamp}"field: "message"- drop_fields:fields: ["ecs", "host", "input", "log", "agent", "dissect.timestamp", "message"]- drop_event:when:equals:dissect.user_agent: ""

3.logstash主要配置如下:

input {beats {port => 5044}
}filter {mutate {replace => { "temp_auth" => "%{[dissect][auth]}" }gsub => ["temp_auth", "OPEN-BODY-SIG ", ""]}#kv提取键值对内容kv {source => "temp_auth"field_split => ","value_split => "="prefix => "auth_"remove_field => "temp_auth"}mutate {rename => { "auth_AppId" => "app_id" }rename => { "auth_ClientId" => "app_id" }}fingerprint {source => ["[dissect][user_agent]", "app_id"]method => "SHA256"  # 也可以使用 MURMUR3, SHA1, MD5 等concatenate_sources => true  # 将多个字段值连接后再哈希target => "[@metadata][_id]"}
}output {elasticsearch {hosts => ["http://xxxxxx:9200"] # 根据你的ES地址修改index => "httppackmsg"document_id => "%{[@metadata][_id]}"action => "update"doc_as_upsert => truescript => "ctx._source.count = (ctx._source.count ?: 0) + 1"script_type => "inline"script_lang => "painless"}# 调试用(可选)stdout {codec => rubydebug}
}

logstash中使用kv提取auth中的键值对内容,并给提取出的key设置prefix,rename将字段修改为自建想要的名字。在fingerprint 中通过将user_agent和appid连接,使用SHA256生成_id.

http://www.lryc.cn/news/2380428.html

相关文章:

  • 代码随想录算法训练营第六十四天| 图论9—卡码网47. 参加科学大会,94. 城市间货物运输 I
  • oracle序列自增问题
  • 开启健康生活的多元养生之道
  • 【Vite】前端开发服务器的配置
  • 鸿蒙OSUniApp 制作自定义弹窗与模态框组件#三方框架 #Uniapp
  • Spring Security与Spring Boot集成原理
  • VScode各文件转化为PDF的方法
  • 精益数据分析(58/126):移情阶段的深度实践与客户访谈方法论
  • Vue3学习(组合式API——Watch侦听器、watchEffect()详解)
  • 【node.js】安装与配置
  • 《AI大模型应知应会100篇》第62篇:TypeChat——类型安全的大模型编程框架
  • HttpMessageConverter 的作用是什么? 它是如何实现请求体到对象、对象到响应体的自动转换的(特别是 JSON/XML)?
  • EdgeShard:通过协作边缘计算实现高效的 LLM 推理
  • 火山 RTC 引擎9 ----集成 appkey
  • ArcGIS Pro 3.4 二次开发 - 框架
  • Adminer:一个基于Web的轻量级数据库管理工具
  • RK3568下QT实现按钮切换tabWidget
  • 2025 OceanBase 开发者大会全议程指南
  • GitHub 趋势日报 (2025年05月15日)
  • day017-磁盘管理-实战
  • 【成品设计】STM32和UCOS-II的项目
  • 当通过PHP在线修改文件数组遇到不能及时生效问题
  • Ngrok 配置:实现 Uniapp 前后端项目内网穿透
  • 鸿蒙ArkUI体验:Hexo博客客户端开发心得
  • 鸿蒙NEXT开发动画案例10
  • Python 的 os 库常见使用方法(操作目录及文件)
  • 【Linux】Linux安装并配置Redis
  • 【11408学习记录】考研英语辞职信写作三步法:真题精讲+妙句活用+范文模板
  • 数据库(一):分布式数据库
  • Java并发编程-线程池(三)