Swift51.com
麦子学院 头像
麦子学院  2017-04-19 17:35

Python学习之多进程变成

回复:0  查看:2244  

之前的Python编程任务中一般都是单线程进行的,因为大多数属于CPU密集型,多线程或多进程对程序性能的提升不大;但最近有几个网络密集型的任务,单线程的版本明显太慢,不得已,开始学习Python的多进程编程。

  正文:

  参考解答:

  1.创建进程池Pool

  multiprocessing 是 Python 的多进程并行库,我使用进程池 multiprocessing.Pool 来自动管理进程任务。可以通过一下语句初始化 Pool

  multiprocessing.freeze_support() # Windows 平台要加上这句,避免 RuntimeError

  pool = multiprocessing.Pool()

  假设我们要并行执行的任务是以下函数:

  def task(pid):

   # do something

   return result

  然后在主函数调用:

  results = []

  for i in xrange( 0 , 4 ):

   result = pool.apply_async(task, args=(i,)) #这行i后面的逗号是不能省略的,否则不会执行task方法

   results.append(result)

  上面的 pool.apply_async 采用异步方式调用 task,而 pool.apply 则是同步方式调用。同步方式意味着下一个 task需要等待上一个 task 完成后才能开始运行,这显然不是我们想要的功能,所以采用异步方式连续地提交任务。在上面的语句中,我们提交了 个任务,假设我的 CPU 是 核,那么我的每个核运行一个任务。如果我提交多于 个任务,那么每个核就需要同时运行 个以上的任务,这会带来任务切换成本,降低了效率(Pool进程池可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果进程池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来处理它)。所以我们设置的并行任务数最好等于 CPU 核心数, CPU 核可以通过下面语句得到:

  cpus = multiprocessing.cpu_count()

  接下来我们使用 result.get() 来获取 task 的返回值:

  for result in results:

   print(result.get())

  在这里不免有人要疑问,为什么不直接在 for 循环中直接 result.get() 呢?这是因为pool.apply_async之后的语句都是阻塞执行的,调用 result.get() 会等待上一个任务执行完之后才会分配下一个任务。事实上,获取返回值的过程最好放在进程池回收之后进行,避免阻塞后面的语句。

  最后我们使用一下语句回收进程池:

  pool.close() #调用join函数之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到Pool

  pool.join()  #调用join函数阻塞主进程等待所有子进程结束

  最后附上完整的代码如下:

  def task(pid):

   # do something

   return result

  def main():

   multiprocessing.freeze_support()

   pool = multiprocessing.Pool()

   cpus = multiprocessing.cpu_count()

   results = []

   for i in xrange( 0 , cpus):

     result = pool.apply_async(task, args=(i,))

     results.append(result)

   pool.close()

   pool.join()

   for result in results:

     print(result.get())

  2、使用进程池Pool,不需要关注结果

  import multiprocessing

  import time

  #只是简单的printsleep,并不需要返回什么结果/内容

  def func(msg):

   for i in xrange( 3 ):

     print msg

     time.sleep( 1 )

  if __name__ == "__main__" :

   pool = multiprocessing.Pool(processes= 4 )

   for i in xrange( 10 ):

     msg = "hello %d" %(i)

     pool.apply_async(func, (msg, ))

   pool.close()

   pool.join()

   print "Sub-process(es) done."

  3、使用进程池Pool,并需要关注结果(通过callback的方式)

  import multiprocessing as mp

  import time

  def foo_pool(x):

   time.sleep( 2 )

   return x*x

  result_list = []

  def log_result(result):

   # This is called whenever foo_pool(i) returns a result.

   # result_list is modified only by the main process, not the pool workers.

   result_list.append(result)

  def apply_async_with_callback():

   pool = mp.Pool()

   for i in range( 10 ):

     pool.apply_async(foo_pool, args = (i, ), callback = log_result)

   pool.close()

   pool.join()

   print(result_list)

  if __name__ == '__main__' :

   apply_async_with_callback()

 

来源:ASPIRE