当前位置: 首页 > news >正文

python脚本:向kafka数据库中插入测试数据

# coding:utf-8
import datetime
import json
import random
import timefrom kafka import KafkaProducer'''生产者demo向branch-event主题中循环写入10条json数据注意事项:要写入json数据需加上value_serializer参数,如下代码
'''
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),security_protocol='SASL_PLAINTEXT',sasl_mechanism='PLAIN',sasl_plain_username='kafkadmin',sasl_plain_password='xxxxxxxx',bootstrap_servers=['10.10.xx.xx:9092']  # 数据监测与分析-测试环境)def gen(i):""" 生成当前日期和时间戳 """# print(time.localtime(time.time()))time_stamp = str(round(time.time() * 1000) - 0)time_stamp_format = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]# time_stamp_format = "2023-09-12 10:00:06.361"# date = time.strftime('%Y.%m.%d', time.localtime(time.time()))current_date = datetime.datetime.now().strftime('%Y.%m.%d')dst_ip = str(random.randint(1, 254)) + '.' + \str(random.randint(1, 254)) + '.' + \str(random.randint(1, 254)) + '.' + \str(random.randint(1, 254))src_ip = str(random.randint(1, 254)) + '.' + \str(random.randint(1, 254)) + '.' + \str(random.randint(1, 254)) + '.' + \str(random.randint(1, 254))model_name = f"zhang-{current_date}.cc2{i}"""" 原始日志 """s = {#"create_time": time_stamp,# "dst_device_ip": "2101:db7:0:0:0:0:0:f","dst_device_ip": "777.77.77.77",#201.20.180.188# "dst_device_ip": dst_ip,#"dst_device_ip": "172.16.22.22",# "src_device_ip": "8.44.8.44","src_device_ip": "777.77.77.77",           # 名单10.10.10.10# "src_device_ip": "201.20.180.180",             # 内网IP# "src_device_ip": src_ip,             # 安全域IP"src_device_dept": " ","eqpt_asset_type": "/IDS/Network/WAF","app_protocol": f"abc{i}","alarm_times": "9","start_time": f"{time_stamp_format}","src_account": model_name + "." + str(i),"answer_address": "光谷创新港","alarm_direction": "xxgcn","additional_name": "www.jw.com","http_url_externalurl": f"http://www.{model_name}.com","http_url_externalurl_domain": f"www.{model_name}.com",# 测试告警扩展字段"response_action": {"alertRestrainAccordingCols": "","sinkCols": "group_array(src_device_ip) as src_ip,group_array(src_device_uuid) as src_device_uuid,group_array(dst_device_ip) as dst_ip,group_array(dst_device_uuid) as dst_device_uuid,group_array(src_device_ip_country) as src_country,group_array(src_device_ip_province) as src_province,group_array(src_device_ip_city) as src_city,group_array(src_port) as src_port,group_array(dst_device_ip_country) as dst_country,group_array(dst_device_ip_province) as dst_province,group_array(dst_device_ip_city) as dst_city,group_array(dst_port) as dst_port,group_array(http_url_externalurl_domain) as dst_domain,group_array(http_url_externalurl) as dst_url,group_array(protocol) as agreement,uuid as uuids,first(A.start_time) as strategy_alert_first_time,first(A.start_time) as strategy_alert_last_time","sinkStaticInfo": "[{\"strategy_alert_name\":\"名单过滤-实时告警6.9\"},{\"strategy_att_ck\":\"侦察-搜集主机信息\"},{\"strategy_alert_desc\":\"\"},{\"strategy_risk_score\":5},{\"strategy_alert_category\":\"告警分类\"},{\"strategy_alert_summary\":\"\"}]","sinkType": "each"},"response_code": "standby2-jwwwwwwwwwwwww","response_data": f"TotoLink 多款路由器downloadFlilecgi命执行漏洞(CVE-2022-25075--CVE-2022-25083)_{i}",# "dst_app_url": "http://brandsnap.org/5d65105/1661e","file_hash": "36426c221bfa23180805d78c8421b653","branch_code": "xxgcn","external_alarm_attack": "漏洞 恶意域名 XRed","external_alarm_attack_type": "bbaccb","attack_ip": "211.211.211.211","log_type": "uum","result_action": "用户静态密码错误","eqpt_vendor": f"idss_{i}","src_port": f"111{i}","dst_port": f"53",# "src_network_domain": f"源IP网络域-test{i}",# "dst_network_domain": f"目的IP网络域-test{i}""src_network_domain": None,"dst_network_domain": None,"object_type": "公共服务","src_device_vendor": "联通","dst_device_type": "服务器","src_person": "zhangxingheng","dst_person":"xuqq","dst_person_name":"zhangxingheng","dst_person_status":"在职","dst_person_ctpositionname":"测试","dst_person_types":"企业员工","dst_person_org_name":"技术部",}# s = json.loads(s)print('打印插入数据:',s)# producer.send('ioc_topic', s)# producer.send('gsp-alarm', p)producer.send('zhang_orglog', s)#kafka数据库表名,没有自动创建# producer.send('argus_gsp', p)if __name__ == '__main__':for i in range(2):gen(i)time.sleep(1)  # 等待1秒,防止时间戳相同producer.close()#print('代码全部运行完成')

在这里插入图片描述

http://www.lryc.cn/news/502474.html

相关文章:

  • 10. 高效利用Excel导入报警信息
  • k8s service 配置AWS nlb load_balancing.cross_zone.enabled
  • 国标GB28181网页直播平台EasyGBS国标GB28181-2016协议解读:媒体流保活机制
  • 面试经验分享 | 杭州某安全大厂渗透测试岗
  • 26. Three.js案例-自定义多面体
  • HarmonyOS-高级(四)
  • Qt-chart 画折线图(以时间为x轴)
  • 【入门】晶晶的补习班
  • c#动态更新替换json节点
  • cf补题日记
  • Golang学习笔记_01——包
  • RPC设计--应用层缓冲区,TcpBuffer
  • 基于单片机智能控制的饮水机控制系统
  • 路径规划 | 改进的人工势场法APF算法进行路径规划(Matlab)
  • 【云原生知识】Kubernets实践-前端服务如何访问后端服务
  • 【ubuntu18.04】ubuntu18.04安装EasyCwmp操作说明
  • 使用Jackson库的ObjectMapper类将JSON字符串转换为Java的Map对象
  • ASP.NET Core实现鉴权授权的几个库
  • MySql:数据类型
  • Couchbase的OLAP支持情况
  • 企业级包管理器之搭建 npm 私有服务器 (6)
  • Elasticsearch的一些介绍
  • 音乐网站设计与实现
  • UE5 蓝图节点中文化
  • java抽奖系统登录下(四)
  • 解决阿里云轻量级服务器 Ubuntu 24.04.1 LTS 没网也 ping 不通 8.8.8.8 以及 route -n 没有输出任何转发信息
  • 跨域问题及其解决方案
  • 在CentOS中安装和卸载mysql
  • React简单入门 - [Next.js项目] - 页面跳转、AntD组件、二级目录等
  • Redis安装和Python练习(Windows11 + Python3.X + Pycharm社区版)