Swift51.com
麦子学院 头像
麦子学院  2017-07-08 15:18

Python学习之分布式任务实现方法详解

回复:0  查看:2464  
本文和大家分享的主要是使用python 实现分布式任务相关内容,一起来看看吧,希望对大家 学习python有所帮助。
  深入读了读python 的官方文档,发觉 Python 自带的 multiprocessing 模块有很多预制的接口可以方便的实现多个主机之间的通讯,进而实现典型的生产者 - 消费者模式的分布式任务架构。
  之前,为了在Python 中实现生产者 - 消费者模式,往往就会选择一个额外的队列系统,比如 rabbitMQ 之类。此外,你有可能还要设计一套任务对象的序列化方式以便塞入队列。如果没有队列的支持,那不排除有些同学不得不从 socket 服务器做起,直接跟 TCP/IP 打起交道来。
  其实multiprocessing.managers 中有个 BaseManager 就为开发者提供了这样一个快速接口。
  我们假定的场景是1 个生产者( producer.py +8 个消费者 (worker.py) 的系统,还有一个中央节点负责协调( server.py )实现如下:
  server.py
   from multiprocessing.managers  import BaseManager import Queue
  queue = Queue.Queue() # 初始化一个 Q ,用于消息传递 class  QueueManager(BaseManager):
   pass
  QueueManager.register('get_queue', callable= lambda:queue) #  在系统中发布 get_queue 这个业务
   if __name__ == '__main__':
  m = QueueManager(address=('10.239.85.193', 50000),authkey='abr' )
  监听所有 10.239.85.193 50000
  s = m.get_server()
  s.serve_forever()
  worker.py
   from multiprocessing.managers  import BaseManager from multiprocessing  import Pool
   class  QueueManager(BaseManager):
   pass
  QueueManager.register('get_queue')
   def  feb(i): # 经典的 ' 山羊增殖 '
   if i < 2:  return 1
   if i < 5 :  return feb(i-1) + feb(i-2)
   return feb(i-1) + feb(i-2) - feb(i-5)
   def  worker(i):
  m = QueueManager(address=('10.239.85.193', 50000), authkey='abr')# 连接 server
  m.connect()
   while  True:
  queue = m.get_queue()#  获取 Q
  c = queue.get()
   print feb(c)
   if __name__ == '__main__':
  p = Pool(8) #  分进程启动 8 worker
  p.map(worker, range(8))
  producer.py
   from multiprocessing.managers  import BaseManager
   class  QueueManager(BaseManager):
   pass
  QueueManager.register('get_queue')
   if __name__ == '__main__':
  m = QueueManager(address=('10.239.85.193', 50000), authkey='abr')
  m.connect()
  i = 0
   while  True:
  queue = m.get_queue()
  queue.put(48)
  i+=1
  系统会直接将Queue()  对象中的数据直接封装后通过 TCP 50000 端口在主机之间传递。不过需要注意的是,由于 authkey 的缘故,各个节点要求 python 的版本一致。
来源:  开源小站