Swift51.com
麦子学院 头像
麦子学院  2017-02-23 15:42

Tornado学习之协程详解

回复:0  查看:3213  
本文和大家分享的主要是 python开发中Tornado 协程相关问题,一起来看看吧,希望对大家学习和使用这部分内容有所帮助。
   什么是协程
  我们经常使用的函数又称子例程(subroutine) ,往往只有一个入口(函数的第一行),一个出口( return 、抛出异常)。子例程在入口时获得控制权,在出口时把控制权交还给调用者。一旦交还控制权,就意味着例程的结束,函数中做的所有工作以及保存在局部变量中的数据都将被释放。而协程可以有多个入口点,允许从一个入口点执行到下一个入口点之前暂停,保存执行状态;等到合适的时机恢复执行状态,从下一个入口点重新开始执行。
  "Subroutines are special cases of ... coroutines." –Donald Knuth.
  可以把协程看做是子例程的泛化形式。
  在Python 中,协程基于生成器。它可以有多个出口和入口。出口有两种类型:一种是 return ,用于永久交还控制权,效果同子例程;另一种是 yield ,用于暂时交还控制权,函数将来还会收回控制权。当然入口也有两种:一种是函数的第一行;另一种是上次 yield 的那一行。
  Tornado 中的协程
  Tornado 典型的协程例子:
  class GenAsyncHandler(RequestHandler): @gen.coroutine
  def get(self):
  http_client = AsyncHTTPClient()
  response = yield http_client.fetch("http://example.com")
  do_something_with_response(response)
  self.render("template.html")
  我们为这个Handler get() 添加了装饰器 gen.coroutine ,从而使其成为一个协程。不难理解,面对耗时的操作,利用协程可以达到异步的效果,需要等待的时候 yield 出去,等待结束后重新回来继续执行。如例子中 get 需要等待http_client.fetch("http://example.com")  的结果,因此通过yield 返回;当获取到结果后, get 函数从上次离开处继续运行,且 response 被赋值为 http_client.fetch("http://example.com")  的结果。
  这主要涉及到以下几样东西:
  1.Future
  Future 的设计目标是作为协程 (coroutine) IOLoop 的媒介,从而将协程和 IOLoop 关联起来。
  Future concurrent.py 中定义,是异步操作结果的占位符,用于等待结果返回。通常作为函数 IOLoop.add_future() 的参数或 gen.coroutine 协程中 yield 的返回值。
  等到结果返回时,外部可以通过调用set_result() 设置真正的结果,然后调用所有回调函数,恢复协程的执行。
  2.IOLoop
  IOLoop 是一个 I/O 事件循环,用于调度 socket 相关的连接、响应、异步读写等网络事件,并支持在事件循环中添加回调 (callback) 和定时回调 (timeout) 。在支持的平台上,默认使用 epoll 进行 I/O 多路复用处理。
  IOLoop Tornado 的核心,绝大部分模块都依赖于 IOLoop 的调度。在协程运行环境中, IOLoop 担任着协程调度器的角色,能够让暂停的协程重新获得控制权,从而能继续执行。
  IOLoop 通过 add_future() Future 的支持:
  def add_future(self, future, callback):
  assert is_future(future)
  callback = stack_context.wrap(callback)
  future.add_done_callback(lambda future: self.add_callback(callback, future))
  通过调用future add_done_callback() ,使当 future 在操作完成时,能够通过 add_callback callback 添加到 IOLoop 中,让 callback IOLoop 下一次迭代中执行 ( 不在本轮是为了避免饿死 )
3.Runner
  Runner coroutine 都在 gen.py 中定义。 Runner coroutine 装饰器创建,用于维护挂起的回调函数及结果的相关信息,包括中间结果 (future) 和最终结果 (result_future)
  4.coroutine
  gen.coroutine 是一个装饰器,负责将普通函数包装成协程。功能包括:
  · 调用函数,如果该函数有 yield ,则返回生成器。否则立即得到结果,不属于协程。
  · 通过 next() 执行生成器,如果未能结束 ( 遇到 yield) ,则生成中间类 Runner 对象,用于保存生成器、 yield 返回值和最终返回值。
  · 创建 Runner
  结合以上几样东西,协程的具体流程整理如下:
  每一次协程调用yield 释放控制权后:
  -> [Runner]handle_yield
  处理yield 返回的结果
  -> [Runner]ioloop.add_future(self.future, lambda f: self.run())
  将结果构造成future 后添加到ioloop
  -> [future]add_done_callback(lambda future: self.add_callback(callback, future))
  将Runner.run() 加入到完成时的回调函数列表中
  ??? 触发:
  -> [future]set_result
  已经得到future 的结果,设置之
  -> [future]_set_done
  调用future 所有回调函数 (_callbacks)
  -> [ioloop]add_callback(callback, future)
  callback [Runner]add_future 添加的那个,即 [Runner]self.run() ,将在下一轮循环被执行
  -> [Runner]self.run()
  取出Runner self.future( 上次yield 的返回值)
  1.  如果 future 未完成,return ,流程结束,等待下一次set_result
  2.  如果 future 完成
  -> [Runner]yielded = self.gen.send(value)
  通过send future result 发送给协程,并让其恢复执行:
  1.  如果协程结束 ( yield )
  -> [Runner]self.result_future.set_result
  设置最终的结果result_future
  2.  未结束 ( 再次遇到yield)
  -> [Runner]handle_yield
  则再次调用handle_yield
  从以上的流程可以看出,每一次协程调用yield 释放控制权后的恢复,都依赖于 set_result() 的调用。当我们把目光看向总调度 IOLoop 时,可以发现 IOLoop 只是忠实地在每一轮迭代中调用那些就绪的回调函数,并没有主动调用 set_result() 的能力。
  那么,在Tornado 中,这个 set_result() 到底是谁调用?在何时调用?
  搜遍了Tornado 的代码,主要找到以下几个调用点:
  1. coroutine 装饰器中,如果装饰的函数调用后直接结束 ( yield) ,直接 set_result()
  2. [Runner] run() 中,如果调用 send 后协程结束,对 result_future 进行 set_result()
