>>>>场景:多线程多表入库优化--每个线程对应一张表
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author  : jia666
# Time    : 2021/3/31 17:58

import time
import queue
import random
from threading import Thread
'''任务场景
多线程多表入库:
原来的入库队列只有一个主队列,队列里的数据,多为连续同属于属于一个表的数据,
在执行多线程时,由于多个线程都执行同属于一个表的数据,且第一个获取改表的线程会进行锁表,其他线程只能等待
实际效果与单线程相比,效果提升并不明显
举个有味道的例子,你就懂了
---------------------------------------*------------------
厕所有4个坑位,每个坑位只允许一个部门的人上厕所,有一群人排队上厕所,他们都是一个部门的,因为喜欢结伴一起去,
厕所每次进四个人,然而,四个人都是同一个部门的,按规定,只能使用指定的一个坑位,结果就是四个人用一个坑位,等待前一个人用完才能用
效率低下,其他三个坑位没有用,浪费资源
----------------------------------------------------------
优化:
将一个主队列分为多个线程子队列
每个线程子队列处理特定的一个表,这样避免上面出现的锁表等待问题
当线程子队列完成其任务,再次从主队列获取一个表的子队列进行入库

举个有味道的例子,你就懂了
---------------------------------------*------------------
厕所有4个坑位,每个坑位只允许一个部门的人上厕所,有一群人排队上厕所,他们都是一个部门的,因为喜欢结伴一起去,
按部门分为四个队伍,每次厕所进4个人,每个队伍出来一个人,这样就效率提高了,没有浪费坑位
----------------------------------------------------------
'''

class Mysql(Thread):
    '模拟数据入库'
    def __init__(self,task_queue):
        super().__init__()
        self.task_queue=task_queue #任务队列
    def run(self):
        while not self.task_queue.empty():
            self.task_queue.get() #获取一个数据
            a = random.randint(1, 3)  # 随机时间
            time.sleep(a) #模拟入库需要的时间
            print('----已处理一个数据------')


class auxiliary_function(object):
    '辅助对象'
    def __init__(self):
        pass
    def create_queue(self):
        '模拟主队列数据'
        #global main_que
        for i in range(7):
            for j in range(5):
                main_que.put(('table'+str(i),j)) #模拟主队列生成数据,(表名,数据)
    def names_var(self):
        '生成动态变量'
        for i in range(Mysql_thd): #生成动态变量队列
            names['queue_' + str(i)] = queue.Queue()
            register_dict.update({'queue_' + str(i):-1})
    def main_split_queue(self):
        '主队列数据分配到子队列'
        while not main_que.empty():
            table,df=main_que.get()
            tmp_queue=False
            for key,value in register_dict.items():
                if value==-1: #说明此为空队列
                    tmp_queue=names.get(key)# 临时队列指针-指向动态新增变量队列
                    register_dict.update({key:table}) #更新变量队列专属对应的表名
                    tmp_queue.put((table,df))
                    break
                elif value==table: #改表名称已有专属对应队列
                    tmp_queue=names.get(key)# 临时队列指针-指向动态新增变量队列
                    tmp_queue.put((table, df))
                    break
                else: #其他不处理
                    pass
            if not tmp_queue: #已经没有专属队列可以分配,退出循环
                main_que.put((table,df))
                break
    def count_size(self):
        '统计每个子队列的含有的数据量'
        for key,value in register_dict.items():
            tmp_queue=names.get(key)# 临时队列指针-指向动态新增变量队列
            print(key,tmp_queue.qsize())
    def thd_to_mysql(self):
        '初始化多线程入库'
        for que_id,table_name in register_dict.items():
            tmp_queue = names.get(que_id)  # 临时队列指针-指向动态新增变量队列
            sa=Mysql(task_queue=tmp_queue)
            sa.start()
            thd_dict.update({que_id:sa})

    def check_thd_alive(self):
        '检测线程是否存活-触发二次分配'
        spilt_sign=False    #触发子队列分配标识符
        for que_id,thd_id in thd_dict.items():
            if not thd_id.is_alive():#如果入库线程不存活-
                register_dict.update({que_id:-1})
                spilt_sign=True
        if spilt_sign:
            self.main_split_queue() #触发下一次的子队列分配

    def data_to_sql(self):
        '持续多线程入库'
        for que_id,table_name in register_dict.items():
            tmp_queue = names.get(que_id)  # 临时队列指针-指向动态新增变量队列
            thd_id=thd_dict.get(que_id)
            if not thd_id.is_alive():  # 如果入库线程不存活-
                sa=Mysql(task_queue=tmp_queue)
                sa.start()
                thd_dict.update({que_id:sa})

    def while_mysql(self):
        '循环入库-直至入库完成'
        self.thd_to_mysql() #初始第一次启动
        while not main_que.empty():
            self.check_thd_alive()
            self.data_to_sql()

    def run(self):
        '运行主函数'
        self.names_var()  # 创建子队列变量
        self.create_queue()  # 创建主队列数据
        self.main_split_queue()  # 主队分割多个队列
        self.while_mysql() # 循环入库,直至完成
if __name__ == '__main__':
    main_que=queue.Queue()  #模拟主任务队列
    names = locals() # 局部变量
    register_dict = {}  # 登记字典
    Mysql_thd=5 # 入库线程数
    thd_dict={} # 多线程字典

    example=auxiliary_function() # 实例化对象
    example.run()   # 主函数
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