python - 多进程
问题导读:ProcessProcess classLockSemaphoreEventQueuePool解决方案:Process#!/usr/bin/env python# coding=utf8import multiprocessingimport timedef sayHello(interval):for i in range(
·
问题导读:
- Process
- Process class
- Lock
- Semaphore
- Event
- Queue
- Pool
解决方案:
Process
#!/usr/bin/env python
# coding=utf8
import multiprocessing
import time
def sayHello(interval):
for i in range(5):
print 'The time is {0}'.format(time.ctime())
time.sleep(interval)
if __name__=='__main__':
p = multiprocessing.Process(target=sayHello, args=(3,))
p.start()
p1 = multiprocessing.Process(target=sayHello, args=(3,))
p1.start()
print 'pid:', p.pid, ' ', p1.pid
print 'name:' ,p.name, ' ', p1.name
print 'is_alive:', p.is_alive(), ' ', p1.is_alive()
Process Class
#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time
class SayHello(multiprocessing.Process):
def __init__(self, interval):
multiprocessing.Process.__init__(self)
self.interval = interval
def run(self):
for i in range(4):
print 'time is {0} {1}'.format(time.ctime(),self.pid)
time.sleep(self.interval)
if __name__=='__main__':
for i in range(5):
p = SayHello(1)
p.start()
Lock
#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time
def sayHello(lock, i):
# 获取锁
lock.acquire()
print 'start:{0}'.format(time.ctime())
try:
with open('./data.txt','a') as f:
f.write(time.ctime() + ' id:' + str(i) + '\n')
finally:
# 释放锁
lock.release()
time.sleep(1)
print 'end:{0}'.format(time.ctime())
if __name__=='__main__':
lock = multiprocessing.Lock()
p1 = multiprocessing.Process(target=sayHello, args = (lock,3))
p2 = multiprocessing.Process(target=sayHello, args = (lock,4))
# 主进程结束,子线程结束
p1.daemon = True
p2.daemon = True
p1.start()
p2.start()
# 主进程会在结束之前检查是否有子线程未完成
p1.join()
p2.join()
print 'end!'
Semaphore
#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time
def sayHello(s, i):
# 获取
s.acquire()
print multiprocessing.current_process().name + ' acquire'
time.sleep(i)
print multiprocessing.current_process().name + ' release'
# 释放
s.release()
if __name__=='__main__':
# 控制共享资源的访问数量
s = multiprocessing.Semaphore(2)
for i in range(4):
p = multiprocessing.Process(target=sayHello, args=(s, i))
p.start()
Event
#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time
def wait_for1(e):
print 'wait_for_1: starting'
# 等待1s
e.wait(1)
print 'wait_for_1:{0}'.format(str(e.is_set()))
def wait_for2(e):
print 'wait_for_2: starting'
# 等待5s
e.wait(5)
print 'wait_for_2:{0}'.format(str(e.is_set()))
if __name__=='__main__':
# 在进程之间传递状态
e = multiprocessing.Event()
e1 = multiprocessing.Process(name = 'p1', target = wait_for1, args = (e,))
e2 = multiprocessing.Process(name = 'p2', target = wait_for2, args=(e,))
e1.start()
e2.start()
# 等待3s 之后设置状态
time.sleep(3)
e.set()
print 'Event Set Ok!'
Queue
- Queue.qsize() 返回队列的大小
- Queue.empty() 如果队列为空,返回True,反之False
- Queue.full() 如果队列满了,返回True,反之False
- Queue.get([block[, timeout]]) 获取队列,timeout等待时间
- Queue.get_nowait() 相当Queue.get(False)
- 非阻塞 Queue.put(item) 写入队列,timeout等待时间
- Queue.put_nowait(item) 相当Queue.put(item, False)
#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time
def writer_proc(q):
try:
for i in range(5):
time.sleep(1)
q.put(i,timeout = 2)
print 'put:',i
except:
pass
def reader_proc(q):
try:
for i in range(5):
j = q.get(timeout = 2)
print 'get:',j
except:
pass
if __name__=='__main__':
q = multiprocessing.Queue()
writer = multiprocessing.Process(target = writer_proc, args=(q,))
writer.start()
reader = multiprocessing.Process(target = reader_proc, args=(q,))
reader.start()
Pool
# coding: utf-8
import multiprocessing
import os, time, random
def Lee():
print "\nRun task Lee-%s" % (os.getpid()) # os.getpid()获取当前的进程的ID
start = time.time()
time.sleep(random.random() * 10) # random.random()随机生成0-1之间的小数
end = time.time()
print 'Task Lee, runs %0.2f seconds.' % (end - start)
def Marlon():
print "\nRun task Marlon-%s" % (os.getpid())
start = time.time()
time.sleep(random.random() * 40)
end = time.time()
print 'Task Marlon runs %0.2f seconds.' % (end - start)
def Allen():
print "\nRun task Allen-%s" % (os.getpid())
start = time.time()
time.sleep(random.random() * 30)
end = time.time()
print 'Task Allen runs %0.2f seconds.' % (end - start)
def Frank():
print "\nRun task Frank-%s" % (os.getpid())
start = time.time()
time.sleep(random.random() * 20)
end = time.time()
print 'Task Frank runs %0.2f seconds.' % (end - start)
if __name__ == '__main__':
function_list = [Lee, Marlon, Allen, Frank]
print "parent process %s" % (os.getpid())
pool = multiprocessing.Pool(4)
for func in function_list:
# apply 线程阻塞,执行线程代码的时候,会阻塞主进程
pool.apply_async(func) # Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中
print 'Waiting for all subprocesses done...'
pool.close()
pool.join() # 调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
print 'All subprocesses done.'
更多推荐
已为社区贡献8条内容
所有评论(0)