重要概念

1、C10K问题:(参考博客C10K问题)

2、并发、并行、同步、异步、阻塞、非阻塞。

1.并发是指在一个时间段内,有几个程序在同一个cpu上运行,但是任意时刻只有一个程序在cpu上运行。
2.并行是指任意时刻点上,有多个程序同时运行在多个cpu上。

3、IO多路复用——select、pool、epool。

select+回调+事件循环获取html

非阻塞io完成http请求:实际上并没有提升并发效率

import time
import socket
from urllib.parse import urlparse


# 使用非阻塞io完成http请求

def get_url(url):
    # 通过socket请求html
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # 建立socket连接
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 非阻塞io
    client.setblocking(False)
    try:
        client.connect((host, 80)) # 阻塞不会消耗cpu
    except BlockingIOError as e:
        pass

    # 不停的询问连接是否建立好, 需要while循环不停的去检查状态
    # 做计算任务或者再次发起其他的连接请求
    while True:
        try:
            client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
            break
        except OSError as e:
            pass


    data = b""
    while True:
        try:
            d = client.recv(1024)
        except BlockingIOError as e:
            continue
        if d:
            data += d
        else:
            break

    data = data.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)
    client.close()

if __name__ == "__main__":
    get_url("http://www.baidu.com")

select+回调+事件循环完成http请求——并发性好、使用的是单线程

1、epoll并不代表一定比select好:

在并发高的情况下,连接活跃度不是很高, epoll比select好。
并发性不高,同时连接很活跃, select比epoll好。

2、回调+事件循环+select(poll\epoll)的模式:

tornado
gevent
协程/asyncio

3、具体实现:

import socket
from urllib.parse import urlparse
# selectors内部使用select,实际使用这个
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE


# 实例化一个全局的selector
selector = DefaultSelector()
urls = []
stop = False


# 使用类完成 —— 回调需要用到已经建立好的链接
class Fetcher:

    def connected(self, key):
        selector.unregister(key.fd)
        # 这里使用了事件监听,不用使用while循环轮训(不需要用try捕获异常)
        # path与host都需要设置成对象的属性
        self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
        # 接收数据又需要监听socket——是否是可读的状态
        selector.register(self.client.fileno(), EVENT_READ, self.readable)

    # 变得可读的时候我们需要自己去调用它的代码
    def readable(self, key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        # 数据已经读完了
        else:
            selector.unregister(key.fd)
            data = self.data.decode("utf8")
            html_data = data.split("\r\n\r\n")[1]
            print(html_data)
            self.client.close()
            # windows默认使用select会抛异常,linux默认使用epoll不会出现问题
            # 下面的代码是在windows中的必要设置
            urls.remove(self.spider_url)
            if not urls:
                # 必须修改全局的变量
                global stop
                stop = True

    def get_url(self, url):
        self.spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"

        # 建立socket连接
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)

        try:
            self.client.connect((self.host, 80))  # 阻塞不会消耗cpu
        except BlockingIOError as e:
            pass

        # 注意必须注册!
        # 回调使用已经建立好的链接  —— 注意connected不要加括号!写函数的名称!
        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)

# 自己调用select,去不停的判断哪一个socket准备好了、它是可读还是可写
# 如果它是可读或者可写我们需要调用对应的回调函数 —— 注意这个回调是我们自己来做的
def loop():
    # 事件循环,不停的请求socket的状态并调用对应的回调函数
    # 1. select本身是不支持register模式
    # 2. socket状态变化以后的回调是由程序员自己完成的,而不是操作系统完成的
    while not stop:
        ready = selector.select()
        for key, mask in ready:
            call_back = key.data
            call_back(key)

if __name__ == "__main__":
    fetcher = Fetcher()
    start_time = time.time()
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        urls.append(url)
        fetcher = Fetcher()
        fetcher.get_url(url)
    loop()
    print(time.time()-start_time)
    “”“
    0.26441121101379395
    ”“”

4、同步的方式耗时比上面的长:

# requests -> urlib -> socket
import socket
from urllib.parse import urlparse


def get_url(url):
    # 通过socket请求html
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # 建立socket连接
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # client.setblocking(False)
    client.connect((host, 80)) #阻塞不会消耗cpu
    # 不停的询问连接是否建立好, 需要while循环不停的去检查状态
    # 做计算任务或者再次发起其他的连接请求
    client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))

    data = b""
    while True:
        d = client.recv(1024)
        if d:
            data += d
        else:
            break

    data = data.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)
    client.close()

if __name__ == "__main__":
    import time
    start_time = time.time()
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        get_url(url)
    print(time.time()-start_time)
    """
    0.8437938690185547
    """

