重要概念
1、C10K问题:(参考博客C10K问题)
2、并发、并行、同步、异步、阻塞、非阻塞。
1.并发是指在一个时间段内,有几个程序在同一个cpu上运行,但是任意时刻只有一个程序在cpu上运行。
2.并行是指任意时刻点上,有多个程序同时运行在多个cpu上。
3、IO多路复用——select、pool、epool。
select+回调+事件循环获取html
非阻塞io完成http请求:实际上并没有提升并发效率
import time
import socket
from urllib.parse import urlparse
# 使用非阻塞io完成http请求
def get_url(url):
# 通过socket请求html
url = urlparse(url)
host = url.netloc
path = url.path
if path == "":
path = "/"
# 建立socket连接
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 非阻塞io
client.setblocking(False)
try:
client.connect((host, 80)) # 阻塞不会消耗cpu
except BlockingIOError as e:
pass
# 不停的询问连接是否建立好, 需要while循环不停的去检查状态
# 做计算任务或者再次发起其他的连接请求
while True:
try:
client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
break
except OSError as e:
pass
data = b""
while True:
try:
d = client.recv(1024)
except BlockingIOError as e:
continue
if d:
data += d
else:
break
data = data.decode("utf8")
html_data = data.split("\r\n\r\n")[1]
print(html_data)
client.close()
if __name__ == "__main__":
get_url("http://www.baidu.com")
select+回调+事件循环完成http请求——并发性好、使用的是单线程
1、epoll并不代表一定比select好:
在并发高的情况下,连接活跃度不是很高, epoll比select好。
并发性不高,同时连接很活跃, select比epoll好。
2、回调+事件循环+select(poll\epoll)的模式:
tornado
gevent
协程/asyncio
3、具体实现:
import socket
from urllib.parse import urlparse
# selectors内部使用select,实际使用这个
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
# 实例化一个全局的selector
selector = DefaultSelector()
urls = []
stop = False
# 使用类完成 —— 回调需要用到已经建立好的链接
class Fetcher:
def connected(self, key):
selector.unregister(key.fd)
# 这里使用了事件监听,不用使用while循环轮训(不需要用try捕获异常)
# path与host都需要设置成对象的属性
self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
# 接收数据又需要监听socket——是否是可读的状态
selector.register(self.client.fileno(), EVENT_READ, self.readable)
# 变得可读的时候我们需要自己去调用它的代码
def readable(self, key):
d = self.client.recv(1024)
if d:
self.data += d
# 数据已经读完了
else:
selector.unregister(key.fd)
data = self.data.decode("utf8")
html_data = data.split("\r\n\r\n")[1]
print(html_data)
self.client.close()
# windows默认使用select会抛异常,linux默认使用epoll不会出现问题
# 下面的代码是在windows中的必要设置
urls.remove(self.spider_url)
if not urls:
# 必须修改全局的变量
global stop
stop = True
def get_url(self, url):
self.spider_url = url
url = urlparse(url)
self.host = url.netloc
self.path = url.path
self.data = b""
if self.path == "":
self.path = "/"
# 建立socket连接
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client.setblocking(False)
try:
self.client.connect((self.host, 80)) # 阻塞不会消耗cpu
except BlockingIOError as e:
pass
# 注意必须注册!
# 回调使用已经建立好的链接 —— 注意connected不要加括号!写函数的名称!
selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
# 自己调用select,去不停的判断哪一个socket准备好了、它是可读还是可写
# 如果它是可读或者可写我们需要调用对应的回调函数 —— 注意这个回调是我们自己来做的
def loop():
# 事件循环,不停的请求socket的状态并调用对应的回调函数
# 1. select本身是不支持register模式
# 2. socket状态变化以后的回调是由程序员自己完成的,而不是操作系统完成的
while not stop:
ready = selector.select()
for key, mask in ready:
call_back = key.data
call_back(key)
if __name__ == "__main__":
fetcher = Fetcher()
start_time = time.time()
for url in range(20):
url = "http://shop.projectsedu.com/goods/{}/".format(url)
urls.append(url)
fetcher = Fetcher()
fetcher.get_url(url)
loop()
print(time.time()-start_time)
“”“
0.26441121101379395
”“”
4、同步的方式耗时比上面的长:
# requests -> urlib -> socket
import socket
from urllib.parse import urlparse
def get_url(url):
# 通过socket请求html
url = urlparse(url)
host = url.netloc
path = url.path
if path == "":
path = "/"
# 建立socket连接
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# client.setblocking(False)
client.connect((host, 80)) #阻塞不会消耗cpu
# 不停的询问连接是否建立好, 需要while循环不停的去检查状态
# 做计算任务或者再次发起其他的连接请求
client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
data = b""
while True:
d = client.recv(1024)
if d:
data += d
else:
break
data = data.decode("utf8")
html_data = data.split("\r\n\r\n")[1]
print(html_data)
client.close()
if __name__ == "__main__":
import time
start_time = time.time()
for url in range(20):
url = "http://shop.projectsedu.com/goods/{}/".format(url)
get_url(url)
print(time.time()-start_time)
"""
0.8437938690185547
"""
回调模式的问题
1、我们去进行下一步操作的时候必须让事件循环驱动回调。
2、将传统的代码分割的“四分五裂”,代码维护难度高。
3、会产生的问题:
1.如果回调函数执行不正常该如何处理?
2.如果回调里面还要嵌套回调怎么办?要嵌套很多层怎么办?
3.如果嵌套了多层,其中某个环节出错了会造成什么后果?
4.如果有个数据需要被每个回调都处理怎么办?
5.如何使用当前函数中的局部变量?
协程介绍
1、C10M问题:如何利用8核CPU、64G内存,在10gbps的网络上保持1000万并发连接。
2、回调模式与同步编程的问题:
1、回调模式代码复杂度高
2、同步编程并发性不高
3、多线程编程需要线程间同步 —— 需要用到锁 —— 但是锁会降低效率
3、面临的挑战:
1、采用同步的方式去编写异步的代码
2、使用单线程去切换任务:
->1.线程是由操作系统切换的,单线程切换意味着我们需要程序员自己去调度任务
->2.不再需要锁,并发性高,在单线程之间切换函数,性能远高于线程间切换!
4、传统函数的调用过程
def get_html(url):
pass
def parse_url(url):
pass
def get_url1(url):
#do someting 1
html = get_html(url) #此处暂停,切换到另一个函数去执行
# #parse html
urls = parse_url(html)
def get_url2(url):
# do someting 1
html = get_html(url) #此处暂停,切换到另一个函数去执行
# #parse html
urls = parse_url(html)
# 传统函数调用 过程 A->B->C
5、新的需求及协程的出现:
1.我们需要一个可以暂停的函数,并且可以在适当的时候恢复该函数的继续执行
2.出现了协程 -> 有多个入口的函数, 可以暂停的函数, 可以暂停的函数(可以向暂停的地方传入值)
生成器进阶及send/close/throw方法
1、send方法
生成器不只可以产出值,还可以接收值。
send方法可以传递值进入生成器内部,同时还可以重启生成器执行到下一个yield位置。
有了send方法将值传递给生成器内部,才可以使用生成器实现协程!
在调用send发送非none值之前,我们必须启动一次生成器!启动生成器方式有两种:gen.send(None)
与next(gen)
。
1. gen.send(None)
2. next(gen)
send的案例:
def gen_func():
# 下面这行代码的作用有2个:
# 1. 可以产出值, 2. 可以接收值(调用方传递进来的值) —— 传递给生成器内部
html = yield "http://projectsedu.com"
print("html>>>>>",html)
yield 111
yield 222
return "whw"
if __name__ == "__main__":
gen = gen_func()
# 在调用send发送非none值之前,我们必须启动一次生成器
# 启动生成器方式有两种,gen.send(None) 与 next(gen)
# 1. gen.send(None); 2. next(gen)
# 在第一次send时,必须send一个None!!!~~~~~~~~
url = gen.send(None)
# download url
html = "whw"
# send方法可以传递值进入生成器内部,同时还可以重启生成器执行到下一个yield位置
print(gen.send(html))
print(gen.send(html))
"""
html>>>>> whw
111
222
"""
2、close方法
停止——如果后面还使用next方法的话会报StopIteration
异常。
close方法之后还有yield的话会抛异常:GeneratorExit
。
def gen_func():
# 1. 可以产出值, 2. 可以接收值(调用方传递进来的值)
try:
yield "http://projectsedu.com"
except Exception:
import traceback
print(traceback.format_exc())
yield 2
yield 3
return "whw"
if __name__ == "__main__":
gen = gen_func()
print(next(gen))
# close
gen.close()
print(next(gen))
print("whw") # 不会执行
# GeneratorExit是继承自BaseException, Exception
3、throw方法
与异常处理有关。
def gen_func():
#1. 可以产出值, 2. 可以接收值(调用方传递进来的值)
try:
yield "http://projectsedu.com"
except Exception as e:
import traceback
print(traceback.format_exc())
yield 2
yield 3
return "whw"
if __name__ == "__main__":
gen = gen_func()
print(next(gen))
# 是前一个yield对应的异常!
gen.throw(Exception, "download error")
print(next(gen))
gen.throw(Exception, "download error")
yield from语法
yield from是python3.3新加的语法。
itertools.chain
将几个可迭代对象放在一起遍历。
from itertools import chain
my_list = [1,2,3]
my_dict = {
"html1":"http://projectsedu.com",
"html2":"http://www.imooc.com",
}
for value in chain(my_list,my_dict,range(1,5)):
print(value)
"""
1
2
3
html1
html2
1
2
3
4
"""
使用yield实现itertools.chain的功能
my_list = [1,2,3]
my_dict = {
"html1":"http://projectsedu.com",
"html2":"http://www.imooc.com",
}
# 自己是心啊一个chain效果的函数
def my_chain(*args, **kwargs):
for my_iterable in args:
# yield from my_iterable
for value in my_iterable:
yield value
for value in my_chain(my_list, my_dict, range(5,7)):
print(value)
"""
1
2
3
html1
html2
5
6
"""
使用yield from实现itertools.chain的功能
my_list = [1,2,3]
my_dict = {
"html1":"http://projectsedu.com",
"html2":"http://www.imooc.com",
}
def my_chain(*args, **kwargs):
for my_iterable in args:
yield from my_iterable
for value in my_chain(my_list, my_dict, range(5,7)):
print(value)
"""
1
2
3
html1
html2
5
6
"""
yield语法与yield from语法的区别
yield后面跟什么就返回什么;yield from后面跟一个可迭代对象,会将这个可迭代对象中的值返回去。
def g1(iterable):
# yield后面跟什么就返回什么
yield iterable
def g2(iterable):
# yield from后面跟一个可迭代对象,会将这个可迭代对象中的值返回去。
yield from iterable
for v in g1(range(3)):
print(v)
print("==================")
for v in g2(range(3)):
print(v)
"""
range(0, 3)
==================
0
1
2
"""
yield from会在调用方与子生成器之间建立一个双向通道
"""
# 1. main:调用方;g1:委托生成器;gen:子生成器
# 2. yield from会在调用方与子生成器之间建立一个双向通道
# 此时子生成器gen与主调用方main就建立起了联系了
"""
# gen:子生成器
def g1(gen):
yield from gen
# g1:委托生成器
def main():
g = g1()
g.send(None)
调用方+委托生成器+子生成器的使用:构建销量字典
final_result = {}
# 子生成器 —— 接收外面传来的值并做保存
def sales_sum(pro_name):
total = 0
nums = []
while True:
x = yield
print(pro_name+"销量: ", x)
# 外面传进来的值为None结束
if not x:
break
total += x
nums.append(x)
return total, nums
# 委托生成器 —— 将每一行数据进行统计
def middle(key):
while True:
# yield from调用子生成器
final_result[key] = yield from sales_sum(key)
print(key+"销量统计完成!!.")
# 调用方
def main():
data_sets = {
"面膜": [1200,1500],
"飞机": [28,55,98,108],
"杯子": [280,560,778,70],
}
for key, data_set in data_sets.items():
print("start key:", key)
m = middle(key)
# 预激middle协程
m.send(None)
for value in data_set:
m.send(value) # 给协程传递每一组的值
m.send(None)
print("final_result:", final_result)
if __name__ == '__main__':
main()
"""
start key: 面膜
面膜销量: 1200
面膜销量: 1500
面膜销量: None
面膜销量统计完成!!.
start key: 飞机
飞机销量: 28
飞机销量: 55
飞机销量: 98
飞机销量: 108
飞机销量: None
飞机销量统计完成!!.
start key: 杯子
杯子销量: 280
杯子销量: 560
杯子销量: 778
杯子销量: 70
杯子销量: None
杯子销量统计完成!!.
final_result: {'面膜': (2700, [1200, 1500]), '飞机': (289, [28, 55, 98, 108]), '杯子': (1688, [280, 560, 778, 70])}
"""
子生成器分析:yield from的强大之处
单独使用子生成器的话还得捕获异常!在委托生成器中使用yield from的话不用考虑这一点。
def sales_sum(pro_name):
total = 0
nums = []
while True:
x = yield
print(pro_name+"销量: ", x)
if not x:
break
total += x
nums.append(x)
return total, nums
if __name__ == "__main__":
my_gen = sales_sum("飞机")
# send的第一个值必须是None!!!
my_gen.send(None)
my_gen.send(1200)
my_gen.send(1500)
my_gen.send(3000)
try:
my_gen.send(None)
except StopIteration as e:
result = e.value
print(result)
"""
飞机销量: 1200
飞机销量: 1500
飞机销量: 3000
飞机销量: None
(5700, [1200, 1500, 3000])
"""
yield from的原理说明(pep380文档)
# pep380
#1. RESULT = yield from EXPR可以简化成下面这样
#一些说明
"""
_i:子生成器,同时也是一个迭代器
_y:子生成器生产的值
_r:yield from 表达式最终的值
_s:调用方通过send()发送的值
_e:异常对象
"""
_i = iter(EXPR) # EXPR是一个可迭代对象,_i其实是子生成器;
try:
_y = next(_i) # 预激子生成器,把产出的第一个值存在_y中;
except StopIteration as _e:
_r = _e.value # 如果抛出了`StopIteration`异常,那么就将异常对象的`value`属性保存到_r,这是最简单的情况的返回值;
else:
while 1: # 尝试执行这个循环,委托生成器会阻塞;
_s = yield _y # 生产子生成器的值,等待调用方`send()`值,发送过来的值将保存在_s中;
try:
_y = _i.send(_s) # 转发_s,并且尝试向下执行;
except StopIteration as _e:
_r = _e.value # 如果子生成器抛出异常,那么就获取异常对象的`value`属性存到_r,退出循环,恢复委托生成器的运行;
break
RESULT = _r # _r就是整个yield from表达式返回的值。
"""
1. 子生成器可能只是一个迭代器,并不是一个作为协程的生成器,所以它不支持.throw()和.close()方法;
2. 如果子生成器支持.throw()和.close()方法,但是在子生成器内部,这两个方法都会抛出异常;
3. 调用方让子生成器自己抛出异常
4. 当调用方使用next()或者.send(None)时,都要在子生成器上调用next()函数,当调用方使用.send()发送非 None 值时,才调用子生成器的.send()方法;
"""
_i = iter(EXPR)
try:
_y = next(_i)
except StopIteration as _e:
_r = _e.value
else:
while 1:
try:
_s = yield _y
except GeneratorExit as _e:
try:
_m = _i.close
except AttributeError:
pass
else:
_m()
raise _e
except BaseException as _e:
_x = sys.exc_info()
try:
_m = _i.throw
except AttributeError:
raise _e
else:
try:
_y = _m(*_x)
except StopIteration as _e:
_r = _e.value
break
else:
try:
if _s is None:
_y = next(_i)
else:
_y = _i.send(_s)
except StopIteration as _e:
_r = _e.value
break
RESULT = _r
"""
看完代码,我们总结一下关键点:
1. 子生成器生产的值,都是直接传给调用方的;调用方通过.send()发送的值都是直接传递给子生成器的;如果发送的是 None,会调用子生成器的__next__()方法,如果不是 None,会调用子生成器的.send()方法;
2. 子生成器退出的时候,最后的return EXPR,会触发一个StopIteration(EXPR)异常;
3. yield from表达式的值,是子生成器终止时,传递给StopIteration异常的第一个参数;
4. 如果调用的时候出现StopIteration异常,委托生成器会恢复运行,同时其他的异常会向上 "冒泡";
5. 传入委托生成器的异常里,除了GeneratorExit之外,其他的所有异常全部传递给子生成器的.throw()方法;如果调用.throw()的时候出现了StopIteration异常,那么就恢复委托生成器的运行,其他的异常全部向上 "冒泡";
6. 如果在委托生成器上调用.close()或传入GeneratorExit异常,会调用子生成器的.close()方法,没有的话就不调用。如果在调用.close()的时候抛出了异常,那么就向上 "冒泡",否则的话委托生成器会抛出GeneratorExit异常。
"""
Python中的原生协程:async与await
python3.5为了将语义变得更加明确,就引入了async和await关键词用于定义原生的协程。
注意:async中不能有yield。
""" types.coroutine装饰的函数可以放在await后面 """
# import types
# @types.coroutine
# def downloader(url):
# yield "whw"
from collections import Awaitable
async def downloader(url):
return "whw"
async def download_url(url):
# dosomethings
# await后面只能接收Awaitable对象
html = await downloader(url)
return html
if __name__ == "__main__":
coro = download_url("http://www.imooc.com")
# 原生协程不能使用next调用!须使用send调用!
# next(None)
coro.send(None)
生成器实现协程的一个例子
# -*- coding:utf-8 -*-
import time
def consumer():
# consumer作为一个生成器
while 1:
data = yield
def producer():
# 生成器对象
g = consumer()
# 先next后面才能send具体的非None的值,相当于先send一个None
next(g)
for i in range(1000000):
g.send(i)
if __name__ == '__main__':
start = time.time()
#并发执行,但是任务producer遇到io就会阻塞住,并不会切到该线程内的其他任务去执行
producer()
print('执行时间:',time.time() - start)