Python单线程异步编程、多线程、多进程编程
whisper语音识别、说话人分离、性别年龄 三个方法并行,从而加快处理速度
目标:
我想要让whisper语音识别、说话人分离、性别年龄 三个方法并行,从而加快处理速度。
单线程异步并发编程
whisper_result, speaker_result, age_gender_result = await asyncio.gather(
self.fast_whisper_recognition(output_file,audio_index),
self.speaker_recognition(output_file,audio_index),
self.age_gender_recognition(output_file)
)
在这里面,没有加快速度,原因是asyncio 适用于IO密集型,但是我这个地方是CPU密集型,在asyncio中,只能用到一个CPU核心,所以速度不会提升。
多线程并行编程
GIL,是最流行的 Python 解释器 CPython 中的一个技术术语。它的意思是全局解释器锁,本质上是类似操作系统的 Mutex。每一个 Python 线程,在 CPython 解释器中执行时,都会先锁住自己的线程,阻止别的线程执行。当然,CPython 会做一些小把戏,轮流执行 Python 线程。这样一来,用户看到的就是“伪并行”——Python 线程在交错执行,来模拟真正并行的线程。
如果你想对 CPU 密集型任务加速,使用多线程是无效的,请使用多进程。这里所谓的 CPU 密集型任务,是指会消耗大量 CPU 资源的任务,比如求 1 到 100000000 的乘积,或者是把一段很长的文字编码后又解码等等。使用多线程之所以无效,原因正是我们前面刚讲过的,Python 多线程的本质是多个线程互相切换,但同一时刻仍然只允许一个线程运行。因此,你使用多线程,和使用一个主线程,本质上来说并没有什么差别;反而在很多情况下,因为线程切换带来额外损耗,还会降低程序的效率。
python的多线程其实是并发,而不是并行,对于CPU密集型的工作,不会有性能提升
多进程并行编程
方式一(成功):直接把三个功能给分开,whisper放在单独的一个环境中运行,声纹识别和年龄性别识别放在另外一个环境中
把听到的音频的路径放在redis里面,让声纹识别、性别年龄识别去使用
g_redis.lpush("diarization_wav_paths",output_file + ":" + str(audio_index))
g_redis.lpush("agesex_wav_paths", output_file)
# 让whisper去做翻译
text = fast_whisper_recognition(f, output_file, audio_index)
# 当whisper翻译完成之后,从redis里面取出声纹识别、性别年龄等信息
speaker = g_redis.lpop("diarization_result").decode('utf-8')
agesex = g_redis.lpop("agesex_result").decode('utf-8')
方式二(失败):三个功能放在一个环境中运行
executor = concurrent.futures.ProcessPoolExecutor(max_workers=3)
output_file = "/opt/audio/audios/reduce_noise/2024-01-09/0.wav"
whipser_future = executor.submit(fast_whisper_recognition, output_file, 1)
speaker_future = executor.submit(speaker_recognition, output_file, 1)
future_agesex = executor.submit(age_gender_recognition, output_file)
result_whisper = whipser_future.result()
result_speaker = speaker_future.result()
result_agesex = future_agesex.result()
result_dict = {
"age": result_agesex,
"sex": result_agesex,
"person": result_speaker,
"content": "result_whisper"
}
我明明是在子进程1中初始化whisper,结果子进程3中也直接初始化了一遍whisper.....
[WARNING][20240110144248.192843][ForkProcess-3][PID:202173][TID:139914811664192]: fast_whisper_model 为 None,load fast_whisper start
原因貌似是:要确保 fast_whisper_recognition
函数总是在同一个子进程(例如 ForkProcess-1)中运行,需要采用一种方法来“绑定”特定的任务到特定的进程。然而,标准的 ProcessPoolExecutor
并不支持直接的进程和任务绑定。它的设计是为了通用的并行任务处理,而不是特定进程的任务处理。每次提交任务时,ProcessPoolExecutor
都会选择一个可用的进程来执行该任务。
方式三(失败):同一个环境中运行
class WebRTCServer:
executor = concurrent.futures.ProcessPoolExecutor(max_workers=3)
whisper_task_queue = multiprocessing.Queue()
whisper_result_queue = multiprocessing.Queue()
speaker_task_queue = multiprocessing.Queue()
speaker_result_queue = multiprocessing.Queue()
agesex_task_queue = multiprocessing.Queue()
agesex_result_queue = multiprocessing.Queue()
def __init__(:
self.whisper_process = multiprocessing.Process(
target=whisper_worker, args=(self.whisper_task_queue, self.whisper_result_queue))
self.whisper_process.start()
self.speaker_process = multiprocessing.Process(
target=speaker_recognition_worker, args=(self.speaker_task_queue, self.speaker_result_queue))
self.speaker_process.start()
self.age_gender_process = multiprocessing.Process(
target=age_gender_recognition_worker, args=(
self.agesex_task_queue, self.agesex_result_queue))
self.age_gender_process.start()
def whisper_worker(task_queue, result_queue):
# subprocess.run(["taskset", "-cp", str(50), str(os.getpid())], check=True)
"""
专用于运行 fast_whisper_recognition 的工作进程函数。
它从任务队列接收任务并将结果放入结果队列。
"""
while True:
Logger().warn("进入到了whisper_worker 进程中")
output_file, audio_index = task_queue.get()
if output_file is None:
break # 使用 None 作为结束信号
result = fast_whisper_recognition(output_file, audio_index)
result_queue.put(result)
但是速度依然没有提升,解释不通了
whisper、和说话人识别 每个进程用的CPU十分高,达到了百分之1000,也就是用掉了十个以上的CPU。 不是说一个进程由于全局解释器锁的原因,只能使用一个CPU吗? 下面的解释回答了这个问题,whisper已经绕过了全局解释器锁,我们直接假设可以用多个核心就可以了。
这个解释了CPU可以不仅仅使用一个核心,但是我的并行之后,效果没有提升,这个还是有疑惑。
查看一下详细信息,发现确实是并行执行了,但是为什么并行执行之后,whisper总是比串行慢了0.1秒~0.2秒就暂时无法解释了
对比第一种成功解决的方式来看,只要whisper在的环境,就不能再添加其他的运算了,whisper所在的环境貌似已经饱和了,那怕真实情况是我们服务器上面的CPU确实是没有用完。
更多推荐
所有评论(0)