回调模式的问题

1、我们去进行下一步操作的时候必须让事件循环驱动回调。

2、将传统的代码分割的“四分五裂”,代码维护难度高。

3、会产生的问题:

1.如果回调函数执行不正常该如何处理?
2.如果回调里面还要嵌套回调怎么办?要嵌套很多层怎么办?
3.如果嵌套了多层,其中某个环节出错了会造成什么后果?
4.如果有个数据需要被每个回调都处理怎么办?
5.如何使用当前函数中的局部变量?

协程介绍

1、C10M问题:如何利用8核CPU、64G内存,在10gbps的网络上保持1000万并发连接。

2、回调模式与同步编程的问题:

1、回调模式代码复杂度高
2、同步编程并发性不高
3、多线程编程需要线程间同步 —— 需要用到锁 —— 但是锁会降低效率

3、面临的挑战:

1、采用同步的方式去编写异步的代码
2、使用单线程去切换任务:
->1.线程是由操作系统切换的,单线程切换意味着我们需要程序员自己去调度任务
->2.不再需要锁,并发性高,在单线程之间切换函数,性能远高于线程间切换!

4、传统函数的调用过程

def get_html(url):
    pass

def parse_url(url):
    pass

def get_url1(url):
    #do someting 1
    html = get_html(url) #此处暂停,切换到另一个函数去执行
    # #parse html
    urls = parse_url(html)

def get_url2(url):
    # do someting 1
    html = get_html(url) #此处暂停,切换到另一个函数去执行
    # #parse html
    urls = parse_url(html)

# 传统函数调用 过程 A->B->C

5、新的需求及协程的出现:

1.我们需要一个可以暂停的函数,并且可以在适当的时候恢复该函数的继续执行

2.出现了协程 -> 有多个入口的函数, 可以暂停的函数, 可以暂停的函数(可以向暂停的地方传入值)

生成器进阶及send/close/throw方法

1、send方法

生成器不只可以产出值,还可以接收值。

send方法可以传递值进入生成器内部,同时还可以重启生成器执行到下一个yield位置。

有了send方法将值传递给生成器内部,才可以使用生成器实现协程!

在调用send发送非none值之前,我们必须启动一次生成器!启动生成器方式有两种:gen.send(None)next(gen)

1. gen.send(None)
2. next(gen)

send的案例:

def gen_func():
    # 下面这行代码的作用有2个:
    # 1. 可以产出值, 2. 可以接收值(调用方传递进来的值) —— 传递给生成器内部
    html = yield "http://projectsedu.com"
    print("html>>>>>",html)
    yield 111
    yield 222
    return "whw"


if __name__ == "__main__":
    gen = gen_func()
    # 在调用send发送非none值之前,我们必须启动一次生成器
    # 启动生成器方式有两种,gen.send(None) 与 next(gen)
    # 1. gen.send(None); 2. next(gen)
    # 在第一次send时,必须send一个None!!!~~~~~~~~
    url = gen.send(None)
    # download url
    html = "whw"
    # send方法可以传递值进入生成器内部,同时还可以重启生成器执行到下一个yield位置
    print(gen.send(html))
    print(gen.send(html))
    """
    html>>>>> whw
    111
    222
    """

2、close方法

停止——如果后面还使用next方法的话会报StopIteration异常。

close方法之后还有yield的话会抛异常:GeneratorExit

def gen_func():
    # 1. 可以产出值, 2. 可以接收值(调用方传递进来的值)
    try:
        yield "http://projectsedu.com"
    except Exception:
        import traceback
        print(traceback.format_exc())
    yield 2
    yield 3
    return "whw"

if __name__ == "__main__":
    gen = gen_func()
    print(next(gen))
    # close
    gen.close()
    print(next(gen))
    print("whw") # 不会执行

    # GeneratorExit是继承自BaseException, Exception

3、throw方法

与异常处理有关。

def gen_func():
    #1. 可以产出值, 2. 可以接收值(调用方传递进来的值)
    try:
        yield "http://projectsedu.com"
    except Exception as e:
        import traceback
        print(traceback.format_exc())
    yield 2
    yield 3
    return "whw"

if __name__ == "__main__":
    gen = gen_func()
    print(next(gen))
    # 是前一个yield对应的异常!
    gen.throw(Exception, "download error")
    print(next(gen))
    gen.throw(Exception, "download error")

yield from语法

yield from是python3.3新加的语法。

itertools.chain

将几个可迭代对象放在一起遍历。

from itertools import chain

my_list = [1,2,3]
my_dict = {
    "html1":"http://projectsedu.com",
    "html2":"http://www.imooc.com",
}

for value in chain(my_list,my_dict,range(1,5)):
    print(value)
