python多进程编程

  Python由于全局锁GIL的存在,无法享受多线程带来的性能提升。multiprocessing包采用子进程的技术避开了GIL,使用multiprocessing可以进行多进程编程提高程序效率。

multiprocessing.Process对象

from multiprocessing import Process

#定义子进程处理函数
def worker(a,b,c=3,d=6):
    pass

p = Process(target=worker,name='run_worker',args=(1,2),kwargs={d:'4'})
p.daemon = True        #设置子进程是否随父进程终止而自动终止,一定要在start方法调用之前设置
p.start()
p.join()

  Process对象的初始化参数为Process(group=None, target=None, name=None, args=(), kwargs={}),其中group参数必须为None(为了与threading.Thread的兼容),target指向可调用对象(该对象在新的子进程中运行),name是为该子进程命的名字(默认是Proess-1,Process-2, …这样),args是被调用对象的位置参数的元组列表,kwargs是被调用对象的关键字参数。

  子进程终结时会通知父进程并清空自己所占据的内存,在内核里留下退出信息(exit code,如果顺利运行,为0;如果有错误或异常状况,为大于零的整数)。父进程得知子进程终结后,需要对子进程使用wait系统调用,wait函数会从内核中取出子进程的退出信息,并清空该信息在内核中占据的空间。

  如果父进程早于子进程终结,子进程变成孤儿进程,孤儿进程会被过继给init进程,init进程就成了该子进程的父进程,由init进程负责该子进程终结时调用wait函数。如果父进程不对子进程调用wait函数,子进程成为僵尸进程。僵尸进程积累时,会消耗大量内存空间。 可以设置子进程的daemon属性为True,则父进程终结时,自动终止该子进程。

  如果在父进程中不调用子进程的p.join()方法,则主进程与父进程并行工作。join方法的作用主要是(以下线程均可换为子进程):

  • 阻塞主进程(挡住,无法执行join以后的语句),专注执行多线程。
  • 多线程多join的情况下,依次执行各线程的join方法,前头一个结束了才能执行后面一个。
  • 无参数时,则等待到该线程结束,才开始执行下一个线程的join。
  • 设置参数后,则等待该线程这么长时间如果没有执行完就阻塞该线程。

    将进程定义为类

    import multiprocessing
    import time
    class ClockProcess(multiprocessing.Process):
        def __init__(self, interval):
            multiprocessing.Process.__init__(self)
            self.interval = interval
        def run(self):
            n = 5
            while n > 0:
                print("the time is {0}".format(time.ctime()))
                time.sleep(self.interval)
                n -= 1
        if __name__ == '__main__':
            p = ClockProcess(3)
            p.start()
    

multiprocessing.Queue对象

使用Queue对象可以实现进程间通信,并且Queue对象是线程及进程安全的:

from multiprocessing import Queue, Process
def worker(q):
    q.put(['abc',123,'x'])

if __name__ == "__main__":
    q = Queue()
    p = Process(target = worker,args=(q,))
    p.daemon = True
    p.start()
    p.join()
    print q.get()

multiprocessing.Pipe对象

Pipe对象返回的元组分别代表管道的两端p[0]和p[1],管道默认是全双工,两端都支持send和recv方法,两个进程分别操作管道两端时不会有冲突,两个进程对管道一端同时读写时可能会有冲突:

from multiprocessing import Pipe, Process
def worker(p):
    p.send('hello,world!')

if __name__ == "__main__":
    left,right = Pipe()
    p = Process(target=worker,args=(left,))
    p.daemon = True
    p.start()
    p.join()
    print right.recv()

如果声明了p = Pipe(duplex=False)的单向管道,则p[0]只负责接受消息,p[1]只负责发送消息。

multiprocessing.Lock对象

当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。
Lock有两种方法使用:1.使用with上下文管理器;2.使用acquire()和release()方法

import multiprocessing
import sys

def worker_with(lock, f):
    with lock:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lockd acquired via with\n")
            n -= 1
        fs.close()

def worker_no_with(lock, f):
    lock.acquire()
    try:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lock acquired directly\n")
            n -= 1
        fs.close()
    finally:
        lock.release()

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target = worker_with, args=(lock, f))
    nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print "end"

multiprocessing.Value 和 multiprocessing.Array对象

在进程间共享状态可以使用multiprocessing.Value和multiprocessing.Array这样特殊的共享内存对象:
  Value(typecode_to_type , obj)
  Array(typecode_to_type , int | list | tuple... , lock=True)
Array的第二个参数传入为int类型的一个数的时候,会初始化值为0长度为这个值的数组

typecode_to_type = {
    'c': ctypes.c_char,
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int,   'I': ctypes.c_uint,
    'l': ctypes.c_long,  'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double
    }

读写Value数据用属性v.value,读写Array数据用切片运算a[i]

from multiprocessing import Pipe, Process,Value,Array
def worker(v,a):
    v.value = 1.2
    for i in range(10):
        a[i] = -i

if __name__ == "__main__":
    v = Value('f',0.0)
    a = Array('i',range(10))
    print v.value
    print a[3]
    p = Process(target=worker,args=(v,a))
    p.daemon = True
    p.start()
    p.join()
    print v.value
    print a[3]

multiprocessing.Manager对象

multiprocessing.Manager对象创建一个服务进程,像是一个保存状态的代理,其他进程通过与代理的接口通信取得状态信息,服务进程支持更多的数据类型,使用起来比共享内存更灵活。

from multiprocessing import Process, Manager

def func(d, l):
    d['1'] = 2
    d[2] = 'str'
    d[3.0] = None
    for i in range(len(l)):
        l[i] = -i

if __name__ == "__main__":
    m = Manager()
    l = m.list(range(10))
    d = m.dict()
    p = Process(target=func, args=(d, l,))
    p.start()
    p.join()

    print d
    print l

multiprocessing.Pool对象

  在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
  Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行它。

#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print "Sub-process(es) done."  

函数解释:

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别)
  • close() 关闭pool,使其不在接受新的任务
  • terminate() 结束工作进程,不在处理未完成的任务。
  • join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。

执行说明:
  创建一个进程池pool,并设定进程的数量为3,xrange(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3,所以会出现输出“msg: hello 3”出现在”end”后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“mMsg: hark~ Mark~ Mark~~~~~~”,主程序在pool.join()处等待各个进程的结束。

multiprocessing.Semaphore对象

Semaphore用来控制对共享资源的访问数量,例如池的最大连接数。当连接数量达到设定值后,只有当旧的连接释放掉,才会为资源创建新连接。

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire");
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n");
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target = worker, args=(s, i*2))
        p.start()

multiprocessing.Event对象

Event用来实现进程间同步通信。

import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set()->" + str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name = "block",
            target = wait_for_event,
            args = (e,))

    w2 = multiprocessing.Process(name = "non-block",
            target = wait_for_event_timeout,
            args = (e, 2))
    w1.start()
    w2.start()

    time.sleep(3)

    e.set()
    print("main: event is set")