当前位置: 首页 > news >正文

9、监测数据采集物联网应用开发步骤(7)

监测数据采集物联网应用开发步骤(6)

串口(COM)通讯开发

本章节测试使用了 Configure Virtual Serial Port Driver虚拟串口工具和本人自写的串口调试工具,请自行baidu下载对应工具

com.zxy.common.Com_Para.py中添加如下内容

#RS232串口通讯列表 串口号,波特率,数据位,索引(A,B,C,D区分),多串口分割符;
ComPortList = ""  #linux参考:/dev/ttyS0,9600,8,0,A;/dev/ttyS1.9600,8,0,B windwows参考:COM1,9600,8,0;COM2,9600,8,2
#串口通讯全局变量hashtable <String, seria>串口索引---串口对象
htComPort = {}

 在com.zxy.main.Init_Page.py中添加如下内容

    @staticmethoddef Start_ComPort():iIndex = 0for temComPort in Com_Para.ComPortList.split(";"):iIndex = iIndex + 1temComPortInfo = temComPort.split(",")   try:if len(temComPortInfo) == 5 and Com_Fun.GetHashTableNone(Com_Para.htComPort, temComPortInfo[4]) is None:temCD = ComDev(temComPortInfo[0], int(temComPortInfo[1]), int(temComPortInfo[2]), int(temComPortInfo[3]), iIndex)temCD.attPortName = temComPortInfo[4]Com_Fun.SetHashTable(Com_Para.htComPort, temComPortInfo[4], temCD)except Exception as e:print("com link error:COM"+temComPortInfo[0]+"==>"  + repr(e)+"=>"+str(e.__traceback__.tb_lineno))finally:Pass

创建串口设备管理类com.zxy.comport.ComDev.py

#! python3
# -*- coding: utf-8 -
'''
Created on 2017年05月10日
@author: zxyong 13738196011
'''import datetime,threading,time,serial
from com.zxy.common.Com_Fun import Com_Fun
from com.zxy.adminlog.UsAdmin_Log import UsAdmin_Log
from com.zxy.common import Com_Para
from com.zxy.z_debug import z_debug#监测数据采集物联网应用--串口设备管理
class ComDev(z_debug):    attIndex    =   0attPort     =   0attBaudrate =   9600attBytesize =   8attSerial   =   None#超时时间(秒) 为了验证测试效果,将时间设置为10秒attTimeout  =   10#返回值attReturnValue  = NoneattPortName     = ""#特殊插件处理attProtocol     = ""#回发数据attSendValue    = None#线程锁attLock = threading.Lock()def __init__(self, inputPort,inputBaudrate,inputBytesize,inputparity,inputIndex):self.attPort = inputPortself.attBaudrate = inputBaudrateself.attBytesize = inputBytesizetemParity =  "N"if str(inputparity) == "0":   #无校验temParity =  "N"elif str(inputparity) == "1": #偶校验temParity =  "E"elif str(inputparity) == "2": #奇校验temParity =  "O"elif str(inputparity) == "3":temParity =  "M"elif str(inputparity) == "4":temParity =  "S"self.attSerial = serial.Serial(port=self.attPort,baudrate=self.attBaudrate,bytesize=self.attBytesize,parity=temParity, stopbits=1)self.attSerial.timeout = self.attTimeoutself.attIndex = inputIndexself.OpenSeriaPort()#打开串口def OpenSeriaPort(self):try: if not self.attSerial.isOpen():  self.attSerial.open()t = threading.Thread(target=self.OnDataReceived, name="ComPortTh" + str(self.attIndex))t.start()uL = UsAdmin_Log(Com_Para.ApplicationPath,str("ComPortTh" + str(self.attIndex)))uL.SaveFileDaySub("thread")      print("Open ComPortTh" + str(self.attIndex)+" COM:"+str(self.attSerial.port)+" "+Com_Fun.GetTimeDef()+" lenThreads:"+str(len(threading.enumerate())))return Trueexcept Exception as e:if str(type(self)) == "<class 'type'>":self.debug_in(self,repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息else:self.debug_in(repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息return Falsefinally:pass#关闭串口def CloseSeriaPort(self):try: if not self.attSerial.isOpen():  self.attSerial.close()return Trueexcept Exception as e:if str(type(self)) == "<class 'type'>":self.debug_in(self,repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息else:self.debug_in(repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息return Falsefinally:pass#发送数据无返回 def WritePortDataImmed(self,inputByte):try: if not self.attSerial.isOpen():  self.OpenSeriaPort()if self.attSerial.isOpen() and self.attLock.acquire():                    self.attReturnValue = NonetemNumber = self.attSerial.write(inputByte)time.sleep(0.2)self.attLock.release()return temNumberelse:return 0except Exception as e:if str(type(self)) == "<class 'type'>":self.debug_in(self,repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息else:self.debug_in(repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息return -1#返回值为字节,带结束符 def WritePortDataFlag(self,inputByte,EndFlag):try: if not self.attSerial.isOpen():  self.OpenSeriaPort()if self.attSerial.isOpen() and self.attLock.acquire():                    self.attReturnValue = NonetemNumber = self.attSerial.write(inputByte)    starttime = datetime.datetime.now()    endtime = datetime.datetime.now() + datetime.timedelta(seconds=self.attTimeout)while (self.attReturnValue is None or self.attReturnValue[len(self.attReturnValue) - len(EndFlag):len(self.attReturnValue)] != EndFlag.encode(Com_Para.U_CODE)) and starttime <= endtime:starttime = datetime.datetime.now()time.sleep(0.2)                self.attLock.release()return temNumberexcept Exception as e:if str(type(self)) == "<class 'type'>":self.debug_in(self,repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息else:self.debug_in(repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息return -1finally:pass#返回值为字节 def WritePortData(self,inputByte):try: if not self.attSerial.isOpen():  self.OpenSeriaPort()if self.attSerial.isOpen() and self.attLock.acquire():                    self.attReturnValue = NonetemNumber = self.attSerial.write(inputByte)    starttime = datetime.datetime.now()    endtime = datetime.datetime.now() + datetime.timedelta(seconds=self.attTimeout)while self.attReturnValue is None and starttime <= endtime:starttime = datetime.datetime.now()time.sleep(0.2)                self.attLock.release()return temNumberexcept Exception as e:if str(type(self)) == "<class 'type'>":self.debug_in(self,repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息else:self.debug_in(repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息return -1finally:pass#接收数据        def OnDataReceived(self):try:while self.attSerial.isOpen():temNum = self.attSerial.inWaiting()if temNum > 0:if self.attReturnValue is None:self.attReturnValue = self.attSerial.read(temNum)else:self.attReturnValue = self.attReturnValue + self.attSerial.read(temNum)else:time.sleep(1)except Exception as e:if str(type(self)) == "<class 'type'>":self.debug_in(self, repr(e)+"=>"+str(e.__traceback__.tb_lineno))else:self.debug_in(repr(e)+"=>"+str(e.__traceback__.tb_lineno))self.attReturnValue = None