"""
1
2
3
html1
html2
1
2
3
4
"""

使用yield实现itertools.chain的功能

my_list = [1,2,3]
my_dict = {
    "html1":"http://projectsedu.com",
    "html2":"http://www.imooc.com",
}

# 自己是心啊一个chain效果的函数
def my_chain(*args, **kwargs):
    for my_iterable in args:
        # yield from my_iterable
        for value in my_iterable:
            yield value

for value in my_chain(my_list, my_dict, range(5,7)):
    print(value)
"""
1
2
3
html1
html2
5
6
"""

使用yield from实现itertools.chain的功能

my_list = [1,2,3]
my_dict = {
    "html1":"http://projectsedu.com",
    "html2":"http://www.imooc.com",
}

def my_chain(*args, **kwargs):
    for my_iterable in args:
        yield from my_iterable

for value in my_chain(my_list, my_dict, range(5,7)):
    print(value)
"""
1
2
3
html1
html2
5
6
"""

yield语法与yield from语法的区别

yield后面跟什么就返回什么;yield from后面跟一个可迭代对象,会将这个可迭代对象中的值返回去。

def g1(iterable):
    # yield后面跟什么就返回什么
    yield iterable

def g2(iterable):
    # yield from后面跟一个可迭代对象,会将这个可迭代对象中的值返回去。
    yield from iterable

for v in g1(range(3)):
    print(v)

print("==================")

for v in g2(range(3)):
    print(v)
"""
range(0, 3)
==================
0
1
2
"""

yield from会在调用方与子生成器之间建立一个双向通道

"""
# 1. main:调用方;g1:委托生成器;gen:子生成器
# 2. yield from会在调用方与子生成器之间建立一个双向通道
# 此时子生成器gen与主调用方main就建立起了联系了
"""

# gen:子生成器
def g1(gen):
    yield from gen


# g1:委托生成器
def main():
    g = g1()
    g.send(None)

调用方+委托生成器+子生成器的使用:构建销量字典

final_result = {}

# 子生成器 —— 接收外面传来的值并做保存
def sales_sum(pro_name):
    total = 0
    nums = []
    while True:
        x = yield
        print(pro_name+"销量: ", x)
        # 外面传进来的值为None结束
        if not x:
            break
        total += x
        nums.append(x)
    return total, nums

# 委托生成器 —— 将每一行数据进行统计
def middle(key):
    while True:
        # yield from调用子生成器
        final_result[key] = yield from sales_sum(key)
        print(key+"销量统计完成!!.")

# 调用方
def main():
    data_sets = {
        "面膜": [1200,1500],
        "飞机": [28,55,98,108],
        "杯子": [280,560,778,70],
    }
    for key, data_set in data_sets.items():
        print("start key:", key)
        m = middle(key)
        # 预激middle协程
        m.send(None) 
        for value in data_set:
            m.send(value)   # 给协程传递每一组的值
        m.send(None)
    print("final_result:", final_result)

if __name__ == '__main__':
    main()
"""
start key: 面膜
面膜销量:  1200
面膜销量:  1500
面膜销量:  None
面膜销量统计完成!!.
start key: 飞机
飞机销量:  28
飞机销量:  55
飞机销量:  98
飞机销量:  108
飞机销量:  None
飞机销量统计完成!!.
start key: 杯子
杯子销量:  280
杯子销量:  560
杯子销量:  778
杯子销量:  70
杯子销量:  None
杯子销量统计完成!!.
final_result: {'面膜': (2700, [1200, 1500]), '飞机': (289, [28, 55, 98, 108]), '杯子': (1688, [280, 560, 778, 70])}
"""

子生成器分析:yield from的强大之处

单独使用子生成器的话还得捕获异常!在委托生成器中使用yield from的话不用考虑这一点。

def sales_sum(pro_name):
    total = 0
    nums = []
    while True:
        x = yield
        print(pro_name+"销量: ", x)
        if not x:
            break
        total += x
        nums.append(x)
    return total, nums

if __name__ == "__main__":
    my_gen = sales_sum("飞机")
    # send的第一个值必须是None!!!
    my_gen.send(None)
    my_gen.send(1200)
    my_gen.send(1500)
    my_gen.send(3000)
    try:
        my_gen.send(None)
    except StopIteration as e:
        result = e.value
        print(result)
"""
飞机销量:  1200
飞机销量:  1500
飞机销量:  3000
飞机销量:  None
(5700, [1200, 1500, 3000])
"""

yield from的原理说明(pep380文档)

# pep380

