Tkinter开发采用串口通信的上位机软件(3)

时间:2024-04-16 10:51:44

本博客的所有原创文章采用创作公用版协议。要求署名、非商业用途和保持一致。要求署名必须包含我的网名(geokai)以及文章来源(选择博客首地址或者具体博文地址)。

商业性使用须预先征得本人同意(发送Email到 geokai@126.com)

 

18年下半年太忙了,直接停止软件的开发计划。在18年最后几天使用python自带的Tkinter框架简单的先把软件功能实现出来了。占时把这一期的标题改成Tkinter开发上位机软件。

先说一下软件实现的功能把

1)获取二氧化碳传感器探头的数据,使用到pyserial,crcmod库

2)使用matplolib进行实时绘图,使用到matplotlib库

3)定时将数据回传到邮箱,使用到email,smtplib库

 

这里只放出最核心部分的代码

导入关键的库

#导入数值GUI框架
import tkinter as tk
from tkinter import scrolledtext 

#导入绘图包
from matplotlib.backends.backend_tkagg import (
    FigureCanvasTkAgg, NavigationToolbar2Tk)
# Implement the default Matplotlib key bindings.
#from matplotlib.backend_bases import key_press_handler
from matplotlib.figure import Figure
import matplotlib.dates as mdates

#导入数学计算包
import pandas as pd
import numpy as np

#导入系统包
import threading
import time
from datetime import datetime,timedelta
import serial.tools.list_ports 
import crcmod

#导入网络包,邮件发送
import smtplib  
from email.mime.multipart import MIMEMultipart 
from email.mime.text import MIMEText  
from email.mime.image import MIMEImage 
from email.header import Header 

 

GUI框架,其中包括绘图部分

