java服务线程泄露临时解决脚本
最近线上出了一个由线程泄露导致的问题(正常情况下,每个服务的线程数都在 200 以内)。至于线程是如何泄露的,目前还没有查明;临时的解决办法是每十分钟运行一次本脚本:当检测到节点上某个服务的线程数大于 300 时,就重启对应的 Pod。
import paramiko
import getpass
import socket
import re
import json
import subprocess# 配置部分
# Nodes to inspect over SSH; replace with the real server IPs / hostnames.
SERVERS = [
    "192.168.0.27",
    "192.168.0.94",
    "192.168.0.240",
    "192.168.0.115",
    "192.168.0.234",
    "192.168.0.222",
    "192.168.0.167",
    "192.168.0.141",
    "192.168.0.248",
    "192.168.0.41",
    "192.168.0.28",
    "192.168.0.34",
    "192.168.0.15",
    "192.168.0.51",
    "192.168.0.241",
    "192.168.0.46",
    "192.168.0.29",
]
# SSH login used for every node; replace with the real SSH user.
USERNAME = "root"
# NOTE(review): plaintext credential kept in source -- consider loading it
# from the environment or a secret store instead.
PASSWORD = "123456"
# A process with more threads than this is treated as leaking.
THREAD_THRESHOLD = 300
# Names of system processes that must never trigger a pod restart.
BLACKLIST = ["systemd", "kthreadd", "ksoftirqd", "migration", "rcu_bh", "rcu_sched"]

# Matches the "pod<uid>" component of a kubelet-managed cgroup path.  With the
# cgroupfs driver the UID keeps its dashes; with the systemd driver the dashes
# are written as underscores, so both separators have to be accepted.
_POD_UID_RE = re.compile(r"pod([a-f0-9_\-]+)")


def get_pod_uid(ssh_client, pid):
    """Return the UID of the Kubernetes pod that owns process *pid*.

    Reads ``/proc/<pid>/cgroup`` on the remote host and extracts the
    ``pod<uid>`` path component.

    Args:
        ssh_client: Connected client exposing ``exec_command(cmd)`` that
            returns ``(stdin, stdout, stderr)`` (e.g. ``paramiko.SSHClient``).
        pid: Process ID (string or int) on the remote host.

    Returns:
        The pod UID in dash-separated form (comparable with the pod's
        ``metadata.uid``), or ``None`` when the process is not in a pod
        cgroup or the cgroup file could not be read.
    """
    try:
        _, stdout, _ = ssh_client.exec_command(f"cat /proc/{pid}/cgroup")
        cgroup_info = stdout.read().decode()
    except Exception as e:
        print(f" 获取进程 {pid} 的cgroup信息时出错: {e}")
        return None

    match = _POD_UID_RE.search(cgroup_info)
    if match is None:
        return None
    # Normalise the systemd-style "_" separators back to "-".
    return match.group(1).replace("_", "-")


def _parse_proc_status(status_data):
    """Extract ``(process_name, thread_count)`` from /proc/<pid>/status text."""
    pname, threads = None, 0
    for line in status_data.splitlines():
        if line.startswith("Name:"):
            pname = line.split("\t")[1]
        elif line.startswith("Threads:"):
            threads = int(line.split("\t")[1])
            break  # nothing after "Threads:" is needed
    return pname, threads


def get_high_thread_processes(ssh_client):
    """Find pod processes on one host whose thread count exceeds the threshold.

    Every numeric directory under ``/proc`` is inspected.  A process is
    reported when its thread count is greater than ``THREAD_THRESHOLD``, its
    name is not in ``BLACKLIST`` and a pod UID can be derived from its cgroup.

    Args:
        ssh_client: Connected client exposing ``exec_command(cmd)``.

    Returns:
        ``{pod_uid: {"processes": [{"pid", "pname", "threads"}, ...]}}``.
        Errors are printed and never raised; an empty dict means nothing was
        found (or the scan itself failed).
    """
    high_thread_pods = {}
    try:
        _, stdout, _ = ssh_client.exec_command("ls -d /proc/[0-9]*")
        process_dirs = [
            line.split("/")[-1]
            for line in stdout.read().decode().splitlines()
            if line.strip()
        ]
        print(f" 扫描 {len(process_dirs)} 个进程...")

        # NOTE(review): this issues one SSH command per PID; batching the
        # reads into a single remote command would be considerably faster.
        for pid in process_dirs:
            try:
                _, stdout, _ = ssh_client.exec_command(f"cat /proc/{pid}/status")
                status_data = stdout.read().decode()
                if not status_data:
                    continue  # no status output for this PID

                pname, threads = _parse_proc_status(status_data)
                if threads <= THREAD_THRESHOLD or not pname or pname in BLACKLIST:
                    continue

                print(f" 发现高线程进程: PID={pid}, 进程={pname}, 线程数={threads}")
                pod_uid = get_pod_uid(ssh_client, pid)
                if not pod_uid:
                    continue  # not running inside a pod

                entry = high_thread_pods.setdefault(pod_uid, {"processes": []})
                entry["processes"].append(
                    {"pid": pid, "pname": pname, "threads": threads}
                )
            except Exception as e:
                print(f" 检查进程 {pid} 时出错: {e}")
                continue
    except Exception as e:
        print(f" 扫描进程时出错: {e}")
    return high_thread_pods


