写在前面
- 本文默认读者对 Python 生成器 有一定的了解,不了解者请移步至生成器 – 廖雪峰的官方网站。
- 本文基于 Python 3.5.1,文中所有的例子都可在 Github 上获得。
学过 Python 的都知道,Python 里有一个很厉害的概念叫做 生成器(Generators)。一个生成器就像是一个微小的线程,可以随处暂停,也可以随时恢复执行,还可以和代码块外部进行数据交换。恰当使用生成器,可以极大地简化代码逻辑。
也许,你可以熟练地使用生成器完成一些看似不可能的任务,如“无穷斐波那契数列”,并引以为豪,认为所谓的生成器也不过如此——那我可要告诉你:这些都太小儿科了,下面我所要介绍的绝对会让你大开眼界。
生成器 可以实现 协程,你相信吗?
什么是协程
在异步编程盛行的今天,也许你已经对 协程(coroutines) 早有耳闻,但却不一定了解它。我们先来看看 Wikipedia 的定义:
Coroutines are computer program components that generalize subroutines for nonpreemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations.
也就是说:协程是一种 允许在特定位置暂停或恢复的子程序——这一点和 生成器 相似。但和 生成器 不同的是,协程 可以控制子程序暂停之后代码的走向,而 生成器 仅能被动地将控制权交还给调用者。
协程 是一种很实用的技术。和 多进程 与 多线程 相比,协程 可以只利用一个线程更加轻便地实现 多任务,将任务切换的开销降至最低。和 回调 等其他异步技术相比,协程 维持了正常的代码流程,在保证代码可读性的同时最大化地利用了 阻塞 IO 的空闲时间。它的高效与简洁赢得了开发者们的拥戴。
Python 中的协程
早先 Python 是没有原生协程支持的,因此在 协程 这个领域出现了百家争鸣的现象。主流的实现由以下两种:
- 用 C 实现协程调度。这一派以 gevent 为代表,在底层实现了协程调度,并将大部分的 阻塞 IO 重写为异步。
- 用 生成器模拟。这一派以 Tornado 为代表。Tornado 是一个老牌的异步 Web 框架,涵盖了五花八门的异步编程方式,其中包括 协程。本文部分代码借鉴于 Tornado。
直至 Python 3.4,Python 第一次将异步编程纳入标准库中(参见 PEP 3156),其中包括了用生成器模拟的 协程。而在 Python 3.5 中,Guido 总算在语法层面上实现了 协程(参见 PEP 0492)。比起 yield
关键字,新关键字 async
和 await
具有更好的可读性。在不久的将来,新的实现将会慢慢统一混乱已久的协程领域。
尽管 生成器协程 已成为了过去时,但它曾经的辉煌却不可磨灭。下面,让我们一起来探索其中的魔法。
一个简单的例子
假设有两个子程序 main
和 printer
。printer
是一个死循环,等待输入、加工并输出结果。main
作为主程序,不时地向 printer
发送数据。
这应该怎么实现呢?
传统方式中,这几乎不可能在一个线程中实现,因为死循环会阻塞。而协程却能很好地解决这个问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
def printer(): counter = 0 while True: string = (yield) print('[{0}] {1}'.format(counter, string)) counter += 1 if __name__ == '__main__': p = printer() next(p) p.send('Hi') p.send('My name is hsfzxjy.') p.send('Bye!') |
输出:
1 2 3 |
[0] Hi [1] My name is hsfzxjy. [2] Bye! |
这其实就是最简单的协程。程序由两个分支组成。主程序通过 send
唤起子程序并传入数据,子程序处理完后,用 yield
将自己挂起,并返回主程序,如此交替进行。
协程调度
有时,你的手头上会有多个任务,每个任务耗时很长,而你又不想同步处理,而是希望能像多线程一样交替执行。这时,你就需要一个调度器来协调流程了。
作为例子,我们假设有这么一个任务:
1 2 3 4 |
def task(name, times): for i in range(times): print(name, i) |
如果你直接执行 task
,那它会在遍历 times
次之后才会返回。为了实现我们的目的,我们需要将 task
人为地切割成若干块,以便并行处理:
1 2 3 4 5 |
def task(name, times): for i in range(times): yield print(name, i) |
这里的 yield
没有逻辑意义,仅是作为暂停的标志点。程序流可以在此暂停,也可以在此恢复。而通过实现一个调度器,我们可以完成多个任务的并行处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
from collections import deque class Runner(object): def __init__(self, tasks): self.tasks = deque(tasks) def next(self): return self.tasks.pop() def run(self): while len(self.tasks): task = self.next() try: next(task) except StopIteration: pass else: self.tasks.appendleft(task) |
这里我们用一个队列(deque)储存任务列表。其中的 run
是一个重要的方法: 它通过轮转队列依次唤起任务,并将已经完成的任务清出队列,简洁地模拟了任务调度的过程。
而现在,我们只需调用: