Python3 多线程多表入库优化
>>>>场景:多线程多表入库优化--每个线程对应一张表#!/usr/bin/env python# -*- coding: utf-8 -*-# Author: jia666# Time: 2021/3/31 17:58import timeimport queueimport randomfrom threading import Thread'''任务场景多线程多表入库
·
>>>>场景:多线程多表入库优化--每个线程对应一张表
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author : jia666
# Time : 2021/3/31 17:58
import time
import queue
import random
from threading import Thread
'''任务场景
多线程多表入库:
原来的入库队列只有一个主队列,队列里的数据,多为连续同属于属于一个表的数据,
在执行多线程时,由于多个线程都执行同属于一个表的数据,且第一个获取改表的线程会进行锁表,其他线程只能等待
实际效果与单线程相比,效果提升并不明显
举个有味道的例子,你就懂了
---------------------------------------*------------------
厕所有4个坑位,每个坑位只允许一个部门的人上厕所,有一群人排队上厕所,他们都是一个部门的,因为喜欢结伴一起去,
厕所每次进四个人,然而,四个人都是同一个部门的,按规定,只能使用指定的一个坑位,结果就是四个人用一个坑位,等待前一个人用完才能用
效率低下,其他三个坑位没有用,浪费资源
----------------------------------------------------------
优化:
将一个主队列分为多个线程子队列
每个线程子队列处理特定的一个表,这样避免上面出现的锁表等待问题
当线程子队列完成其任务,再次从主队列获取一个表的子队列进行入库
举个有味道的例子,你就懂了
---------------------------------------*------------------
厕所有4个坑位,每个坑位只允许一个部门的人上厕所,有一群人排队上厕所,他们都是一个部门的,因为喜欢结伴一起去,
按部门分为四个队伍,每次厕所进4个人,每个队伍出来一个人,这样就效率提高了,没有浪费坑位
----------------------------------------------------------
'''
class Mysql(Thread):
'模拟数据入库'
def __init__(self,task_queue):
super().__init__()
self.task_queue=task_queue #任务队列
def run(self):
while not self.task_queue.empty():
self.task_queue.get() #获取一个数据
a = random.randint(1, 3) # 随机时间
time.sleep(a) #模拟入库需要的时间
print('----已处理一个数据------')
class auxiliary_function(object):
'辅助对象'
def __init__(self):
pass
def create_queue(self):
'模拟主队列数据'
#global main_que
for i in range(7):
for j in range(5):
main_que.put(('table'+str(i),j)) #模拟主队列生成数据,(表名,数据)
def names_var(self):
'生成动态变量'
for i in range(Mysql_thd): #生成动态变量队列
names['queue_' + str(i)] = queue.Queue()
register_dict.update({'queue_' + str(i):-1})
def main_split_queue(self):
'主队列数据分配到子队列'
while not main_que.empty():
table,df=main_que.get()
tmp_queue=False
for key,value in register_dict.items():
if value==-1: #说明此为空队列
tmp_queue=names.get(key)# 临时队列指针-指向动态新增变量队列
register_dict.update({key:table}) #更新变量队列专属对应的表名
tmp_queue.put((table,df))
break
elif value==table: #改表名称已有专属对应队列
tmp_queue=names.get(key)# 临时队列指针-指向动态新增变量队列
tmp_queue.put((table, df))
break
else: #其他不处理
pass
if not tmp_queue: #已经没有专属队列可以分配,退出循环
main_que.put((table,df))
break
def count_size(self):
'统计每个子队列的含有的数据量'
for key,value in register_dict.items():
tmp_queue=names.get(key)# 临时队列指针-指向动态新增变量队列
print(key,tmp_queue.qsize())
def thd_to_mysql(self):
'初始化多线程入库'
for que_id,table_name in register_dict.items():
tmp_queue = names.get(que_id) # 临时队列指针-指向动态新增变量队列
sa=Mysql(task_queue=tmp_queue)
sa.start()
thd_dict.update({que_id:sa})
def check_thd_alive(self):
'检测线程是否存活-触发二次分配'
spilt_sign=False #触发子队列分配标识符
for que_id,thd_id in thd_dict.items():
if not thd_id.is_alive():#如果入库线程不存活-
register_dict.update({que_id:-1})
spilt_sign=True
if spilt_sign:
self.main_split_queue() #触发下一次的子队列分配
def data_to_sql(self):
'持续多线程入库'
for que_id,table_name in register_dict.items():
tmp_queue = names.get(que_id) # 临时队列指针-指向动态新增变量队列
thd_id=thd_dict.get(que_id)
if not thd_id.is_alive(): # 如果入库线程不存活-
sa=Mysql(task_queue=tmp_queue)
sa.start()
thd_dict.update({que_id:sa})
def while_mysql(self):
'循环入库-直至入库完成'
self.thd_to_mysql() #初始第一次启动
while not main_que.empty():
self.check_thd_alive()
self.data_to_sql()
def run(self):
'运行主函数'
self.names_var() # 创建子队列变量
self.create_queue() # 创建主队列数据
self.main_split_queue() # 主队分割多个队列
self.while_mysql() # 循环入库,直至完成
if __name__ == '__main__':
main_que=queue.Queue() #模拟主任务队列
names = locals() # 局部变量
register_dict = {} # 登记字典
Mysql_thd=5 # 入库线程数
thd_dict={} # 多线程字典
example=auxiliary_function() # 实例化对象
example.run() # 主函数
更多推荐
已为社区贡献16条内容
所有评论(0)