#!/usr/bin/env python
# encoding: utf-8
'''
@author: JHC
@license: None
@contact: JHC000abc@gmail.com
@file: host.py
@time: 2022/5/9 11:08
@desc:检测U盘插入脚本
'''
import datetime
import json
import os
import time
from gui.control import monitor_upper_window
from psutil import disk_partitions
import get_path
import multiprocessing


TIME_WAIT = 3
COMPUTER_VERSION_PATH = R"C:\setup\version.json"

# Start every run with a fresh log file: remove the one left by a previous run.
if not os.path.exists(get_path.get_path() + R"gui\res\log.txt"):
    print("不存在旧的log")
else:
    print("删除旧的log")
    print(get_path.get_path() + R"gui\res\log.txt")
    os.remove(get_path.get_path() + R"gui\res\log.txt")


class UsbCheck:
    """Poll for removable drives and open the upgrade window for an upgrade disk.

    A drive counts as an upgrade disk when it contains
    ``setup\\setup\\version.json``. The drive found is written to
    ``gui\\res\\set.json`` and the upgrade window is started in a separate
    process.
    """

    def __init__(self):
        # 1 while a removable drive is plugged in and has already been handled,
        # 0 otherwise. Keeps the handling from repeating on every poll.
        self.flag = 0
        self.mode = 0
        # Version state flags (not read anywhere inside this class).
        self.mysql_flag = 0
        self.redis_flag = 0
        self.server_flag = 0
        self.client_flag = 0
        self.judge_flag = 0
        self.display_flag = 0
        self.major_flag = 0

    def run(self):
        """Poll the partition list forever, sleeping three seconds per round.

        :return: never returns
        """
        while True:
            print("循环检测U盘")
            self.write_log(data="循环检测U盘")
            # Devices of all partitions currently mounted as writable removable media.
            device_list = [i.device for i in disk_partitions() if i.opts == 'rw,removable']
            if device_list:
                self.write_log(data="检测到U盘")
                if self.flag == 0:
                    # Handle an insertion only once until the drive is removed.
                    self.flag = 1
                    self.write_log(data="发现usb驱动{}".format(device_list))
                    self._handle_new_devices(device_list)
                else:
                    self.write_log(data="没有新盘插入")
            elif self.flag == 1:
                # A drive was present on an earlier poll and is gone now.
                self.write_log(data="移动设备被拔出")
                self.flag = 0
            else:
                self.write_log(data="未插入移动设备")
            # Sleep three seconds between polls.
            time.sleep(3)

    def _find_upgrade_disk(self, device_list):
        """Return the drive (e.g. ``E:``) that holds the version file, or None.

        Every device without the version file is reported in the log. When
        several devices qualify, the last one wins.
        """
        found = None
        for device in device_list:
            drive = device.replace("\\", "")
            if os.path.exists(drive + R"\\setup\setup\version.json"):
                found = drive
            else:
                print("不存在version文件,非专用升级U盘")
                self.write_log(data="不存在version文件,非专用升级U盘")
        return found

    def _handle_new_devices(self, device_list):
        """Record the upgrade disk in set.json and start the upgrade window."""
        uDiskPath = self._find_upgrade_disk(device_list)
        if uDiskPath is None:
            # No upgrade disk among the devices: leave set.json untouched
            # instead of truncating it, and do not open the window.
            return
        try:
            with open(get_path.get_path() + R"gui\res\set.json", 'w', encoding="utf-8") as fp:
                fp.write(json.dumps({
                    "uDiskPath": uDiskPath
                }))
            print("uDiskPath={}".format(uDiskPath))
            self.write_log(data="uDiskPath={}".format(uDiskPath))
            # Open the window in its own process so polling keeps running.
            p = multiprocessing.Process(target=self.show_win)
            p.start()
        except Exception as e:
            # Best effort: a failure here must not stop the polling loop.
            print(e)
            self.write_log(data="启动升级界面失败,原因:{}".format(e))

    def show_win(self):
        """Start the upgrade window (target of the child process)."""
        monitor_upper_window.GUI_start()

    def write_log(self, data):
        """Append ``data`` as one line to ``gui\\res\\log.txt``."""
        with open(get_path.get_path() + R"gui\res\log.txt", 'a', encoding="utf-8") as fp:
            fp.write(data + "\n")

