当前位置: 首页 > news >正文

Python客户端使用SASL_SSL连接Kafka需要将jks密钥转换为pem密钥,需要转化成p12格式再转换pem才能适配confluent_kafka包

证书生成

生成证书以及jks参考以下文章
https://blog.csdn.net/qq_41527073/article/details/121148600

证书转换jks -> pem

需要转化成p12以下转换才能适配confluent_kafka包,直接jks转pem会报错不能使用,具体参考以下文章
https://www.ngui.cc/zz/1104321.html?action=onClick
在这里插入图片描述
keytool -importkeystore -srckeystore server.truststore.jks -destkeystore server.p12 -deststoretype PKCS12
openssl pkcs12 -in server.p12 -nokeys -out server.cer.pem
keytool -importkeystore -srckeystore server.keystore.jks -destkeystore client.p12 -deststoretype PKCS12
openssl pkcs12 -in client.p12 -nokeys -out client.cer.pem
openssl pkcs12 -in client.p12 -nodes -nocerts -out client.key.pem
(转换过程)
生成证书目录结构:
在这里插入图片描述

python客户端示例代码

import rsa, json
import time, sys
from kafka import KafkaProducer
import confluent_kafka
import ssl
def encrypt(msg):with open('public.pem', 'rb') as publickfile:p = publickfile.read()pubkey = rsa.PublicKey.load_pkcs1(p)original_text = msg.encode('utf8')crypt_text = rsa.encrypt(original_text, pubkey)return crypt_textdef decrypt(data):with open('private.pem', 'rb') as privatefile:p = privatefile.read()privkey = rsa.PrivateKey.load_pkcs1(p)crypt_text = dataoriginal_text = rsa.decrypt(crypt_text, privkey)return original_text.decode('utf8')def produce_message():producer: KafkaProducer = Nonesuccess = 0conn_error = 0msg = {"type": "webclone","api": "delete","state": True,"nodename": "node-1","uuid": "asjdkjrh"}context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)context.set_ciphers('TLSv1:TLSv1.2')context.check_hostname = Falsecontext.verify_mode = ssl.CERT_NONEwhile success < 5:try:producer.send('message_push', value=json.dumps(msg))producer.flush()success += 1except KeyboardInterrupt:breakexcept:while True:try:producer = KafkaProducer(bootstrap_servers=['172.XX.X.XX:9093'],acks = 1,security_protocol="SASL_SSL",ssl_cafile="/Kafka/config/ssl/server.cer.pem",ssl_certfile="/Kafka/config/ssl/client.cer.pem",ssl_keyfile="/Kafka/config/ssl/client.key.pem",ssl_context=context,sasl_mechanism="PLAIN",sasl_plain_username="kafka",sasl_plain_password="XXXXX",api_version = (2, 0),)breakexcept Exception as e:producer = Noneprint(e)conn_error += 1time.sleep(1)print(f"connect error: {conn_error}")if __name__ == '__main__':if len(sys.argv) < 2:print("Usage: python3 test.py produce|consume")sys.exit(1)start_time = time.time()if sys.argv[1] == 'produce':produce_message()elif sys.argv[1] == 'consume':consume_message()end_time = time.time()print(f"start at: {start_time}, end at: {end_time}, cost: {end_time - start_time} seconds")
http://www.lryc.cn/news/9259.html

相关文章:

  • JDK8 ConcurrentHashMap源码分析
  • 前置知识-初值问题、欧拉法、改进欧拉法
  • 睡眠影响寿命,这几个睡眠习惯赶紧改掉!
  • Linux逻辑卷管理器(PV、VG、LV、PE)
  • Centos7 内核升级
  • SpringBoot 启动配置文件加载和参数配置修改问题
  • 布隆过滤器和布谷鸟过滤器详解
  • WebGIS前端框架(openlayers,mapbox,leaflet)图形图像底层渲染原理分析
  • AcWing语法基础课笔记 第五章 C++中的字符串
  • 抓包工具Charles(一)-下载安装与设置
  • SpringBoot09:Swagger
  • Git 常用命令
  • 查看jdk安装路径,在windows上实现多个java jdk的共存解决办法,安装java19后终端乱码的解决
  • 链表数据结构
  • 汽车DTC故障内码与标准故障码的解析与转换
  • 零基础学习测试还是开发?
  • 如何加入new bing候补名单
  • 中国天气——西风带环流和寒潮
  • 2022黑马Redis跟学笔记.实战篇(四)
  • Allegro中如何删除多余D码操作指导
  • 学生投票系统-课后程序(JAVA基础案例教程-黑马程序员编著-第三章-课后作业)
  • 初始化一个列表python
  • 【electron】webview嵌入页面发送消息给父级页面
  • Whids:一款针对Windows操作系统的开源EDR
  • 初级调色转档CameraRaw
  • Mybatis源码(3) - Executor执行过程 | 一级缓存 | 二级缓存
  • 成为 Seatunnel 源码贡献者保姆级教程
  • MySQL的索引视图练习题
  • 【C++ Primer Plus】第四章:复合类型
  • 做外贸,你不能不懂的外贸流程知识