当前位置: 首页 > news >正文

Kafka与MySQL的组合使用

  1. 根据上面给出的student表,编写Python程序完成如下操作:

(1)读取student表的数据内容,将其转为JSON格式,发送给Kafka;

创建Student表的SQL语句如下:

create table student(
sno char(5),
sname char(10),
ssex char(2),
sage int
);

向student表中插入两条记录的SQL语句如下:

insert into student values(‘95001’,’John’,’M’,23);
insert into student values(‘95002’,’Tom’,’M’,23);

 启动zookeeper和kafka的服务

编写一个生产者程序mysql_producer.py:

from kafka import KafkaProducer
import json
import pymysql.cursorsproducer = KafkaProducer(bootstrap_servers='localhost:9092',value_serializer=lambda v:json.dumps(v).encode('utf-8'))connect=pymysql.Connect(host='localhost',port=3306,user='root',passwd='123456',db='zhangna',charset='utf8'
)
cursor=connect.cursor()
sql="select sno,sname,ssex,sage from student;"
cursor.execute(sql)
data=cursor.fetchall()
connect.commit()for message in data:zn={}zn['sno']=message[0]zn['sname']=message[1]zn['sex']=message[2]zn['age']=message[3]producer.send('mysql_topic',zn)connect.close()
producer.close()

(2)再从Kafka中获取到JSON格式数据,打印出来;

编写一个消费者程序mysql_consumer.py:

from kafka import KafkaConsumer
import json
import pymysql.cursorsconsumer = KafkaConsumer('mysql_topic',bootstrap_servers=['localhost:9092'],group_id=None,auto_offset_reset='earliest')
for msg in consumer:msg1=str(msg.value,encoding="utf-8")data=json.loads(msg1)print(data)

终于出来了,出错的原因是encoding,我写成了encodings的缘故

为什么我会出现两条重复记录,原因是我生产者程序运行了多次,生产者多运行一次,消费者程序就会多一次查询

http://www.lryc.cn/news/204930.html

相关文章:

  • 2018年亚太杯APMCM数学建模大赛A题老年人平衡能力的实时训练模型求解全过程文档及程序
  • 华盛顿特区选举委员会:黑客可能已侵入整个选民名册
  • kali安装nodejs、npm失败
  • 插入排序(学习笔记)
  • wps excel js编程
  • Python 类继承解释
  • Reactor反应器模式
  • alibaba.fastjson的使用(六) -- JavaBean==》Json字符串、JSONObject、JSONArray
  • uniapp 自定义导航栏
  • 查分小程序:一键查询成绩,班主任和家长的得力助手
  • Linux内核驱动开发的步骤
  • 【Java 进阶篇】HTML DOM 事件详解
  • redis 从小白到大师系列
  • vue使用.filter方法检索数组中指定时间段内的数据
  • Ubuntu 安装 npm 和 node
  • Matlab论文插图绘制模板第122期—函数折线图(fplot)
  • IK分词器如何修改支持跨版本ES
  • Spring MVC常用十大注解
  • 二、【MyBatis】 MyBatis入门与简单使用
  • 基于DF模式的协作通信技术matlab性能仿真
  • Angular-01:基本架构
  • 字符串划分
  • ImportError: /lib64/libstdc++.so.6: version `CXXABI_1.3.9‘ not found的解决方法
  • 华为云全新上线Serverless应用中心,支持一键构建文生图应用
  • scrapy的安装和使用
  • Kotlin中的异常处理
  • [Ubuntu 18.04] 搭建文件夹共享之Samba服务器
  • 设计模式-装饰器模式
  • 【Vs code调试C++】
  • 使用 C++创建一个简易的数据库管理系统 (DBMS)