3. 取决于 yield 后阻塞操作的具体实现,下面以例子中 AsyncHTTPClient fetch() 来进行分析。
  AsyncHTTPClient fetch() 是一个异步操作,其构造了一个 HTTP 请求,然后调用 fetch_impl() ,返回一个 future fetch_impl() 取决于 AsyncHTTPClient 的具体实现,默认情况下, AsyncHTTPClient 生成的是子类 SimpleAsyncHTTPClient 的实例,所以主要看 SimpleAsyncHTTPClient fetch_impl()
  def fetch_impl(self, request, callback):
  key = object()
  self.queue.append((key, request, callback))
  if not len(self.active) < self.max_clients:
  timeout_handle = self.io_loop.add_timeout(
  self.io_loop.time() + min(request.connect_timeout,
  request.request_timeout),
  functools.partial(self._on_timeout, key))
  else:
  timeout_handle = None
  self.waiting[key] = (request, callback, timeout_handle)
  self._process_queue()
  if self.queue:
  gen_log.debug("max_clients limit reached, request queued. "
  "%d active, %d queued requests." % (len(self.active), len(self.queue)))
  fetch_impl() 接受两个参数, request fetch() 中构造的 HTTP 请求, callback fetch 中的回调函数 handle_response
  def handle_response(response):
  if raise_error and response.error:
  future.set_exception(response.error)
  else:
  future.set_result(response)
  在handle_response() 中,调用了我们期待的 set_result() 。所以我们把目光转移到 fetch_impl() callback 。在 fetch_impl() 中,函数先将 callback 加到队列中,然后通过 _process_queue() 处理掉,处理时调用 _handle_request()
  def _handle_request(self, request, release_callback, final_callback):
  self._connection_class()(
  self.io_loop, self, request, release_callback,
  final_callback, self.max_buffer_size, self.tcp_client,
  self.max_header_size, self.max_body_size)
  这里构造了一个_connection_class 对象,即 HTTPConnection HTTPConnection 通过 self.tcp_client.connect() 来建立 TCP 连接,然后通过该连接发送 HTTP 请求, 在超时 (timeout) 或完成 (finish) 时调用 callback tcp_client 在建立异步 TCP 连接时,先进行 DNS 解析 ( 又是协程 ) ,然后建立 socket 来构造 IOStream 对象,最后调用 IOStream.connect() 。在 IOStream.connect() 的过程中,我们看到了关键操作:
  self.io_loop.add_handler(self.fileno(), self._handle_events, self._state)
  还记得我们前面说过的IOLoop 吗? IOLoop 可以添加 socket callback timeout ,并当它们就绪时调用相应的回调函数。这里 add_handler 处理的就是 socket 的多路复用,默认的实现是 epoll 。当 epoll 中该 socket 就绪时,相关函数得以回调。于是 tcp_client 读取 socket 内容获得 HTTP response handle_response() 被调用,最终 set_result() 被调用。
  到这里我们恍然大悟,AsyncHTTPClient set_result() 调用依赖于 IO 多路复用方案,这里是 epoll ,在 epoll 中相应 socket 的就绪的是 set_result() 得到调用的根本原因。而这个就绪事件的传递,离不开 Tornado 内建的 IOStream ,异步 TCPClient 、异步 HTTPConnection ,这些类的存在为我们隐藏了简单调用后的复杂性。因此当我们在用 yield 返回耗时操作时,如果不是 Tornado 的内建组件,则必须自己负责设计 set_result 的方案,比如以下代码:
  @gen.coroutinedef add(self, a, b):
  future = Future()
  def callback(a, b):
  print("calculating the sum of %d + %d:" % (a,b))
  future.set_result(a+b)
  tornado.ioloop.IOLoop.instance().add_callback(callback, a, b)
  result = yield future
  print("%d + %d = %d" % (a, b, result))
  通过手动将包含set_result() 的回调函数加到 IOLoop 中,于是回调下一次迭代中执行, set_result() 被调用,协程恢复控制权。
   总结
  实现高性能服务端,同步多进程、多线程风靡一时,然而由于其需要在内核态进行上下文切换,同步时还要加锁,导致并发性能低下。于是异步冒出来了,如Session 、状态机,当然还有本文讨论的协程。
  学习协程,主要的原因是在看SS 代码中深受状态机代码的折磨 ( 为啥要看 SS 的代码?你懂的 ) 。因为在状态机中,逻辑代码被分割成多块分散在 N 个回调里 ( 各种 on_xxxxx) ,割裂了人的顺序性思维,在阅读代码、理解逻辑时跳来跳去令人痛不欲生。反观协程,真正做到了同步编码异步执行。
  然而深挖协程的实现发现,协程的高性能和易编写易阅读是以后端框架复杂的封装为代价的。即使是在Python 中我们拥有能够维护上下文的生成器,为了实现协程的调度, Tornado 依然耗费了不少功夫:从定义 Future 对象用于等待结果返回,到使用 coroutine 装饰器将生成器的 yield 返回值封装成 Runner ,最后到 set_result Runner 重新跑起来,而这些都依赖于 IOLoop 的调度。在经过层层跳转后达成了协程的调度目的,不得不感慨 Tornado 设计的巧妙。
  不管怎么说,对于我等码农来说,协程代码写起来真的爽,读起来也也很爽,可以少死很多脑细胞,这就够了。

来源:binsite