串口通讯测试案例MonitorDataCmd.py主文件中编写:

在该语句下添加

       #串口配置参数Com_Para.ComPortList = "COM2,9600,8,0,A;COM4,9600,8,2,B"#串口连接初始化Init_Page.Start_ComPort()#测试串口数据发送和接收temCP2 = Com_Fun.GetHashTable(Com_Para.htComPort,"A")#获取串口2对象temCP4 = Com_Fun.GetHashTable(Com_Para.htComPort,"B")#获取串口4对象temByte1 = ("AABBCCDDVV").encode(Com_Para.U_CODE)   #发送字符串转byte[]temByte2 = ("11223344KM").encode(Com_Para.U_CODE)   #发送字符串转byte[]print("开始发送串口数据")temRec1 = temCP2.WritePortData(temByte1)#往串口2发送数据print("串口2发送数据长度:"+str(temRec1))strRec = ""if temCP2.attReturnValue != None:strRec = temCP2.attReturnValue.decode(Com_Para.U_CODE)#收到串口数据print("串口2收到数据值:"+strRec)temRec2 = temCP4.WritePortData(temByte2)#往串口4发送数据print("串口3发送数据长度:"+str(temRec2))strRec = ""if temCP4.attReturnValue != None:strRec = temCP4.attReturnValue.decode(Com_Para.U_CODE)#收到串口数据print("串口4收到数据值:"+strRec)

串口调试测试结果:

http://www.lryc.cn/news/144529.html

相关文章:

  • 微信小程序开发教学系列(9)- 小程序页面优化
  • 如何将储存在Mac或PC端的PDF文件传输到移动设备呢?
  • 一图看懂架构划分原则:技术划分 OR 领域划分?
  • 从零开始的Hadoop学习(二)| Hadoop介绍、优势、组成、HDFS架构
  • 问道管理:逾4600股飘红!汽车板块爆了,多股冲击涨停!
  • Java 语言实现选择排序算法
  • 【C语言每日一题】05. 输出保留12位小数的浮点数
  • 科大讯飞永久免费GPT入口来了!!!
  • 亚马逊反馈和评论的区别
  • Linux 查看当前文件夹下的文件大小
  • 玩转 PI 系列-看起来像服务器的 ARM 开发板矩阵-Firefly Cluster Server
  • 《Flink学习笔记》——第六章 Flink的时间和窗口
  • nano主板扩大swap交换交换空间大小 /windows里远程传输文件/ssh远程登陆访问GUI界面报错
  • 7个用于机器学习和数据科学的基本 Python 库
  • Kafka 简介 + 学习笔记
  • Mybatis小记
  • 如何向BertModel增加字符
  • copilot切换下一条提示的快捷键
  • Mongodb 删除文档Delete与Remove的区别
  • Docker 的基本概念和优势
  • 基于 xhr 实现 axios
  • 基于面向对象的大模型代码生成
  • 易云维®FMCS厂务系统创造工厂全新的“数字低碳智能”应用场景
  • 【Linux应用部署篇】在CSDN云IDE平台部署Etherpad文档编辑器
  • 基于java swing和mysql实现的汽车租赁管理系统(源码+数据库+文档+运行指导视频)
  • Rigs-of-rods安装
  • html学习第2篇---标签(1)
  • 爬虫逆向实战(二十四)--某鸟记录中心
  • 【操作系统】中断和异常
  • 锁策略、原子编程CAS 和 synchronized 优化过程