本章主要讲解协程问题,从迭代器开始逐渐过渡到协程的概念和用法。
迭代器
- 可迭代:可直接被for循环使用的对象叫可迭代的(Iterable),注意这个是个形容词,用来形容一个对象是否可以迭代。
- 迭代器:不但可以作用于for循环,还可以被next调用的,叫迭代器(Iterator),这是一个名词。
通过定义我们知道,迭代器都是可迭代的,可迭代的未必是迭代器。
我们常见的range函数就是可迭代的,我们也经常用,还有比如list对象等。
- 可以用
isinstance判断是否可迭代和是否是迭代器,参看下面代码:
from collections import Iterable
l = [1, 2, 3, 4]
isinstance(l, Iterable)
from collections import Iterator
isinstance((x for x in range(10)), Iterator)
- iterable和Iterator的关系,可以通过iter函数运算,简而言之就是把一个对象转换成一个迭代器
isinstance(iter('abc'), Iterator)
for循环原理- 先判断对象是否为可迭代对象
- 不是的话直接报错,抛出TypeError异常
- 是的话则调用
iter方法,返回一个迭代器
- 不断地调用迭代器的next方法,每次按序返回迭代器中的一个值
- 迭代到最后,没有更多元素了就抛出异常
StopIteration,这个异常python自己会处理,不会暴露给开发者
- 先判断对象是否为可迭代对象
- 看个小代码:
# for循环原理
x = [1, 2, 3]
x_iterator = x.__iter__()
try:
while True:
print(next(x_iterator)) # .__next__())
except StopIteration:
print("STOP")
生成器
生成器是特指一种一边循环一边计算的机制的实现(generator),是一种特定的迭代器。
它是一个算法/函数,这个函数可以返回多个返回值,想得到这个函数需要用next函数调用这个生成器,每次调用next函数的时候生成器会计算下一个值,利用yield返回一个值,这样如果有多个yield这个函数就能返回不同的值。
最后一次被调动,没有值返回的时候则抛出StopIteration异常。
生成器可以直接创建,如下面案例:
L = [x * x for x in range(10)] # 生成列表
g = (x * x for x in range(10)) # 得到一个生成器
第一个L是用列表生成式创建的列表,而第二个则是创建了一个生成器(generator),而不是一个元祖生成器,因为世上根本没有元祖生成器,用的人多了,也没有!
可以粗暴的理解成函数中包含yield叫generator,生成器可以被next调用,此时遇到函数中的yield则返回,下次被next调用的时候则继续上次返回的点执行,这样一个函数可以被多次调用,每次调用执行一段代码。
生成器案例1:
def odd():
print('step 1')
yield 1
print('step 2')
yield(3)
print('step 3')
yield(5)
if __name__ == '__main__':
try:
g1 = odd()
while True:
a = next(g1)
print(a)
except StopIteration:
print("odd DONE")
需要注意生成器的使用方法,我先调用了下生成器,得到一个生成器变量g1后,此时生成器函数odd()还没真正运行,我需要调用next,每调用一次生成器会运行一段代码,到yield后就停止,此时返回值就是yield后面的值,一直运行下去,直到最后发出StopIteration异常。
案例1的生成器运行结果如下:
step 1
1
step 2
3
step 3
5
odd DONE
利用生成器解决斐波那契数列问题:
# coding=utf-8
# 利用循环解决斐波那契问题
def fib_loop(max):
n, a, b = 0, 0, 1
while n < max:
print(b)
a, b = b, a + b
n = n + 1
return 'done'
# 利用生成器
def fib_g(max):
n, a, b = 0, 0, 1
while n < max:
yield b
a, b = b, a + b
n = n + 1
if __name__ == '__main__':
print("循环的斐波那契:")
fib_loop(5)
print("生成器的斐波那契:")
g = fib_g(5)
try:
while True:
a = next(g)
print(a)
except StopIteration:
print("STOP")
协程
- 历史
- 3.4引入协程概念,用yield实现
- 3.5 引入协程语法
- 实现包asyncio, tornado, gevent
- 定义:”协程是为非抢占式多任务产生子程序的计算机程序组件,
- 协程允许不同入口点在不同位置暂停或开始执行程序”。
- 从技术的角度来说,”协程就是你可以暂停执行的函数”。
- 如果你把它理解成”就像生成器一样”,那么你就想对了。
- 使用:
- yield关键字
- send关键字
# coding=utf-8
def simple_coroutine():
print('-> start')
x = yield
print('-> received', x)
sc = simple_coroutine()
print(1111)
# 可以使用sc.send(None),效果一样
next(sc)
print(2222)
sc.send('zhexiao')
- 协程的四个状态
- inspect.getgeneratorstate(…) 函数确定,该函数会返回下述字符串中的一个:
- GEN_CREATED:等待开始执行
- GEN_RUNNING:解释器正在执行
- GEN_SUSPENDED:在yield表达式处暂停
- GEN_CLOSED:执行结束
- next预激(prime)
- 完整执行过程
- inspect.getgeneratorstate(…) 函数确定,该函数会返回下述字符串中的一个:
def simple_coroutine(a):
print('-> start')
b = yield a
print('-> received', a, b)
c = yield a + b
print('-> received', a, b, c)
# run
sc = simple_coroutine(5)
aa = next(sc)
print(aa)
bb = sc.send(6) # 5, 6
print(bb)
cc = sc.send(7) # 5, 6, 7
print(cc)
- 协程终止
- 协程中未处理的异常会向上冒泡,传给 next 函数或 send 方法的调用方(即触发协程的对象)。
- 终止协程的一种方式:发送某个哨符值,让协程退出。内置的 None 和Ellipsis 等常量经常用作哨符值。
- 异常
- 客户端代码可以在生成器对象上调用两个方法
- generator.throw(Exception):
- 致使生成器在暂停的 yield 表达式处抛出指定的异常。如果生成器处理了抛出的异常,代码会向前执行到下一个 yield 表达式,而产出的值会成为调用 generator.throw方法得到的返回值。如果生成器没有处理抛出的异常,异常会向上冒泡,传到调用方的上下文中。
- generator.close()
- 致使生成器在暂停的 yield 表达式处抛出 GeneratorExit 异常。
- 如果生成器没有处理这个异常,或者抛出了 StopIteration 异常(通常是指运行到结尾),调用方不会报错。如果收到 GeneratorExit 异常,生成器一定不能产出值,否则解释器会抛出RuntimeError 异常。生成器抛出的其他异常会向上冒泡,传给调用方。
class DemoException(Exception):
"""
custom exception
"""
pass
def handle_exception():
print('-> start')
while True:
try:
x = yield
except DemoException:
print('-> run demo exception')
else:
print('-> received x:', x)
raise RuntimeError('this line should never run')
he = handle_exception()
next(he)
he.send(10) # received x: 10
he.send(20) # received x: 20
he.throw(DemoException) # run demo exception
he.send(40) # received x: 40
he.close()
- yield from
- 为了得到返回值,协程必须正常终止;
- 然后生成器对象会抛出StopIteration 异常,异常对象的 value 属性保存着返回的值。
- yield from 从内部捕获StopIteration异常
- 并且把StopIteration异常value属性作为yield from表达式的返回值
class DemoException(Exception):
"""
custom exception
"""
pass
def handle_exception():
print('-> start')
while True:
try:
x = yield
except DemoException:
print('-> run demo exception')
else:
print('-> received x:', x)
he = handle_exception()
next(he)
he.send(10) # received x: 10
he.send(20) # received x: 20
try:
he.send(40) # received x: 40
he.close()
he.send(50) # received x: 40
he.close()
except Exception as e:
print(str(e))
print(e.value)
- 另一个例子:
def gen():
for c in 'AB':
yield c
print(list(gen()))
def gen_new():
yield from 'AB'
print(list(gen_new()))
- 委派生成器
- 包含 yield from 表达式的生成器函数
- 委派生成器在 yield from 表达式处暂停时,调用方可以直接把数据发给子生成器。
- 子生成器再把产出的值发给调用方。
- 子生成器返回之后,解释器会抛出 StopIteration 异常,并把返回值附加到异常对象上,此时委派生成器会恢复。
from collections import namedtuple
'''
解释:
1. 外层 for 循环每次迭代会新建一个 grouper 实例,赋值给 coroutine 变量; grouper 是委派生成器。
2. 调用 next(coroutine),预激委派生成器 grouper,此时进入 while True 循环,调用子生成器 averager 后,在 yield from 表达式处暂停。
3. 内层 for 循环调用 coroutine.send(value),直接把值传给子生成器 averager。同时,当前的 grouper 实例(coroutine)在 yield from 表达式处暂停。
4. 内层循环结束后, grouper 实例依旧在 yield from 表达式处暂停,因此, grouper函数定义体中为 results[key] 赋值的语句还没有执行。
5. coroutine.send(None) 终止 averager 子生成器,子生成器抛出 StopIteration 异常并将返回的数据包含在异常对象的value中,yield from 可以直接抓取 StopIteration 异常并将异常对象的 value 赋值给 results[key]
'''
ResClass = namedtuple('Res', 'count average')
# 子生成器
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:
break
total += term
count += 1
average = total / count
return ResClass(count, average)
# 委派生成器
def grouper(storages, key):
while True:
# 获取averager()返回的值
storages[key] = yield from averager()
# 客户端代码
def client():
process_data = {
'boys_2': [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys_1': [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46]
}
storages = {}
for k, v in process_data.items():
# 获得协程
coroutine = grouper(storages, k)
# 预激协程
next(coroutine)
# 发送数据到协程
for dt in v:
coroutine.send(dt)
# 终止协程
coroutine.send(None)
print(storages)
# run
client()
asyncio
- python3.4开始引入的标准库,内置了对异步io的支持
- asyncio本身是一个消息循环
- 步骤
- 创建消息循环
- 把协程导入
- 关闭
- 案例08
import asyncio
from aiohttp import web
async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'<h1>Index</h1>')
async def hello(request):
await asyncio.sleep(0.5)
text = '<h1>hello, %s!</h1>' % request.match_info['name']
return web.Response(body=text.encode('utf-8'))
async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
app.router.add_route('GET', '/hello/{name}', hello)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
- 案例09中有两个tasks
import threading
import asyncio
@asyncio.coroutine
def hello():
print('Hello world! (%s)' % threading.currentThread())
print('Start..... (%s)' % threading.currentThread())
yield from asyncio.sleep(10)
print('Done..... (%s)' % threading.currentThread())
print('Hello again! (%s)' % threading.currentThread())
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
- 案例v10,得到多个网站
import asyncio
@asyncio.coroutine
def wget(host):
print('wget %s...' % host)
connect = asyncio.open_connection(host, 80)
reader, writer = yield from connect
header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
writer.write(header.encode('utf-8'))
yield from writer.drain()
while True:
line = yield from reader.readline()
if line == b'\r\n':
break
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# Ignore the body, close the socket
writer.close()
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
async and await
- 为了更好的表示异步io
- python3.5 开始引入
- 让coroutine代码更简洁
- 使用上,可以简单进行替换
- 可以把 @asyncio.coroutine 替换成async
- yield from 替换成await
- 案例v11是把案例09直接替换
import threading
import asyncio
# @asyncio.coroutine
async def hello():
print('Hello world! (%s)' % threading.currentThread())
print('Start..... (%s)' % threading.currentThread())
await asyncio.sleep(10)
print('Done..... (%s)' % threading.currentThread())
print('Hello again! (%s)' % threading.currentThread())
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
aiohttp
- 介绍
- asyncio实现单线程并发IO,在客户端用处不大
- 在服务器端可以asyncio+coroutine配合,因为http是io操作
- asyncio实现了TCP, UIDP, SSL等协议
- aiohttp是基于asyncio实现的HTTP框架
- pip install aiohttp
- 下面案例跟上面一致,是使用aiohttp的例子
import asyncio
from aiohttp import web
async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'<h1>Index</h1>')
async def hello(request):
await asyncio.sleep(0.5)
text = '<h1>hello, %s!</h1>' % request.match_info['name']
return web.Response(body=text.encode('utf-8'))
async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
app.router.add_route('GET', '/hello/{name}', hello)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
concurrent.futures
- python3新增的库
- 类似其他语言的线程池的概念
- 此模块利用multiprocessing实现真正的平行计算
- 核心原理是:concurrent.futures会以子进程的形式,平行的运行多个python解释器,从而令python程序可以利用多核CPU来提升执行速度。由于子进程与主解释器相分离,所以他们的全局解释器锁也是相互独立的。每个子进程都能够完整的使用一个CPU内核。
- concurrent.futures.Executor
- ThreadPoolExecutor
- ProcessPoolExecutor
- submit(fn, args, kwargs)
- fn:异步执行的函数
- args, kwargs:参数
# 官方死锁案例
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b不会完成,他一直在等待a的return结果
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # 同理a也不会完成,他也是在等待b的结果
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
- 案例v14是上面的一个展示
from concurrent.futures import ThreadPoolExecutor
import time
def return_future(msg):
time.sleep(3)
return msg
# 创建一个线程池
pool = ThreadPoolExecutor(max_workers=2)
# 往线程池加入2个task
f1 = pool.submit(return_future, 'hello')
f2 = pool.submit(return_future, 'world')
print(f1.done())
time.sleep(3)
print(f2.done())
print(f1.result())
print(f2.result())
- map(fn, *iterables, timeout=None)
- 跟map函数类似
- 函数需要异步执行
- timeout:超时时间
- 案例v12
import time, re
import os, datetime
from concurrent import futures
data = ['1', '2']
def wait_on(argument):
print(argument)
time.sleep(2)
return "ok"
ex = futures.ThreadPoolExecutor(max_workers=2)
for i in ex.map(wait_on, data):
print(i)
- 案例v15
from concurrent.futures import ThreadPoolExecutor as Pool
# import requests
import urllib
from urllib import request
URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']
def task(url, timeout=20):
# return requests.get(url, timeout=timeout)
return request.urlopen(url, timeout=timeout)
pool = Pool(max_workers=3)
results = pool.map(task, URLS)
import time
time.sleep(20)
for ret in results:
print('%s, %s' % (ret.url, len(ret.read())))
- 执行结果是list,数据需要从list中取出来
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
print(list(executor.map(sleeper, x)))
- submit和map根据需要选一个即可
import time, re, fcntl
import os, datetime
from concurrent import futures
count_list = list()
MinuteNum = 1
StartTime = datetime.datetime(2018, 5, 1, 19, 31, 0, 484870)
NowTime = datetime.datetime.now()
os.system(':>new.txt')
f_new = open('new.txt', 'a')
def conc(CountTimeFormat):
f = open('push_slave.stdout', 'r')
for line in f.readlines():
if re.search(CountTimeFormat, line):
# 获得文件专用锁
fcntl.flock(f_new, fcntl.LOCK_EX)
f_new.writelines(line)
f_new.flush()
# 释放文件锁
fcntl.flock(f_new, fcntl.LOCK_UN)
break
while 1:
AfterOneMinute = datetime.timedelta(minutes=MinuteNum)
CountTime = AfterOneMinute + StartTime
CountTimeFormat = CountTime.strftime('%Y-%m-%d %H:%M')
MinuteNum = MinuteNum + 1
count_list.append(CountTimeFormat)
if CountTimeFormat == "2018-05-2 16:00":
break
def exec_cmd():
with futures.ProcessPoolExecutor(max_workers=24) as executor:
dict((executor.submit(conc, times), times) for times in count_list)
if __name__ == '__main__':
exec_cmd()
f_new.close()
- Future
- 未来需要完成的任务
- future 实例由Excutor.submit创建
- 案例v17
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import as_completed
import requests
URLS = ['http://qq.com', 'http://sina.com', 'http://www.baidu.com']
def task(url, timeout=10):
return requests.get(url, timeout=timeout)
with Pool(max_workers=3) as executor:
future_tasks = [executor.submit(task, url) for url in URLS]
for f in future_tasks:
if f.running():
print('%s is running' % str(f))
for f in as_completed(future_tasks):
try:
ret = f.done()
if ret:
f_ret = f.result()
print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
except Exception as e:
f.cancel()
print(str(e))