太极安全监控系统0.8
完善后的代码及功能详细介绍
完善后的代码
python
import os
import sys
import subprocess
import re
import datetime
import threading
import tkinter as tk
from tkinter import messagebox, simpledialog, ttk
import scapy.all as scapy
import whois
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
import json
import random
import socket
import pickle
from geopy.geocoders import Nominatim
from collections import defaultdict
import Evtx.Evtx as evtx
import tensorflow as tf
import psutil
import logging
import time
import multiprocessing
from PyQt5.QtWidgets import QApplication, QMainWindow, QTabWidget, QWidget, QVBoxLayout, QPushButton, QListWidget, QLabel, QTreeView, QFileSystemModel, QTableWidget, QTableWidgetItem, QComboBox, QProgressBar
from PyQt5.QtCore import Qt, QThread, pyqtSignal
from PyQt5.QtGui import QPixmap, QImage
import matplotlib.pyplot as plt
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas# 配置日志
logging.basicConfig(filename=os.path.join('logs', 'security_system.log'), level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')# 确保 TensorFlow 使用 GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:try:tf.config.experimental.set_memory_growth(gpus[0], True)except RuntimeError as e:print(e)logging.error(f"TensorFlow GPU configuration error: {e}")# 全局变量
suspicious_activities = []
packets = []
geolocator_cache = {}
whois_cache = {}
taiji_shield = None# 动态获取网络接口名称
def get_network_interfaces():return scapy.get_if_list()# 配置防火墙规则
def configure_firewall():print("配置防火墙规则...")logging.info("配置防火墙规则...")# Windows 防火墙规则配置示例subprocess.run(["netsh", "advfirewall", "set", "currentprofile", "state", "on"])# 阻断已知恶意 IP 地址known_malicious_ips = ["192.168.1.100", "10.0.0.1"]for ip in known_malicious_ips:subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP", "dir=in", "action=block", "remoteip=" + ip])subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP", "dir=out", "action=block", "remoteip=" + ip])# 读取和解析系统日志
def analyze_logs(log_file):print(f"分析日志文件 {log_file}...")logging.info(f"分析日志文件 {log_file}...")suspicious_activities = []try:with evtx.Evtx(log_file) as log:for record in log.records():xml = record.xml()if "IPTables-Input" in xml or "IPTables-Output" in xml or "Security-Audit" in xml or "Security-Event" in xml:match = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', xml)if match:ip_address = match.group(1)timestamp = re.search(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}', xml)if timestamp:timestamp = timestamp.group(0)suspicious_activities.append((timestamp, ip_address, xml.strip()))except Exception as e:print(f"分析日志文件时发生错误: {e}")logging.error(f"分析日志文件时发生错误: {e}")return suspicious_activities# 使用 Scapy 抓取特定端口的流量
def capture_traffic(interface, port):print(f"抓取 {interface} 上的 {port} 端口流量...")logging.info(f"抓取 {interface} 上的 {port} 端口流量...")packets = scapy.sniff(iface=interface, filter=f"port {port}", count=100)return packets# 获取入侵者地理位置
def get_geolocation(ip_address, geolocator_cache):if ip_address in geolocator_cache:return geolocator_cache[ip_address]try:geolocator = Nominatim(user_agent="security_system")location = geolocator.geocode(ip_address)if location:geolocator_cache[ip_address] = f"{location.city}, {location.country}"return geolocator_cache[ip_address]else:geolocator_cache[ip_address] = "未知位置"return "未知位置"except Exception as e:geolocator_cache[ip_address] = f"获取地理位置失败: {str(e)}"logging.error(f"获取地理位置失败: {e}")return geolocator_cache[ip_address]# 验证 IP 地址
def verify_ip(ip_address, whois_cache):if ip_address in whois_cache:return whois_cache[ip_address]try:w = whois.whois(ip_address)if w and w.get('nets'):whois_cache[ip_address]