问题导读:

  1. Process
  2. Process class
  3. Lock
  4. Semaphore
  5. Event
  6. Queue
  7. Pool

解决方案:


Process


#!/usr/bin/env python
# coding=utf8
import multiprocessing
import time


def sayHello(interval):
    for i in range(5):
        print 'The time is {0}'.format(time.ctime())
        time.sleep(interval)

if __name__=='__main__':
    p = multiprocessing.Process(target=sayHello, args=(3,))
    p.start()

    p1 = multiprocessing.Process(target=sayHello, args=(3,))
    p1.start()

    print 'pid:', p.pid, ' ', p1.pid
    print 'name:' ,p.name, ' ', p1.name
    print 'is_alive:', p.is_alive(), ' ', p1.is_alive()

Process Class


#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time

class SayHello(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        for i in range(4):
            print 'time is {0} {1}'.format(time.ctime(),self.pid)
            time.sleep(self.interval)

if __name__=='__main__':
    for i in range(5):
        p = SayHello(1)
        p.start()

Lock


#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time
def sayHello(lock, i):
    # 获取锁
    lock.acquire()
    print 'start:{0}'.format(time.ctime())
    try:
        with open('./data.txt','a') as f:
            f.write(time.ctime() + ' id:' + str(i) + '\n')
    finally:
        # 释放锁
        lock.release()
    time.sleep(1)
    print 'end:{0}'.format(time.ctime())

if __name__=='__main__':
    lock = multiprocessing.Lock()
    p1 = multiprocessing.Process(target=sayHello, args = (lock,3))
    p2 = multiprocessing.Process(target=sayHello, args = (lock,4))
    # 主进程结束,子线程结束
    p1.daemon = True
    p2.daemon = True
    p1.start()
    p2.start()
    # 主进程会在结束之前检查是否有子线程未完成
    p1.join()
    p2.join()
    print 'end!'

Semaphore


#!/usr/bin/env python
# coding=utf-8

import multiprocessing
import time

def sayHello(s, i):
    # 获取
    s.acquire()
    print multiprocessing.current_process().name + ' acquire'
    time.sleep(i)
    print multiprocessing.current_process().name + ' release'
    # 释放
    s.release()
if __name__=='__main__':
    # 控制共享资源的访问数量
    s = multiprocessing.Semaphore(2)
    for i in range(4):
        p = multiprocessing.Process(target=sayHello, args=(s, i))
        p.start()

Event

#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time

def wait_for1(e):
    print 'wait_for_1: starting'
    # 等待1s
    e.wait(1)
    print 'wait_for_1:{0}'.format(str(e.is_set()))

def wait_for2(e):
    print 'wait_for_2: starting'
    # 等待5s 
    e.wait(5)
    print 'wait_for_2:{0}'.format(str(e.is_set()))
if __name__=='__main__':
    # 在进程之间传递状态
    e = multiprocessing.Event()

    e1 = multiprocessing.Process(name = 'p1', target = wait_for1, args = (e,))
    e2 = multiprocessing.Process(name = 'p2', target = wait_for2, args=(e,))
    
    e1.start()
    e2.start()

    # 等待3s 之后设置状态
    time.sleep(3)
    e.set()

    print 'Event Set Ok!'

Queue


  1. Queue.qsize() 返回队列的大小  
  2. Queue.empty() 如果队列为空,返回True,反之False  
  3. Queue.full() 如果队列满了,返回True,反之False 
  4. Queue.get([block[, timeout]]) 获取队列,timeout等待时间  
  5. Queue.get_nowait() 相当Queue.get(False) 
  6. 非阻塞 Queue.put(item) 写入队列,timeout等待时间  
  7. Queue.put_nowait(item) 相当Queue.put(item, False)

#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time

def writer_proc(q):
    try:
        for i in range(5):
            time.sleep(1)            
            q.put(i,timeout = 2)
            print 'put:',i
    except:
        pass
def reader_proc(q):
    try:
        for i in range(5):
            j = q.get(timeout = 2)
            print 'get:',j
    except:
        pass
if __name__=='__main__':
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target = writer_proc, args=(q,))
    writer.start()

    reader = multiprocessing.Process(target = reader_proc, args=(q,))
    reader.start()

    

Pool


# coding: utf-8
import multiprocessing
import os, time, random


def Lee():
    print "\nRun task Lee-%s" % (os.getpid())  # os.getpid()获取当前的进程的ID
    start = time.time()
    time.sleep(random.random() * 10)  # random.random()随机生成0-1之间的小数
    end = time.time()
    print 'Task Lee, runs %0.2f seconds.' % (end - start)


def Marlon():
    print "\nRun task Marlon-%s" % (os.getpid())
    start = time.time()
    time.sleep(random.random() * 40)
    end = time.time()
    print 'Task Marlon runs %0.2f seconds.' % (end - start)


def Allen():
    print "\nRun task Allen-%s" % (os.getpid())
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print 'Task Allen runs %0.2f seconds.' % (end - start)


def Frank():
    print "\nRun task Frank-%s" % (os.getpid())
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print 'Task Frank runs %0.2f seconds.' % (end - start)


if __name__ == '__main__':
    function_list = [Lee, Marlon, Allen, Frank]
    print "parent process %s" % (os.getpid())

    pool = multiprocessing.Pool(4)
    for func in function_list:
        # apply 线程阻塞,执行线程代码的时候,会阻塞主进程
        pool.apply_async(func)  # Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中

    print 'Waiting for all subprocesses done...'
    pool.close()
    pool.join()  # 调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    print 'All subprocesses done.'


Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