U盘升级软件版本
#!/usr/bin/env python# encoding: utf-8'''@author: JHC@license: None@contact: JHC000abc@gmail.com@file: host.py@time: 2022/5/9 11:08@desc:检测U盘插入脚本'''import datetimeimport jsonimport osimport timefrom g
·
#!/usr/bin/env python
# encoding: utf-8
'''
@author: JHC
@license: None
@contact: JHC000abc@gmail.com
@file: host.py
@time: 2022/5/9 11:08
@desc:检测U盘插入脚本
'''
import datetime
import json
import os
import time
from gui.control import monitor_upper_window
from psutil import disk_partitions
import get_path
import multiprocessing
TIME_WAIT = 3
COMPUTER_VERSION_PATH = R"C:\setup\version.json"
if os.path.exists(get_path.get_path()+R"gui\res\log.txt"):
print("删除旧的log")
print(get_path.get_path()+R"gui\res\log.txt")
os.remove(get_path.get_path()+R"gui\res\log.txt")
else:
print("不存在旧的log")
class UsbCheck:
def __init__(self):
self.flag = 0
self.mode = 0
# 版本状态
self.mysql_flag = 0
self.redis_flag = 0
self.server_flag = 0
self.client_flag = 0
self.judge_flag = 0
self.display_flag = 0
self.major_flag = 0
def run(self):
'''
运行
:return:
'''
while True:
print("循环检测U盘")
self.write_log(data="循环检测U盘")
# 构建当前移动设备列表
device_list=[i.device for i in disk_partitions() if i.opts=='rw,removable']
removable_len = len(device_list)
# 移动设备存在
if removable_len > 0:
# print("检测到U盘")
self.write_log(data="检测到U盘")
# 控制仅执行一次
if self.flag==0:
self.flag = 1
# print('发现usb驱动:', device_list)
self.write_log(data="发现usb驱动{}".format(device_list))
for i in device_list:
if os.path.exists(i.replace("\\","") +R"\\setup\setup\version.json"):
uDiskPath = i.replace("\\","")
else:
print("不存在version文件,非专用升级U盘")
self.write_log(data="不存在version文件,非专用升级U盘")
try:
with open(get_path.get_path() + R"gui\res\set.json", 'w', encoding="utf-8") as fp:
fp.write(json.dumps({
"uDiskPath": uDiskPath
}))
print("uDiskPath={}".format(uDiskPath))
self.write_log(data="uDiskPath={}".format(uDiskPath))
# 弹出窗口
# self.show_win()
p = multiprocessing.Process(target=self.show_win)
p.start()
except Exception as e:
print(e)
else:
# print("没有新盘插入")
self.write_log(data="没有新盘插入")
pass
# 移动设备被拔出
elif self.flag == 1 and removable_len <= 0:
# print("移动设备被拔出")
self.write_log(data="移动设备被拔出")
self.flag = 0
else:
# print("未插入移动设备")
self.write_log(data="未插入移动设备")
# 休眠三秒
time.sleep(3)
def show_win(self):
monitor_upper_window.GUI_start()
def write_log(self,data):
with open(get_path.get_path()+R"gui\res\log.txt",'a',encoding="utf-8")as fp:
fp.write(data+"\n")
if __name__ == '__main__':
# 进程打包必须加,防止进程多开
multiprocessing.freeze_support()
uc = UsbCheck()
uc.run()
```python
#!/usr/bin/env python
# encoding: utf-8
'''
@author: JHC
@license: None
@contact: JHC000abc@gmail.com
@file: monitor_upper_window.py
@time: 2022/5/13 10:39
@desc:只替换exe版本,检测到U盘后比对版本,符合升级条件升级
'''
import json
import os
import sys
import re
import shutil
import subprocess
import threading
import time
from gui.control import mysql_create
import pythoncom
import winshell
from PyQt5.QtWidgets import QWidget, QApplication, QMessageBox
from PyQt5.QtGui import QMovie, QPalette, QPixmap, QBrush, QPaintEvent
from PyQt5.QtCore import Qt, pyqtSignal
from gui.ui import monitor_upper_8
import get_path
COMPUTER_VERSION_PATH = R"C:\setup\version.json"
with open(get_path.get_path()+R"gui\res\set.json","r",encoding="utf-8")as fp:
USB_VERSION_PATH = json.loads(fp.read())["uDiskPath"]+"\\"
class MonitorUpper(QWidget):
close_win = pyqtSignal()
change_status_to_dark = pyqtSignal()
change_status_to_light = pyqtSignal()
show_warn = pyqtSignal(str)
def __init__(self):
super().__init__()
self.ui = monitor_upper_8.Ui_Form()
self.ui.setupUi(self)
self.setWindowFlags(Qt.FramelessWindowHint)
self.showFullScreen()
self.checkBox_server_single = 0
self.checkBox_client_single = 0
self.checkBox_major_single = 0
self.checkBox_judge_single = 0
self.checkBox_display_single = 0
# self.checkBox_mysql_single = 0
# self.checkBox_redis_single = 0
self.version_single = 0
self.sin = 0
self.server_upper_single = False
self.major_upper_single = False
self.client_upper_single = False
self.judge_upper_single = False
self.display_upper_single = False
# self.redis_upper_single = False
# self.mysql_upper_single = False
self.ui.checkBox_redis.hide()
self.ui.pushButton_start.hide()
palette = QPalette()
pix = QPixmap(get_path.get_path() + R"gui\res\bg_upper.jpg")
pix = pix.scaled(self.width(), self.height())
palette.setBrush(QPalette.Background, QBrush(pix))
gif = QMovie(get_path.get_path() + R"gui\res\gif_upper.gif")
self.ui.label_gif.setMovie(gif)
gif.start()
self.ui.checkBox_server.clicked.connect(self.checkBox_server)
self.ui.checkBox_client.clicked.connect(self.checkBox_client)
self.ui.checkBox_major.clicked.connect(self.checkBox_major)
self.ui.checkBox_judge.clicked.connect(self.checkBox_judge)
self.ui.checkBox_display.clicked.connect(self.checkBox_display)
self.ui.pushButton_start.clicked.connect(self.pushButton_start)
# self.ui.checkBox_mysql.clicked.connect(self.checkBox_mysql)
# self.ui.checkBox_redis.clicked.connect(self.checkBox_redis)
self.close_win.connect(self.close_window)
self.change_status_to_dark.connect(self.change_btn_status_to_dark)
self.show_warn.connect(self.warning)
def change_btn_status_to_dark(self):
# 隐藏按钮
self.ui.checkBox_server.hide()
self.ui.checkBox_client.hide()
self.ui.checkBox_judge.hide()
self.ui.checkBox_major.hide()
self.ui.checkBox_display.hide()
self.ui.checkBox_mysql.hide()
# self.ui.checkBox_redis.hide()
# self.ui.pushButton_start.hide()
# 显示提示文字
self.ui.label_2.setText("软件升级中……")
self.ui.label_2.setAlignment(Qt.AlignRight)
self.ui.label_2.setAlignment(Qt.AlignTop)
self.ui.label_2.setStyleSheet(
'font: 36pt "方正小标宋简体";color: rgb(85, 255, 255);'
)
def close_window(self):
try:
self.close()
print("界面正常关闭")
self.write_log(data="界面正常关闭")
except Exception as e:
print("界面关闭异常")
self.write_log(data="界面关闭异常,原因:{}".format(e))
def checkBox_server(self):
if self.checkBox_server_single == 0:
self.checkBox_server_single = 1
else:
self.checkBox_server_single = 0
def checkBox_client(self):
if self.checkBox_client_single == 0:
self.checkBox_client_single = 1
else:
self.checkBox_client_single = 0
def checkBox_major(self):
if self.checkBox_major_single == 0:
self.checkBox_major_single = 1
else:
self.checkBox_major_single = 0
def checkBox_judge(self):
if self.checkBox_judge_single == 0:
self.checkBox_judge_single = 1
else:
self.checkBox_judge_single = 0
def checkBox_display(self):
if self.checkBox_display_single == 0:
self.checkBox_display_single = 1
else:
self.checkBox_display_single = 0
# def checkBox_mysql(self):
# if self.checkBox_mysql_single == 0:
# self.checkBox_mysql_single =1
# else:
# self.checkBox_mysql_single = 0
#
# def checkBox_redis(self):
# if self.checkBox_redis_single == 0:
# self.checkBox_redis_single = 1
# else:
# self.checkBox_redis_single = 0
# print(self.checkBox_display_single)
# def set_mysql(self):
# cmd = USB_VERSION_PATH + R"setup\update_mysql.bat"
# self.write_log(data=cmd)
# subprocess.Popen(cmd)
# def set_redis(self):
# cmd = USB_VERSION_PATH + R"setup\update_redis.bat"
# self.write_log(data=cmd)
# os.popen(cmd)
# def create_shortcut(self, bin_path, name, desc):
# pythoncom.CoInitialize()
# print("bin_path,name,desc = ",bin_path,name,desc)
# try:
# shortcut = os.path.join(winshell.desktop(), name + ".lnk")
# winshell.CreateShortcut(
# Path=shortcut,
# Target=bin_path,
# Icon=(bin_path, 0),
# Description=desc
# )
# return True
# except ImportError as e:
# print(e)
# self.write_log(data="Well, do nothing as 'winshell' lib may not available on current os")
# self.write_log(data="error detail %s" % str(e))
# return False
def copy_new_files(self, from_file, computer_path):
if not os.path.exists(computer_path):
os.makedirs(computer_path)
files = os.listdir(from_file)
for f in files:
if os.path.isdir(from_file + '/' + f):
self.copy_new_files(from_file + '/' + f, computer_path + '/' + f)
else:
shutil.copy(from_file + '/' + f, computer_path + '/' + f)
def shot_down_process(self,process_name):
cmd = 'taskkill /F /IM ' + process_name
os.system(cmd)
def restart(self,name):
cmd = R"C:\setup\{}\{}.exe".format(name, name)
subprocess.Popen(cmd)
def pushButton_start(self):
try:
t1 = threading.Thread(target=self.run_start)
t1.start()
except Exception as e:
print(e)
def run_start(self):
'''
点击开始后
1.检查被勾选的待升级项
2.检查被勾选的待升级项在U盘中是否存在更高版本的升级包
判断是不是存在,要判断所有勾选的是否都符合升级条件,
完全符合 直接隐藏按钮 进行升级
部分符合 忽略不符合的,隐藏按钮 升级符合部分,升级完成后弹框提示不符合升级条件的
全不符合 弹框提示
'''
# if self.checkBox_mysql_single == 1:
# # C盘中不存在Mysql文件夹,或该文件夹下无内容,隐藏按钮
# if os.path.exists("C:\MySQL")==False or len(os.listdir("C:\MySQL")) <= 0:
# self.write_log(data="mysql符合升级条件")
# try:
# self.mysql_upper_single = True
# except Exception as e:
# print(e)
# else:
# self.mysql_upper_single = False
# self.show_warn.emit("mysql已安装")
# print("mysql已安装")
# self.write_log(data="mysql已安装")
# else:
# self.write_log(data="mysql选项未勾选")
# self.write_log(data="mysql检查通过")
# # mysql升级前检查
# if self.checkBox_mysql_single == 1:
# # C盘中不存在Mysql文件夹,或该文件夹下无内容,隐藏按钮
# if os.path.exists("C:\MySQL")==False or len(os.listdir("C:\MySQL")) <= 0:
# self.write_log(data="mysql符合升级条件")
# try:
# self.mysql_upper_single = True
# except Exception as e:
# print(e)
# else:
# self.mysql_upper_single = False
# self.show_warn.emit("mysql已安装")
# print("mysql已安装")
# self.write_log(data="mysql已安装")
# else:
# self.write_log(data="mysql选项未勾选")
# self.write_log(data="mysql检查通过")
#
# # redis升级前检查
# if self.checkBox_redis_single == 1:
# # C盘中不存在Redis-x64-5.0.14文件夹,或该文件夹下无内容,隐藏按钮
# if os.path.exists("C:\Redis-x64-5.0.14") == False or len(os.listdir("C:\Redis-x64-5.0.14")) <= 0:
# self.write_log(data="redis符合升级条件")
# try:
# self.redis_upper_single = True
# except Exception as e:
# print(e)
# else:
# self.redis_upper_single = False
# self.show_warn.emit("redis已安装")
# print("redis已安装")
# self.write_log(data="redis已安装")
# else:
# self.write_log(data="redis选项未勾选")
# self.write_log(data="redis检查通过")
# server升级前版本检查
# mysql数据库数据表检查
if self.checkBox_server_single ==1:
# 获取当前server版本
tip = "Server"
version_usb ,version_com = self.get_version(tip)
print("version_usb ,version_com==server",version_usb ,version_com)
# 检查是否符合version升级条件
version_condition = self.match_version(tip,version_usb,version_com)
if version_condition == True:
# 符合升级条件
self.server_upper_single = True
else:
# 不符合升级条件
self.server_upper_single = False
# major升级前版本检查
if self.checkBox_major_single == 1:
# 获取当前major版本
tip = "Major"
version_usb, version_com = self.get_version(tip)
print("version_usb ,version_com==major", version_usb, version_com)
# 检查是否符合version升级条件
version_condition = self.match_version(tip, version_usb, version_com)
if version_condition == True:
# 符合升级条件
self.major_upper_single = True
else:
# 不符合升级条件
self.major_upper_single = False
# client升级前版本检查
if self.checkBox_client_single == 1:
# 获取当前client版本
tip = "Client"
version_usb, version_com = self.get_version(tip)
print("version_usb ,version_com==client", version_usb, version_com)
# 检查是否符合version升级条件
version_condition = self.match_version(tip, version_usb, version_com)
if version_condition == True:
# 符合升级条件
self.client_upper_single = True
else:
# 不符合升级条件
self.client_upper_single = False
# judge升级前版本检查
if self.checkBox_judge_single == 1:
# 获取当前judge版本
tip = "Judge"
version_usb, version_com = self.get_version(tip)
print("version_usb ,version_com==judge", version_usb, version_com)
# 检查是否符合version升级条件
version_condition = self.match_version(tip, version_usb, version_com)
if version_condition == True:
# 符合升级条件
self.judge_upper_single = True
else:
# 不符合升级条件
self.judge_upper_single = False
# display升级前版本检查
if self.checkBox_display_single == 1:
# 获取当前display版本
tip = "Display"
version_usb, version_com = self.get_version(tip)
print("version_usb ,version_com==display", version_usb, version_com)
# 检查是否符合version升级条件
version_condition = self.match_version(tip, version_usb, version_com)
if version_condition == True:
# 符合升级条件
self.display_upper_single = True
else:
# 不符合升级条件
self.display_upper_single = False
# 汇总升级结果
# if self.mysql_upper_single==True:
# self.change_status_to_dark.emit()
# mysql_create.test_database_exists()
# else:
# name = "mysql"
# self.write_log(data="{}不符合升级条件".format(name))
# if self.redis_upper_single==True:
# self.change_status_to_dark.emit()
# self.set_redis()
# else:
# name = "redis"
# self.write_log(data="{}不符合升级条件".format(name))
if self.server_upper_single==True:
name = "server"
self.up_software(name)
else:
name = "server"
self.write_log(data="{}不符合升级条件".format(name))
if self.major_upper_single==True:
name = "major"
self.up_software(name)
else:
name = "major"
self.write_log(data="{}不符合升级条件".format(name))
if self.client_upper_single==True:
name = "client"
self.up_software(name)
else:
name = "client"
self.write_log(data="{}不符合升级条件".format(name))
if self.judge_upper_single==True:
name = "judge"
self.up_software(name)
else:
name = "judge"
self.write_log(data="{}不符合升级条件".format(name))
if self.display_upper_single==True:
name = "display"
self.up_software(name)
else:
name = "display"
self.write_log(data="{}不符合升级条件".format(name))
# 全都不具备升级条件/没选,弹框提示
# 没选中
if self.checkBox_mysql_single ==0 and self.checkBox_redis_single==0 and self.checkBox_server_single==0 and self.checkBox_major_single==0 and self.checkBox_client_single==0 and self.checkBox_judge_single==0 and self.checkBox_display_single==0:
self.show_warn.emit("未选中要升级的软件")
else:
# 选中
if self.mysql_upper_single == False and self.redis_upper_single == False and self.server_upper_single == False and self.major_upper_single == False and self.client_upper_single == False and self.judge_upper_single == False and self.display_upper_single == False:
self.show_warn.emit("选中的软件版本已经是最新版,无需升级")
else:
# 部分升级,全都升级,都不提示
# 关闭升级界面窗口
self.write_log(data="升级完成")
time.sleep(10)
self.close_win.emit()
self.write_log(data="窗口已关闭")
def up_software(self,name):
'''
第一次安装时的升级流程
1.关闭原有软件
2.删除原有待升级文件
3.拷贝升级包
4.拷贝新的版本信息
5.创建快捷方式
6.重启
'''
# 取消按钮显示
self.change_status_to_dark.emit()
# 关闭原有软件
try:
self.shot_down_process(name)
except:
self.write_log(data="不存在{}.exe进程".format(name))
# 删除原有待升级文件(exe)
try:
wait_to_remove_path = "C:\setup\{}\{}.exe".format(name,name)
if os.path.exists(wait_to_remove_path):
os.remove(wait_to_remove_path)
else:
self.write_log(data="不存在{}文件".format(wait_to_remove_path))
except Exception as e:
print(e)
# 拷贝升级包
try:
from_file, computer_path = USB_VERSION_PATH + R"setup\setup\{}\{}.exe".format(name,name), R"C:\setup\{}\{}.exe".format(name,name)
shutil.copy(from_file, computer_path)
# self.copy_new_files(from_file, computer_path)
except Exception as e:
print(e)
# 拷贝新的版本信息
try:
self.replace_version(name)
except Exception as e:
print(e)
# # 创建快捷方式
# try:
# bin_path = R"C:\setup\{}\{}.exe".format(name, name)
# self.create_shortcut(bin_path, name=name, desc=name)
# except Exception as e:
# print(e)
# 重启
try:
self.restart(name)
time.sleep(3)
except Exception as e:
print(e)
def replace_version(self,name):
print("替换新的version")
USB_VERSION_FILE = USB_VERSION_PATH + R"setup\setup\version.json"
with open(USB_VERSION_FILE,'r',encoding="utf-8")as fp:
usb_version_json = json.loads(fp.read())
with open(COMPUTER_VERSION_PATH,'r',encoding="utf-8")as fp:
com_version_json = json.loads(fp.read())
print("usb_version_json,com_version_json",usb_version_json,com_version_json)
com_version_json[name.capitalize()] = usb_version_json[name.capitalize()]
print("com_version_json=====",com_version_json)
with open(COMPUTER_VERSION_PATH,'w',encoding="utf-8")as fp:
fp.write(str(com_version_json).replace("'",'"'))
def match_version(self,tip,version_usb,version_com):
if float(re.sub("V", "", version_com[tip])) < float(re.sub("V", "", version_usb[tip])):
return True
else:
return False
def get_version(self,file_upper):
'''
1.打开电脑中的版本,打开U盘中的版本
存在version文件
不存在key,添加key-value
存在key,替换成U盘中的key对应的value
不存在version文件
1.拷贝u盘中第一层setup下的version文件到电脑
2.添加u盘中软件对应的key-value
'''
# U盘version
USB_VERSION_FILE = USB_VERSION_PATH+R"setup\setup\version.json"
USB_VERSION_FILE_EMPTY = USB_VERSION_PATH + R"setup\version.json"
with open(USB_VERSION_FILE,'r',encoding="utf-8")as fp:
usb_version_json = json.loads(fp.read())
# 电脑里的version
if os.path.exists(COMPUTER_VERSION_PATH):
pass
else:
# 不存在version 证明初次
# 拷贝u盘中第一层setup下的version文件到电脑
if os.path.exists("C:\setup")==False:
os.makedirs("C:\setup")
else:
pass
shutil.copy(USB_VERSION_FILE_EMPTY, "C:\setup")
with open(COMPUTER_VERSION_PATH, 'r', encoding="utf-8")as fp:
com_version_json = json.loads(fp.read())
print("com_version_json",com_version_json)
# 空的刚拷贝的所以不存在else情况
if file_upper not in com_version_json:
com_version_json[file_upper] = "V0.0"
with open(COMPUTER_VERSION_PATH, 'w', encoding="utf-8")as fp:
fp.write(str(com_version_json).replace("'",'"'))
return usb_version_json,com_version_json
def paintEvent(self, a0: QPaintEvent) -> None:
palette = QPalette()
pix = QPixmap(get_path.get_path() + R"gui\res\bg_upper.jpg")
pix = pix.scaled(self.width(), self.height())
palette.setBrush(QPalette.Background, QBrush(pix))
self.setPalette(palette)
def write_log(self, data):
with open(get_path.get_path() + R"gui\res\log.txt", 'a', encoding="utf-8") as fp:
fp.write(data + "\n")
def warning(self,msg):
QMessageBox.information(self,'warning',str(msg),QMessageBox.Yes|QMessageBox.No,QMessageBox.Yes)
def GUI_start():
app = QApplication(sys.argv)
ui = MonitorUpper()
with open(get_path.get_path() + r'\gui\res\style.qss', 'r') as f:
app.setStyleSheet(f.read())
ui.show()
sys.exit(app.exec())
更多推荐
已为社区贡献7条内容
所有评论(0)