if __name__ == '__main__':
    # Required when the program is packaged as an executable; prevents the
    # process from being started multiple times.
    multiprocessing.freeze_support()
    UsbCheck().run()

```python
#!/usr/bin/env python
# encoding: utf-8
'''
@author: JHC
@license: None
@contact: JHC000abc@gmail.com
@file: monitor_upper_window.py
@time: 2022/5/13 10:39
@desc:只替换exe版本,检测到U盘后比对版本,符合升级条件升级
'''
import json
import os
import sys
import re
import shutil
import subprocess
import threading
import time
from gui.control import mysql_create
import pythoncom
import winshell
from PyQt5.QtWidgets import QWidget, QApplication, QMessageBox
from PyQt5.QtGui import QMovie, QPalette, QPixmap, QBrush, QPaintEvent
from PyQt5.QtCore import Qt, pyqtSignal
from gui.ui import monitor_upper_8
import get_path



COMPUTER_VERSION_PATH = R"C:\setup\version.json"
# The USB drive is read from set.json at import time; the stored value gets a
# trailing backslash so relative paths can be appended directly.
with open(get_path.get_path() + R"gui\res\set.json", "r", encoding="utf-8") as fp:
    USB_VERSION_PATH = json.load(fp)["uDiskPath"] + "\\"


class MonitorUpper(QWidget):
    """Frameless full-screen window that drives the USB software upgrade.

    The upgrade itself (``run_start``) runs on a worker thread started by
    ``pushButton_start``; that thread talks back to the widget through the
    signals declared below.
    """
    # Connected to close_window in __init__.
    close_win = pyqtSignal()
    # Connected to change_btn_status_to_dark in __init__.
    change_status_to_dark = pyqtSignal()
    # Declared but not connected to any slot in the visible code.
    change_status_to_light = pyqtSignal()
    # Carries the message text; connected to warning in __init__.
    show_warn = pyqtSignal(str)

    def __init__(self):
        """Build the UI, initialise the selection/eligibility state, connect signals."""
        super().__init__()
        self.ui = monitor_upper_8.Ui_Form()
        self.ui.setupUi(self)
        self.setWindowFlags(Qt.FramelessWindowHint)
        self.showFullScreen()

        # Checkbox selection state, toggled between 0 and 1 by the checkBox_* slots.
        self.checkBox_server_single = 0
        self.checkBox_client_single = 0
        self.checkBox_major_single = 0
        self.checkBox_judge_single = 0
        self.checkBox_display_single = 0
        # The mysql/redis checkboxes are not connected to any slot, but
        # run_start still reads these attributes; define them so that read
        # does not raise AttributeError.
        self.checkBox_mysql_single = 0
        self.checkBox_redis_single = 0
        self.version_single = 0
        self.sin = 0

        # Per-component result of the version check done in run_start.
        self.server_upper_single = False
        self.major_upper_single = False
        self.client_upper_single = False
        self.judge_upper_single = False
        self.display_upper_single = False
        # Read by run_start as well; always False while mysql/redis are disabled.
        self.redis_upper_single = False
        self.mysql_upper_single = False

        self.ui.checkBox_redis.hide()
        self.ui.pushButton_start.hide()

        # The background image is applied in paintEvent; the palette that used
        # to be built here was never installed, so it has been removed.

        gif = QMovie(get_path.get_path() + R"gui\res\gif_upper.gif")
        self.ui.label_gif.setMovie(gif)
        gif.start()

        self.ui.checkBox_server.clicked.connect(self.checkBox_server)
        self.ui.checkBox_client.clicked.connect(self.checkBox_client)
        self.ui.checkBox_major.clicked.connect(self.checkBox_major)
        self.ui.checkBox_judge.clicked.connect(self.checkBox_judge)
        self.ui.checkBox_display.clicked.connect(self.checkBox_display)
        self.ui.pushButton_start.clicked.connect(self.pushButton_start)
        # Signals emitted from the worker thread.
        self.close_win.connect(self.close_window)
        self.change_status_to_dark.connect(self.change_btn_status_to_dark)
        self.show_warn.connect(self.warning)

    def change_btn_status_to_dark(self):
        """Hide the component checkboxes and show the "upgrading" caption."""
        # Hide the checkboxes.
        self.ui.checkBox_server.hide()
        self.ui.checkBox_client.hide()
        self.ui.checkBox_judge.hide()
        self.ui.checkBox_major.hide()
        self.ui.checkBox_display.hide()
        self.ui.checkBox_mysql.hide()
        # Show the progress caption.
        self.ui.label_2.setText("软件升级中……")
        # setAlignment replaces the previous value, so two separate calls kept
        # only the last flag; combine both flags in one call.
        self.ui.label_2.setAlignment(Qt.AlignRight | Qt.AlignTop)
        self.ui.label_2.setStyleSheet(
            'font: 36pt "方正小标宋简体";color: rgb(85, 255, 255);'
        )

    def close_window(self):
        """Close the window and record in the log whether that worked."""
        try:
            self.close()
            print("界面正常关闭")
            self.write_log(data="界面正常关闭")
        except Exception as err:
            print("界面关闭异常")
            self.write_log(data="界面关闭异常,原因:{}".format(err))

    def checkBox_server(self):
        if self.checkBox_server_single == 0:
            self.checkBox_server_single = 1
        else:
            self.checkBox_server_single = 0

    def checkBox_client(self):
        if self.checkBox_client_single == 0:
            self.checkBox_client_single = 1
        else:
            self.checkBox_client_single = 0

    def checkBox_major(self):
        if self.checkBox_major_single == 0:
            self.checkBox_major_single = 1
        else:
            self.checkBox_major_single = 0

    def checkBox_judge(self):
        if self.checkBox_judge_single == 0:
            self.checkBox_judge_single = 1
        else:
            self.checkBox_judge_single = 0

    def checkBox_display(self):
        if self.checkBox_display_single == 0:
            self.checkBox_display_single = 1
        else:
            self.checkBox_display_single = 0
            
    # def checkBox_mysql(self):
    #     if self.checkBox_mysql_single == 0:
    #         self.checkBox_mysql_single =1
    #     else:
    #         self.checkBox_mysql_single = 0
    #
    # def checkBox_redis(self):
    #     if self.checkBox_redis_single == 0:
    #         self.checkBox_redis_single = 1
    #     else:
    #         self.checkBox_redis_single = 0
    #     print(self.checkBox_display_single)

    # def set_mysql(self):
    #     cmd = USB_VERSION_PATH + R"setup\update_mysql.bat"
    #     self.write_log(data=cmd)
    #     subprocess.Popen(cmd)

    # def set_redis(self):
    #     cmd = USB_VERSION_PATH + R"setup\update_redis.bat"
    #     self.write_log(data=cmd)
    #     os.popen(cmd)

    # def create_shortcut(self, bin_path, name, desc):
    #     pythoncom.CoInitialize()
    #     print("bin_path,name,desc = ",bin_path,name,desc)
    #     try:
    #         shortcut = os.path.join(winshell.desktop(), name + ".lnk")
    #         winshell.CreateShortcut(
    #             Path=shortcut,
    #             Target=bin_path,
    #             Icon=(bin_path, 0),
    #             Description=desc
    #         )
    #         return True
    #     except ImportError as e:
    #         print(e)
    #         self.write_log(data="Well, do nothing as 'winshell' lib may not available on current os")
    #         self.write_log(data="error detail %s" % str(e))
    #         return False

    def copy_new_files(self, from_file, computer_path):
        if not os.path.exists(computer_path):
            os.makedirs(computer_path)
        files = os.listdir(from_file)
        for f in files:
            if os.path.isdir(from_file + '/' + f):
                self.copy_new_files(from_file + '/' + f, computer_path + '/' + f)
            else:
                shutil.copy(from_file + '/' + f, computer_path + '/' + f)

    def shot_down_process(self,process_name):
        """Run ``taskkill /F /IM`` for the given image name."""
        os.system('taskkill /F /IM ' + process_name)

    def restart(self,name):
        """Launch ``C:\\setup\\<name>\\<name>.exe`` without waiting for it."""
        exe_path = R"C:\setup\{}\{}.exe".format(name, name)
        subprocess.Popen(exe_path)

    def pushButton_start(self):
        """Slot of the start button: run ``run_start`` on a new thread."""
        try:
            worker = threading.Thread(target=self.run_start)
            worker.start()
        except Exception as err:
            print(err)

    def run_start(self):
        """Check the selected components and upgrade those that are eligible.

        Steps:
            1. For every selected component compare the version on the USB
               disk with the version on this computer.
            2. Upgrade every eligible component; log the others.
            3. If nothing was selected, or nothing selected is eligible, show
               a message box. Otherwise wait ten seconds and close the window.

        The mysql/redis checks are disabled. Their flags are still included in
        the final summary, but are read with a default so a missing attribute
        no longer raises AttributeError.
        """
        components = ("server", "major", "client", "judge", "display")

        # Step 1: version check for each selected component. All checks run
        # before any upgrade starts.
        for name in components:
            if getattr(self, "checkBox_{}_single".format(name)) != 1:
                continue
            tip = name.capitalize()
            version_usb, version_com = self.get_version(tip)
            print("version_usb ,version_com=={}".format(name), version_usb, version_com)
            eligible = self.match_version(tip, version_usb, version_com) == True
            setattr(self, "{}_upper_single".format(name), eligible)

        # Step 2: upgrade the eligible components, log the rest.
        for name in components:
            if getattr(self, "{}_upper_single".format(name)) == True:
                self.up_software(name)
            else:
                self.write_log(data="{}不符合升级条件".format(name))

        # Step 3: summary. mysql/redis may be undefined on the instance.
        every_name = ("mysql", "redis") + components
        nothing_selected = all(
            getattr(self, "checkBox_{}_single".format(name), 0) == 0
            for name in every_name
        )
        if nothing_selected:
            self.show_warn.emit("未选中要升级的软件")
            return

        nothing_eligible = all(
            getattr(self, "{}_upper_single".format(name), False) == False
            for name in every_name
        )
        if nothing_eligible:
            self.show_warn.emit("选中的软件版本已经是最新版,无需升级")
            return

        # Partial or complete upgrade: no message box, just close the window.
        self.write_log(data="升级完成")
        time.sleep(10)
        self.close_win.emit()
        self.write_log(data="窗口已关闭")

    def up_software(self,name):
        """Replace the executable of component ``name`` with the one on the USB disk.

        Steps:
            1. Stop the running program.
            2. Delete the old executable.
            3. Copy the new executable from the USB disk.
            4. Record the new version (only when the copy succeeded).
            5. Start the program again.

        Every step is best effort: a failure is printed and logged, and the
        remaining steps still run, except that the version record is not
        updated after a failed copy.

        :param name: lower-case component name, e.g. ``"server"``
        """
        computer_path = R"C:\setup\{}\{}.exe".format(name, name)
        from_file = USB_VERSION_PATH + R"setup\setup\{}\{}.exe".format(name, name)

        # Hide the checkboxes and show the progress caption.
        self.change_status_to_dark.emit()

        # 1. Stop the running program. taskkill /IM matches the image name,
        #    which includes the ".exe" extension.
        try:
            self.shot_down_process("{}.exe".format(name))
        except Exception:
            self.write_log(data="不存在{}.exe进程".format(name))

        # 2. Delete the old executable.
        try:
            if os.path.exists(computer_path):
                os.remove(computer_path)
            else:
                self.write_log(data="不存在{}文件".format(computer_path))
        except Exception as e:
            print(e)
            self.write_log(data="删除{}失败,原因:{}".format(computer_path, e))

        # 3. Copy the new executable; create the target folder on first install.
        copied = False
        try:
            os.makedirs(os.path.dirname(computer_path), exist_ok=True)
            shutil.copy(from_file, computer_path)
            copied = True
        except Exception as e:
            print(e)
            self.write_log(data="拷贝{}失败,原因:{}".format(from_file, e))

        # 4. Record the new version only if the new executable is in place;
        #    otherwise the computer would claim a version it does not have.
        if copied:
            try:
                self.replace_version(name)
            except Exception as e:
                print(e)
                self.write_log(data="更新{}版本信息失败,原因:{}".format(name, e))

        # 5. Start the program again and give it a moment to come up.
        try:
            self.restart(name)
            time.sleep(3)
        except Exception as e:
            print(e)
            self.write_log(data="重启{}失败,原因:{}".format(name, e))

    def replace_version(self,name):
        """Copy the version of component ``name`` from the USB file to the local file.

        The key looked up in both JSON files is ``name.capitalize()``.

        :param name: lower-case component name, e.g. ``"server"``
        :raises KeyError: if the USB version file has no entry for the component
        """
        print("替换新的version")
        key = name.capitalize()
        usb_version_file = USB_VERSION_PATH + R"setup\setup\version.json"
        with open(usb_version_file, 'r', encoding="utf-8") as fp:
            usb_version_json = json.load(fp)
        with open(COMPUTER_VERSION_PATH, 'r', encoding="utf-8") as fp:
            com_version_json = json.load(fp)
        print("usb_version_json,com_version_json",usb_version_json,com_version_json)

        com_version_json[key] = usb_version_json[key]
        print("com_version_json=====",com_version_json)
        # Serialise with json instead of str(dict).replace("'", '"'), which
        # produces invalid JSON for values containing quotes, booleans or null.
        with open(COMPUTER_VERSION_PATH, 'w', encoding="utf-8") as fp:
            json.dump(com_version_json, fp, ensure_ascii=False)

    def match_version(self,tip,version_usb,version_com):
        """Return True when the USB version of ``tip`` is newer than the local one.

        Both arguments are version dictionaries; the entry for ``tip`` has every
        "V" removed and is compared as a float.
        """
        def as_number(versions):
            return float(re.sub("V", "", versions[tip]))

        return as_number(version_com) < as_number(version_usb)

    def get_version(self,file_upper):
        """Load the USB and the local version dictionaries.

        If the local version file does not exist yet (first install), the
        template ``setup\\version.json`` from the USB disk is copied to
        ``C:\\setup`` first. If the local file has no entry for ``file_upper``,
        the entry is added as ``"V0.0"`` and the file is rewritten.

        :param file_upper: capitalised component key, e.g. ``"Server"``
        :return: tuple ``(usb_version_json, com_version_json)``
        """
        usb_version_file = USB_VERSION_PATH + R"setup\setup\version.json"
        usb_version_file_empty = USB_VERSION_PATH + R"setup\version.json"
        with open(usb_version_file, 'r', encoding="utf-8") as fp:
            usb_version_json = json.load(fp)

        if not os.path.exists(COMPUTER_VERSION_PATH):
            # No local version file: first install. Copy the template from the
            # first-level setup folder of the USB disk.
            os.makedirs(R"C:\setup", exist_ok=True)
            shutil.copy(usb_version_file_empty, R"C:\setup")

        with open(COMPUTER_VERSION_PATH, 'r', encoding="utf-8") as fp:
            com_version_json = json.load(fp)
        print("com_version_json",com_version_json)

        if file_upper not in com_version_json:
            com_version_json[file_upper] = "V0.0"
            # The read handle is closed before the file is reopened for
            # writing, and the data is serialised with json rather than
            # str(dict).replace("'", '"').
            with open(COMPUTER_VERSION_PATH, 'w', encoding="utf-8") as fp:
                json.dump(com_version_json, fp, ensure_ascii=False)

        return usb_version_json,com_version_json

    def paintEvent(self, a0: QPaintEvent) -> None:
        """Paint the background image scaled to the current widget size."""
        # Load the image from disk once and reuse it; only the scaling depends
        # on the widget size and has to be redone per paint.
        source = getattr(self, "_bg_source_pixmap", None)
        if source is None:
            source = QPixmap(get_path.get_path() + R"gui\res\bg_upper.jpg")
            self._bg_source_pixmap = source
        palette = QPalette()
        pix = source.scaled(self.width(), self.height())
        palette.setBrush(QPalette.Background, QBrush(pix))
        self.setPalette(palette)

    def write_log(self, data):
        """Append ``data`` as one line to ``gui\\res\\log.txt``."""
        log_path = get_path.get_path() + R"gui\res\log.txt"
        with open(log_path, 'a', encoding="utf-8") as log_file:
            log_file.write(data + "\n")

    def warning(self,msg):
        """Show ``msg`` in an information box with Yes/No buttons (Yes is default)."""
        buttons = QMessageBox.Yes | QMessageBox.No
        QMessageBox.information(self, 'warning', str(msg), buttons, QMessageBox.Yes)
    
def GUI_start():
    """Create the application and the upgrade window, then run the event loop.

    The process exits with the event loop's return code.
    """
    app = QApplication(sys.argv)
    window = MonitorUpper()
    with open(get_path.get_path() + r'\gui\res\style.qss', 'r') as qss:
        app.setStyleSheet(qss.read())
    window.show()
    sys.exit(app.exec())


Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