#定义GUI界面及功能
class Application(tk.Tk):

    def __init__(self):
        \'\'\'初始化\'\'\'
     
        self.createWidgets()

    def createWidgets(self):
        \'\'\'设置绘图区\'\'\'
        self.fig = Figure(figsize=(10,5), dpi=100)
        self.ax_co2 = self.fig.add_subplot(1,1,1)
        self.canvas = FigureCanvasTkAgg(self.fig, master=self)
        self.canvas.get_tk_widget()
        self.canvas._tkcanvas.place(x=0, y=0, width=1024, height=350)#pack(side=tk.TOP, fill=tk.BOTH, expand=1)



        \'\'\'设置二氧化碳的数据接收区\'\'\'
        self.log_co2 = scrolledtext.ScrolledText(self, font=("Calibri", 8), background=\'#ffffff\')
        self.log_co2.place(x=720, y=450, width=300, height=60)   
        self.log_co2.insert(tk.END,\'Strat\r\n\')

        self.log_co2_neat = scrolledtext.ScrolledText(self, font=("Calibri", 10), background=\'#ffffff\')
        self.log_co2_neat.place(x=720, y=530, width=300, height=100)   
        self.log_co2_neat.insert(tk.END,\'Strat\r\n\')

        self.log_co2_col_name=tk.Label(bg=\'gray\', font=("Calibri", 10),  justify=tk.LEFT, anchor=tk.W, 
            text=\'Time\t\tCO2(ppm)\')
        self.log_co2_col_name.place(x=720, y=510, width=300, height=20)



        
        \'\'\'设置按钮区\'\'\'    
        self.bt_connect_str = tk.StringVar()
        if self.trans_data_status==False:
            self.bt_connect_str.set(\'开始传输数据\')
        else:
            self.bt_connect_str.set(\'停止传输数据\')
        self.bt_connect = tk.Button(self, textvariable=self.bt_connect_str, command=self.ActivateTrans)
        self.bt_connect.pack(side=tk.LEFT , anchor=tk.S)  
        self.Draw() # 绘图


        \'\'\'设置状态区\'\'\'
        self.lb_port_co2_status=tk.Label(bg=\'red\', width=10, height=1, text=\'CO2 Status\')
        self.lb_port_co2_status.pack(side=tk.RIGHT , anchor=tk.S)
        

        

    def ActivateTrans(self):
        \'\'\'
        点击数据传输按钮后激活数据传输
        1)激活一个从串口获取二氧化碳数据的线程
        3)激活邮件发送线程
        4)运行以上三个线程,并判断是否正确连接串口,并显示串口连接状态
        ***注意以上三个线程的功能较为复杂,使用了单独的Thread类进行了继承,因此停止线程
            采用定义在类里面的Stop()方法
        5)激活文本数据刷新线程
        6)激活绘图区刷新线程
        ***以上两个线程仅有单独的函数并且封装在窗体类下,直接采用Threading类进行定义,所
            以需要注意停止需采用threading.Event()方法进行停止
        \'\'\'
        if self.trans_data_status==False:
            self.trans_thread_co2 = Thread_CO2(0, "Thread_CO2_1")
            self.trans_thread_co2.setDaemon(True)
            self.trans_thread_co2.start()


            if self.trans_thread_co2.port_available==True:
                self.lb_port_co2_status.config(bg=\'green\')
                print(\'CO2 Port Failed to Connect\')


            
            self.email_thread_event = threading.Event()
            self.email_thread = threading.Thread(target = SendEmail, args=(EMAIL_RESEND_INTERVAL, self.email_thread_event))
            self.email_thread.start()

            self.refresh_thread_event = threading.Event()
            self.refresh_thread = threading.Thread(target = self.RefreshThread, args=(1, self.refresh_thread_event))
            self.refresh_thread.start()

            self.redraw_thread_event = threading.Event()
            self.redraw_thread = threading.Thread(target = self.ReDrawThread, args=(10, self.redraw_thread_event))
            self.redraw_thread.start()

            self.trans_data_status=True
            self.bt_connect_str.set(\'停止传输数据\')
        else:
            self.trans_thread_co2.Stop()
            self.lb_port_co2_status.config(bg=\'red\')
            self.lb_port_hg_status.config(bg=\'red\')
            self.trans_data_status=False
            self.bt_connect_str.set(\'开始传输数据\')

            self.email_thread_event.set()
            self.refresh_thread_event.set()
            self.redraw_thread_event.set()
            # self.email_thread.join(0)

    def RefreshThread(self, time_interval, stop_event):
        \'\'\'
        原始数据刷新程序
        \'\'\'
        while(not stop_event.is_set() ):
            print(\'refresh\')
            self.RefreshDataText()
            #pinrt(time_interval)
            time.sleep(time_interval)
    
    def RefreshDataText(self):
        \'\'\'
        判断是否有新的数据并显示在文本框中
        
        \'\'\'
        text = self.log_co2.get(0.0, tk.END).splitlines()
        # print(raw_trans_data_co2)
        if len(raw_trans_data_co2)>0 :
            if raw_trans_data_co2[-1]==text[-2]:
                pass
            else:
                self.log_co2.insert(tk.END, raw_trans_data_co2[-1]+\'\r\n\')
        self.log_co2.see(tk.END)
        #print(np.array(compiled_data_co2.iloc[-1]))

        if len(compiled_data_co2)>0:
            one_data = np.array(compiled_data_co2.iloc[-1])
            #print(one_data)
            one_data = str(one_data[0]) + \'\t\t\' + str(one_data[1])       
            text = self.log_co2_neat.get(0.0, tk.END).splitlines()
            if one_data==text[-2]:
                print(\'same\')
            else:
                self.log_co2_neat.insert(tk.END, one_data + \'\r\n\')
            self.log_co2_neat.see(tk.END)



    def AdjustScale(self,_):
        \'\'\'
        调整绘图区坐标轴范围
        \'\'\'

    def ReDrawThread(self, time_interval, stop_event):
        \'\'\'
        绘图区刷新程序
        \'\'\'
        while(not stop_event.is_set()):
            try:
                self.Draw()
            except:
                pass
            time.sleep(time_interval)


    def Draw(self):
        \'\'\'
        实时绘图程序
        TODO:
        1)个人认为使用matplotlib的这种绘图方式效率有些底下,是否采用诸如Animation的
            动态绘图功能改善绘图性能有待检验
        \'\'\'
        #判断是否有有效数据
        if len(compiled_data_co2)>0 or len(compiled_data_hg)>0:               
            #由于二氧化碳数据量太大,选择最后16000条数据,足够保证最大3天的显示量,降低绘图负担
            #注意原始数据中时间数据最好经过to_datetime函数规整一遍,以免造成数据错误
            co2_x_data = pd.to_datetime(compiled_data_co2.iloc[-16000:,0])
            co2_y_data = compiled_data_co2.iloc[-16000:,1]
            # co2_xlim_min = datetime.strptime(co2_x_data.iloc[-1], \'%Y-%m-%d %H:%M:%S\') - self.fig_xlim_delta
            co2_xlim_min = co2_x_data.iloc[-1] - self.fig_xlim_delta
            print( co2_y_data.min())

            self.ax_co2.clear()
            self.ax_co2.xaxis.set_major_formatter(mdates.DateFormatter(\'%m-%d\n%H:%M\'))
            self.ax_co2.xaxis.set_major_locator(mdates.AutoDateLocator())
            self.ax_co2.scatter(co2_x_data.values,co2_y_data.values, s = 1, c=\'green\')
            self.ax_co2.set_xlim(co2_xlim_min,co2_x_data.iloc[-1]+ self.fig_xlim_delta/9)
            self.ax_co2.set_ylim(co2_y_data.min(), co2_y_data.min()+self.fig_co2_ylim_delta)
            self.ax_co2.set_ylabel(\'$CO_2(ppm)$\')
            self.ax_co2.grid(linestyle=\'--\')
            
          
            self.fig.savefig(\'D:/figure.png\')


            self.canvas.draw()

    
    def _quit(self):
        \'\'\'退出\'\'\'
        if self.trans_data_status==True:
            self.ActivateTrans()    
            
        self.quit()     # 停止 mainloop
        self.destroy()  # 销毁所有部件

  

二氧化碳数据的传输

class Thread_CO2 (threading.Thread):
    \'\'\'
    接收CO2数据的线程
    该CO2探头的购买链接https://m.tb.cn/h.3phrPcr

    \'\'\'
    def __init__(self, threadID, name):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.port_available = False

        com_list = serial.tools.list_ports.comports()
        for port in com_list:
            print(port.device) #返回端口号如COM3
            print(port.description) #返回设备名字
            print(port.pid) #返回设备在计算机上的位置         
            if port.pid==29987:
                port_num_co2=port.device
                self.port_available = True

        self.ser_co2=serial.Serial()
        if self.port_available==True:
            self.ser_co2.port=port_num_co2
            self.ser_co2.baudrate=19200
            self.ser_co2.parity=serial.PARITY_EVEN
            self.ser_co2.timeout=0.5

        self.__runing_flag=True

    def run(self):
        print ("开始线程:" + self.name)
        if self.port_available==False:
            if DEBUG_MODE==True:
                while(self.__runing_flag):
                    self.FakeData()
                    time.sleep(1)
            return 1
        if not self.ser_co2.is_open:
            self.ser_co2.open()
        print(self.ser_co2)

        while(self.__runing_flag):
            self.GetData()
            time.sleep(CO2_REFRESH_INTERVAL)
        
    def GetData(self):
        \'\'\'
        获取CO2数据需要注意进制的转换,以及最终的CRC16校验
        CRC的校验使用CRCMOD库,不同类型的CRC均可以采用此库进行计算
        其中特别要关注poly这个参数,参考http://www.ip33.com/crc.html
        在该网站查询CRC多项式,并在开头补1
        \'\'\'
        retry_time= 10
        while(retry_time):
            request_code_co2=[]
            #首先配置需要发送的信息,Serial库接收直接以0-255的int值
            #因此需要将16进制字符串转换为10进制整数
            for i in \'15 04 13 8B 00 01 46 70 \'.split():
                request_code_co2.append(int(i,16))
            # print(request_code_co2)
            if not self.ser_co2.is_open:
                self.ser_co2.open()
            self.ser_co2.write(request_code_co2)
            #获取的格式为b\'\',byte型
            temp =self.ser_co2.readline()
            #print(temp)

            #定义CRC,并计算CRC
            crc16_func = crcmod.Crc(poly=0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)
            crc16_func.update(temp[:-2])
            
            co2_conc=[]

            #CRC的计算结果为hex型,采用bytes.fromhex()转换为byte再与传输的最后两位byte对比,注意顺序
            #如果获取成功就退出,没有成功则重复,最多10次
            #TODO:如果多词未获取成功,未来需要加入一个错误信息日志
            if bytes.fromhex(crc16_func.hexdigest()) == temp[-2:][::-1]:
                print(temp[-4:-2])
                co2_conc = (int.from_bytes(temp[-4:-2], byteorder=\'big\', signed=False))
                break
            retry_time-=1
        
        #raw_trans_data_co2用来显示文本信息,需要将DateTime和获取的16进制值转换成str类型,否则文本框无法显示
        #TODO:这个语句应该可以优化
        co2_one_data_raw = str([temp.hex()[x*2:x*2+2] for x in range(len(temp.hex())//2) ])
        co2_one_data_time = (datetime.now().strftime(\'%Y-%m-%d %H:%M:%S\'))
        raw_trans_data_co2.append(str(co2_one_data_time) + co2_one_data_raw)
        try:
            if co2_conc>=0:
                compiled_data_co2.loc[len(compiled_data_co2)] = [co2_one_data_time, co2_conc]
        except:
            pass
        with open(\'D:/raw_data_CO2.txt\', mode=\'a\') as f:
            f.write(str(co2_one_data_time))
            f.write(\'\t\')
            f.write(str(co2_conc))
            f.write(\'\r\n\')
            
        #print(compiled_data_co2)
    
    def FakeData(self):
        #产生伪数据


    def Stop(self):
        if self.ser_co2.is_open and self.port_available==True:
            self.ser_co2.close()
        print(self.ser_co2)
        self.__runing_flag=False