#1. RESULT = yield from EXPR可以简化成下面这样
#一些说明
"""
_i:子生成器,同时也是一个迭代器
_y:子生成器生产的值
_r:yield from 表达式最终的值
_s:调用方通过send()发送的值
_e:异常对象

"""

_i = iter(EXPR)      # EXPR是一个可迭代对象,_i其实是子生成器;
try:
    _y = next(_i)   # 预激子生成器,把产出的第一个值存在_y中;
except StopIteration as _e:
    _r = _e.value   # 如果抛出了`StopIteration`异常,那么就将异常对象的`value`属性保存到_r,这是最简单的情况的返回值;
else:
    while 1:    # 尝试执行这个循环,委托生成器会阻塞;
        _s = yield _y   # 生产子生成器的值,等待调用方`send()`值,发送过来的值将保存在_s中;
        try:
            _y = _i.send(_s)    # 转发_s,并且尝试向下执行;
        except StopIteration as _e:
            _r = _e.value       # 如果子生成器抛出异常,那么就获取异常对象的`value`属性存到_r,退出循环,恢复委托生成器的运行;
            break
RESULT = _r     # _r就是整个yield from表达式返回的值。

"""
1. 子生成器可能只是一个迭代器,并不是一个作为协程的生成器,所以它不支持.throw()和.close()方法;
2. 如果子生成器支持.throw()和.close()方法,但是在子生成器内部,这两个方法都会抛出异常;
3. 调用方让子生成器自己抛出异常
4. 当调用方使用next()或者.send(None)时,都要在子生成器上调用next()函数,当调用方使用.send()发送非 None 值时,才调用子生成器的.send()方法;
"""

_i = iter(EXPR)
try:
    _y = next(_i)
except StopIteration as _e:
    _r = _e.value
else:
    while 1:
        try:
            _s = yield _y
        except GeneratorExit as _e:
            try:
                _m = _i.close
            except AttributeError:
                pass
            else:
                _m()
            raise _e
        except BaseException as _e:
            _x = sys.exc_info()
            try:
                _m = _i.throw
            except AttributeError:
                raise _e
            else:
                try:
                    _y = _m(*_x)
                except StopIteration as _e:
                    _r = _e.value
                    break
        else:
            try:
                if _s is None:
                    _y = next(_i)
                else:
                    _y = _i.send(_s)
            except StopIteration as _e:
                _r = _e.value
                break
RESULT = _r

"""
看完代码,我们总结一下关键点:
1. 子生成器生产的值,都是直接传给调用方的;调用方通过.send()发送的值都是直接传递给子生成器的;如果发送的是 None,会调用子生成器的__next__()方法,如果不是 None,会调用子生成器的.send()方法;
2. 子生成器退出的时候,最后的return EXPR,会触发一个StopIteration(EXPR)异常;
3. yield from表达式的值,是子生成器终止时,传递给StopIteration异常的第一个参数;
4. 如果调用的时候出现StopIteration异常,委托生成器会恢复运行,同时其他的异常会向上 "冒泡";
5. 传入委托生成器的异常里,除了GeneratorExit之外,其他的所有异常全部传递给子生成器的.throw()方法;如果调用.throw()的时候出现了StopIteration异常,那么就恢复委托生成器的运行,其他的异常全部向上 "冒泡";
6. 如果在委托生成器上调用.close()或传入GeneratorExit异常,会调用子生成器的.close()方法,没有的话就不调用。如果在调用.close()的时候抛出了异常,那么就向上 "冒泡",否则的话委托生成器会抛出GeneratorExit异常。
"""

Python中的原生协程:async与await

python3.5为了将语义变得更加明确,就引入了async和await关键词用于定义原生的协程。

注意:async中不能有yield。

""" types.coroutine装饰的函数可以放在await后面 """
# import types
# @types.coroutine
# def downloader(url):
#     yield "whw"

from collections import Awaitable

async def downloader(url):
    return "whw"

async def download_url(url):
    # dosomethings
    # await后面只能接收Awaitable对象
    html = await downloader(url)
    return html

if __name__ == "__main__":
    coro = download_url("http://www.imooc.com")
    # 原生协程不能使用next调用!须使用send调用!
    # next(None)
    coro.send(None)

生成器实现协程的一个例子

# -*- coding:utf-8 -*-
import time

def consumer():
    # consumer作为一个生成器
    while 1:
        data = yield

def producer():
    # 生成器对象
    g = consumer()
    # 先next后面才能send具体的非None的值,相当于先send一个None
    next(g)
    for i in range(1000000):
        g.send(i)

if __name__ == '__main__':
    start = time.time()
    #并发执行,但是任务producer遇到io就会阻塞住,并不会切到该线程内的其他任务去执行
    producer()
    print('执行时间:',time.time() - start)