def _load_pod_index():
    """List all pods with one local kubectl call, indexed by UID.

    Returns:
        ``{uid: (namespace, pod_name)}``, or ``None`` when kubectl failed or
        its output could not be parsed (the error is printed).
    """
    try:
        result = subprocess.run(
            ["kubectl", "get", "pods", "--all-namespaces", "-o", "json"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        if result.returncode != 0:
            print(f" ❌ 获取Pod信息失败: {result.stderr}")
            return None
        pods_data = json.loads(result.stdout)
        return {
            pod["metadata"]["uid"]: (
                pod["metadata"]["namespace"],
                pod["metadata"]["name"],
            )
            for pod in pods_data["items"]
        }
    except Exception as e:
        print(f" ❌ 获取Pod信息时出错: {e}")
        return None


def get_pod_info(pod_uid, pod_index=None):
    """Resolve a pod UID to its namespace and name using local kubectl.

    Args:
        pod_uid: Pod UID as reported in ``metadata.uid``.
        pod_index: Optional pre-built ``{uid: (namespace, name)}`` mapping.
            When omitted, the cluster is queried on every call; pass a shared
            mapping to resolve many UIDs with a single kubectl invocation.

    Returns:
        ``(namespace, pod_name)``, or ``(None, None)`` when the pod is
        unknown or the pod list could not be obtained.
    """
    if pod_index is None:
        pod_index = _load_pod_index()
        if pod_index is None:
            return None, None

    found = pod_index.get(pod_uid)
    if found is None:
        print(f" ❗ 未找到UID为 {pod_uid} 的Pod")
        return None, None

    namespace, pod_name = found
    print(f" 找到Pod: {namespace}/{pod_name} (UID: {pod_uid})")
    return namespace, pod_name


def restart_pod(namespace, pod_name):
    """Restart a pod by force-deleting it with local kubectl.

    The pod is deleted with ``--grace-period=0 --force``; the script relies
    on the pod's controller to recreate it.

    Args:
        namespace: Namespace of the pod.
        pod_name: Name of the pod.

    Returns:
        ``True`` when kubectl exited with status 0, ``False`` otherwise.
    """
    try:
        print(f" 尝试重启Pod: {namespace}/{pod_name}")
        # An argv list (no shell) keeps the names from being shell-interpreted.
        result = subprocess.run(
            [
                "kubectl", "delete", "pod", pod_name,
                "-n", namespace,
                "--grace-period=0", "--force",
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        if result.returncode == 0:
            print(f" ✅ Pod重启成功: {namespace}/{pod_name}")
            return True
        print(f" ❌ Pod重启失败: {namespace}/{pod_name}")
        print(f" 错误信息: {result.stderr}")
        return False
    except Exception as e:
        print(f" ❌ 执行kubectl命令出错: {e}")
        return False


def main():
    """Scan every server in ``SERVERS`` and restart pods that leak threads.

    Phase 1 connects to each server over SSH and collects the pods owning
    processes above the thread threshold.  Phase 2 prints a connection
    summary.  Phase 3 resolves the collected pod UIDs with kubectl and
    restarts each matching pod once.
    """
    # pod UID -> offending processes, merged across all servers (this also
    # de-duplicates the pods to restart).
    process_details = {}
    connection_status = {"success": 0, "failed": 0, "failed_servers": []}

    for server in SERVERS:
        print(f"\n===== 正在检查服务器: {server} =====")
        ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy trusts unknown host keys without
        # verification; acceptable only on a trusted network.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        failure = None
        try:
            ssh.connect(hostname=server, username=USERNAME, password=PASSWORD, timeout=10)
            print("✅ 连接成功")
            connection_status["success"] += 1

            high_thread_pods = get_high_thread_processes(ssh)
            if high_thread_pods:
                print(f" 发现 {len(high_thread_pods)} 个需要关注的Pod")
                for pod_uid, pod_data in high_thread_pods.items():
                    process_details.setdefault(pod_uid, []).extend(pod_data["processes"])
            else:
                print(" 未找到线程数超过阈值的Pod进程")
        except paramiko.AuthenticationException:
            print("❌ 认证失败: 用户名或密码错误")
            failure = "认证失败"
        except (socket.timeout, paramiko.SSHException) as e:
            print(f"❌ 连接失败: {e}")
            failure = str(e)
        except Exception as e:
            print(f"❌ 未知错误: {e}")
            failure = str(e)
        finally:
            # Always release the client, whether or not the connection
            # was ever established.
            ssh.close()

        if failure is not None:
            connection_status["failed"] += 1
            connection_status["failed_servers"].append(f"{server} ({failure})")

    print("\n===== 连接状态摘要 =====")
    print(f"成功连接服务器: {connection_status['success']}/{len(SERVERS)}")
    if connection_status["failed"] > 0:
        print(f"连接失败服务器: {connection_status['failed']}")
        for server in connection_status["failed_servers"]:
            print(f" - {server}")

    if not process_details:
        if connection_status["success"] > 0:
            print("\n没有需要重启的Pod")
        else:
            print("\n⚠️ 没有成功连接任何服务器,无法检查Pod状态")
        return

    print("\n===== 获取Pod信息并准备重启 =====")
    # One cluster-wide listing serves every UID lookup below.
    pod_index = _load_pod_index()
    if pod_index is None:
        return  # the failure has already been reported

    for pod_uid, processes in process_details.items():
        namespace, pod_name = get_pod_info(pod_uid, pod_index)
        if not (namespace and pod_name):
            continue

        print(f"\nPod: {namespace}/{pod_name} (UID: {pod_uid})")
        print("包含高线程进程:")
        for proc in processes:
            print(f" - PID: {proc['pid']}, 进程: {proc['pname']}, 线程数: {proc['threads']}")
        restart_pod(namespace, pod_name)


if __name__ == "__main__":
    main()