-
Notifications
You must be signed in to change notification settings - Fork 0
coroutines
wzs edited this page Aug 27, 2018
·
1 revision
- 协程, 又称微线程, 纤程. 英文名Coroutine.
- 协程如 lambda表达式/匿名函数 一样, 是一种编程思想/技巧
- 什么是协程
-
子程序, 或者称为函数, 在所有语言中都是层级调用, 比如A调用B, B在执行过程中又调用了C, C执行完毕返回, B执行完毕返回, 最后是A执行完毕. 所以子程序调用是通过栈实现的, 一个线程就是执行一个子程序.
- 子程序调用总是一个入口, 一次返回, 调用顺序是明确的.
-
协程看上去也是子程序, 但执行过程中, 在子程序内部可中断, 然后转而执行别的子程序, 在适当的时候再返回来接着执行.
- 注意, 在一个子程序中中断, 去执行其他子程序, 不是函数调用, 有点类似CPU的中断
- 子程序可以看成协程的一种: 即没有内部中断的协程
- 和迭代器的区别: 迭代器每次调用
next()执行的都是同一段代码逻辑, 但是协程可以执行不同的代码逻辑
-
子程序, 或者称为函数, 在所有语言中都是层级调用, 比如A调用B, B在执行过程中又调用了C, C执行完毕返回, B执行完毕返回, 最后是A执行完毕. 所以子程序调用是通过栈实现的, 一个线程就是执行一个子程序.
- 在任何时刻, 只有一个协程在运行, 而 multiprocessing或threading 轮转使用操作系统调度的进程和线程, 是真正的并行
- 与多线程相比, 协程的优点在于
- 避免了线程创建/切换带来的消耗
- 不需要多线程的锁机制
- 与多线程相比, 协程的优点在于
- 迭代器函数: 结合代码更容易理解 示例
-
迭代器表达式
- 示例:
('/user/{page}/'.format(page=page) for page in range(1, 4) - 结果: 元素为
('/user/1/', '/user/2/', '/user/3/')的迭代器表达式
- 示例:
- 迭代器函数: 函数中如果出现了yield关键字, 那么该函数是迭代器函数
-
迭代器表达式
- 迭代器函数在一定程度上实现了协程机制
- 通过调用
send(msg)执行迭代器函数 且当遇到 yield关键字 时中断执行, 返回主程序执行. - 再次调用
send(msg)则会从上次中断的位置继续执行, 直到再次遇到 yield关键字, 返回主程序执行
- 通过调用
-
coroutine.send(msg): 将msg传入给 迭代器函数, 返回结果是 yield关键字 的参数coroutine.next() == coroutine.send(None)- 示例
- 迭代器函数:
d = yield "hello" - 主函数执行
result = coroutine.send(3): result值为hello. 协程再次执行(再次调用send())时, d的值为3
- 迭代器函数:
-
coroutine.close(): 手动关闭迭代器函数, 后面的调用会直接返回StopIteration异常. - 当 迭代器函数 执行到结束依然没有遇到中断时, 会抛出 StopIteration 异常. 解决方案如下:
- for循环: for循环会检测异常且自动调用
next() - 使用
try/except捕获异常
- for循环: for循环会检测异常且自动调用
- 不能使用
yield return:yield args返回 args, 而 return 是关键字, 不能作为参数返回
- 协程常用于 传统的生产者/消费者模型. 使用yield跳转到消费者执行消费,执行完毕后再切回生产者
import time
def consumer():
r = ''
while True:
n = yield r
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'
def produce(c):
c.next()
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
if __name__=='__main__':
c = consumer()
produce(c)详细参考示例_IBM
- 需求: 不借助全局变量的情况下, 在需要时才获取数列的下一个值
迭代器版本
class Fab(object):
def __init__(self, max):
self.max = max
self.n, self.a, self.b = 0, 0, 1
def __iter__(self):
return self
def next(self):
if self.n < self.max:
r = self.b
self.a, self.b = self.b, self.a + self.b
self.n = self.n + 1
return r
raise StopIteration()
for n in Fab(5):
print n yield版本
def fab(max):
n, a, b = 0, 0, 1
while n < max:
yield b
# print b
a, b = b, a + b
n = n + 1
for n in fab(5):
print n - 如果直接对文件对象调用
read()/readline()方法, 会导致不可预测的内存占用. 通过固定长度的缓冲区来不断读取文件内容可以是程序更稳定
def read_file(fpath):
BLOCK_SIZE = 1024
with open(fpath, 'rb') as f:
while True:
block = f.read(BLOCK_SIZE)
if block:
yield block
else:
return参考: gevent_廖雪峰
参考: gevent程序员指南
- gevent: gevent通过greenlet为Python实现了比较完善的协程支持
- greenlet: 以C扩展模块形式接入Python的轻量级协程. Greenlet全部运行在主程序操作系统进程的内部, 但它们被协作式地调度
- 基本思想: 当一个greenlet遇到IO操作时, 如访问网络, 就自动切换到其他的greenlet, 等到IO操作完成, 再在适当的时候切换回来继续执行. 由于IO操作非常耗时, 经常使程序处于等待状态, 有了gevent为我们自动切换协程, 就保证总有greenlet在运行, 而不是等待IO
- 因为切换是在IO操作时自动完成, 所以gevent需要修改Python自带的一些标准库, 这一过程在启动时通过monkey patch完成
- 实现
- Gevent处理了所有的细节, 来保证你的网络库会在可能的时候(受限于网络或IO), 隐式交出greenlet上下文的执行权
- 协程停止: 当主函数收到 SIGQUIT 信号时, 没有成功yield的 Greenlet 可能会挂起程序的执行(这会导致僵尸进程的产生, 需要在Python解释器之外被kill)
- 解决方法: 在主进程中监听信号
- 监听代码:
gevent.signal(signal.SIGQUIT, gevent.shutdown) - 示例
- 监听代码:
- 解决方法: 在主进程中监听信号
-
monkey.patch_all(): 用于修改标准socket库中的阻塞式系统调用,使其成为协作式运行, 即修改标准库里的部分函数,使其支持gevent协程方式运行.
| 状态 | 类型 | 介绍 |
|---|---|---|
started |
Boolean | 指示此Greenlet是否已经启动 |
ready() |
Boolean | 指示此Greenlet是否已经停止 |
successful() |
Boolean | 指示此Greenlet是否已经停止而且没抛异常 |
value |
任意值 | 此Greenlet代码返回的值 |
exception |
异常 | 此Greenlet内抛出的未捕获异常 |
- 使用样例
woker1 = gevent.spawn(worker)
# 获取Greenlet状态: 是否启动
state = woker1.started
state = woker1.successful()
result = worker1.value# coding:utf-8
import urllib2
import gevent
import signal
from gevent import monkey; monkey.patch_all()
def worker(url):
resp = urllib2.urlopen(url)
print resp.url
resp.close()
if __name__=='__main__':
# 监听信号, 当主进程停止时结束协程(部分版本没有shutdown函数)
# gevent.signal(signal.SIGQUIT, gevent.shutdown)
# 设置超时
timeout = gevent.Timeout(3)
timeout.start()
try:
# joinall: 阻塞当前流程, 执行所有给定的greenlet, 执行流程只会在 所有greenlet执行完后才会继续向下走
gevent.joinall([
# spawn: 将worker函数封装到Greenlet内部线程
gevent.spawn(worker,"http://www."+domain+".com") for domain in ["xiagaoxiawan","zhihu","google"]
])
except gevent.Timeout:
print("timeout!!")import gevent
from gevent import Greenlet
class MyGreenlet(Greenlet):
def __init__(self, message, n):
Greenlet.__init__(self)
self.message = message
self.n = n
def _run(self):
print(self.message)
gevent.sleep(self.n)
g = MyGreenlet("Hi there!", 3)
g.start()
g.join()事件: 事件是一种 订阅/消费 的模型, 理解 gevent 的事件需要先了解什么是事件
- 事件(event): 用于Greenlet之间的异步通信
- event 采用
wait()订阅事件,等待消息; 使用set()发布事件 - 事件定义:
evt = gevent.event.Event() - 订阅事件:
evt.wait(); 发布事件:evt.set()
- event 采用
- AsyncResult: 事件的一种扩展, 允许程序在唤醒调用上附加一个值. 具体参考例子
- 有时也被称作是future或defered, 因为它持有一个指向将来任意时间可设置 为任何值的引用
- 当从
AsyncResult对象中读取数据时, 若还没有被赋值,则该函数陷入阻塞, 协程切换到其他函数执行.
# coding:utf-8
import gevent
from gevent.event import AsyncResult
# 创建AsyncResult对象
a = AsyncResult()
def setter():
gevent.sleep(3)
# 发送事件
a.set('Hello!')
def waiter():
# 监听事件. 当 AsyncResult 没有被赋值时, waiter函数执行被中断, 切换到其它函数执行
print(a.get())
def worker3():
# 测试函数, 当AsyncResult.get() 被阻塞时检测是否会调用本函数
print "worker3"
gevent.joinall([
gevent.spawn(waiter),
gevent.spawn(setter),
gevent.spawn(worker3),
])等以上部分掌握的差不多了, 后续部分再整理
- WSGI: Web Server Gateway Interface
- 使用
gevent.pywsgi配合 Flask 实现 web server
from flask import Flask
import gevent.pywsgi
import gevent
app = Flask(__name__)
@app.route("/")
def handle():
return "<h1>Hellp Flask</h1>"
gevent_server = gevent.pywsgi.WSGIServer(('',8000),app)
gevent_server.serve_forever()