1. 异步网络编程的组件介绍
asyncio提供了一系列的工具用于异步网络编程.由于各种原因,它的很多接口并不能跨平台.对于类Unix系统asyncio是完全支持的,但对于windows就不一定了,这点需要注意.
其中主要的部件有:
- Socket对象
- asyncio中对socket操作的接口
- Transports(通信)和protocols(协议)类
- 读写流
- 建立连接和服务器的工具
其他相关的工具包括
- ssl组件
- 信号量和信号处理接口
- Future对象操作
我们知道5层网络协议是从用户到计算机如下划分的:
- 应用层
- 传输层
- 网络层
- 链路层
- 物理层
以上这些工具都保证了字节串可以从一台主机传送到另一台主机,对应的是网络协议中的传输层和网络层协议.而链路层和物理层是计算机硬件和系统提供保证的,这样我们要做的只是定义和实现应用层协议即可.
1.1. socket对象
python中的模块socket提供了对socket的支持,无论是同步的网络编程还是异步的网络编程都是对这个对象的配置和使用.
1.1.1. Socket协议族(family)
当一个socket对象被创建时,该socket对象所需的地址格式会根据特定的地址族被自动选择.所谓地址族就是表明使用的网络层协议的常量.Socket地址被表示为:
Unix协议族
一个绑定到文件系统节点的`AF_UNIX`套接字的地址被表示为一个字符串,这个字符串使用文件系统编码与'surrogateescape'错误处理器.一个Linux抽象命名空间中的地址被作为一个被初始化为空字节的bytes-like object返回;注意命名空间中的这些地址可以与常规文件系统套接字通讯,因此打算运行在Linux上的程序需要能同时处理两种类型的地址.当作为个一参数传递时,一个字符串或类字节对象可以被用做任意类型的地址.
IP协议族
IPv4
一对参数(host,port)用于
AF_INET
地址族,其中host是一个字符串,表示在internet中像 'daring.cwi.nl'这样的域名或像'100.50.200.5'这样的IPv4地址, port(端口)参数是一个整数.IPv6
对于
AF_INET6
地址族, 一个地址被表示为一个四维元组(host, port, flowinfo, scopeid),其中flowinfo 和 scopeid 分别代表C中struct sockaddr_in6
结构体中的成员sin6_flowinfo
与sin6_scope_id
. 为了向后兼容,在 socket 模块的方法中, flowinfo 与 scopeid 参数可以被省略.但是请注意,忽略 scopeid 参数会导致操作IPv6地址时出现问题。
NETLINK协议族
Netlink套接字是用以实现用户进程与内核进程通信的一种特殊的进程间通信(IPC),也是网络应用程序与内核通信的最常用的接口. AF_NETLINK
套接字被表示为一个二维元组 (pid, groups).
TIPC协议族
TIPC是一个开放的,基于非 IP 的,为在集群计算机环境中使用而设计的网络协议. 只有 Linux 支持的TIPC需要使用AF_TIPC
地址族.地址由表示成一个元组,元祖字段取决于地址类型.一般的元组形式是 (addr_type, v1、 v2、 v3 [, scope]),其中︰
addr_type 是
TIPC_ADDR_NAMESEQ
,TIPC_ADDR_NAME
或TIPC_ADDR_ID
中的一个scope 是
TIPC_ZONE_SCOPE
,TIPC_CLUSTER_SCOPE
和TIPC_NODE_SCOPE
中的一个如果
addr_type
是TIPC_ADDR_NAME
,那么v1是服务器类型,v2是端口标识符,v3应为0如果
addr_type
是TIPC_ADDR_NAMESEQ
,那么v1是服务器类型,v2
是下级端口号,v3
是上级端口号如果
addr_type
是TIPC_ADDR_ID
,那么v1是节点,v2是引用,v3应该设置为0一个元组
(interface,)
用于AF_CAN
地址族,interface
是一个表示成像'can0' 的网络接口名称的字符串.网络接口名称''
可以用来接收这个地址族的所有网络接口的数据包.
SYSPROTO_CONTROL
协议
一个字符串或一个元组(id, unit)
用于PF_SYSTEM
族的SYSPROTO_CONTROL
协议.内核管理的字符串名称是动态分配的ID。如果内核管理的ID和unit号已知或者使用已注册ID,那么元祖可以被使用.
蓝牙协议
AF_BLUETOOTH
支持下列协议和地址格式:
BTPROTO_L2CAP接受(bdaddr,psm)其中bdaddr字符串和psm是一个整数.
BTPROTO_RFCOMM接受(bdaddr,通道)其中bdaddr字符串和channel是一个整数.
BTPROTO_HCI接受(device_id,)其中device_id是整数或带有接口蓝牙地址的字符串.(这取决于你的操作系统,NetBSD和DragonFlyBSD期望一个蓝牙地址,而其他都期望一个整数)
BTPROTO_SCO接受bdaddr其中bdaddr是包含字符串格式的蓝牙地址的bytes对象.(例如,b'12:23:34:45:56:67')FreeBSD不支持此协议.
某些其他地址系列(AF_PACKET,AF_CAN)支持特定表示。
在表示IPv4地址时,可以采用两种特殊的字符串形式来表示特定的主机地址:
- 用空字符串表示INADDR_ANY;
- 用字符串'
'来表示 INADDR_BROADCAST
此行为与IPv6不兼容,因此,如果您打算使Python程序支持IPv6,则可能希望避免这些行为.
如果在IPv4/v6套接字地址的主机部分中使用主机名,则程序可能显示非确定性行为,因为Python使用从DNS解析返回的第一个地址.根据DNS解析或主机配置的结果,套接字地址将以不同的方式解析为实际的IPv4/v6地址.对于确定的行为,在host部分中使用应该使用数字地址.
1.1.2. IP协议 socket常见类型常量和设置常量
asyncio的高级socket接口目前只支持Unix协议族和IP协议族,Unix协议族只用于本机间多进程通信,而最常用的是IP协议族,因此本文后面所说的内容都是针对IP协议族的.
类型常量
socket的类型常量用于表明使用的传输层协议.很多,更加推荐直接去查unix说明书,这边列几个相对常用的
- SOCK_STREAM 表示类型为TCP协议,提供面向连接的稳定数据传输
- SOCK_DGRAM 表示类型为UDP协议,使用不连续不可靠的数据包连接
- SOCK_RAW 表示类型原始套接字
- SOCK_RDM 表示提供可靠的数据包连接
- SOCK_SEQPACKET 表示提供连续可靠的数据包连接
设置socket属性
设置socket需要通过接口socket.setsockopt(level, optname, value)
来执行.一般可以设置的有:
在套接字级别(SOL_SOCKET)上,option_name可以有以下取值:
SO_DEBUG
打开或关闭调试信息.SO_REUSEADDR
打开或关闭地址复用功能.SO_REUSEPORT
打开或关闭端口复用功能.SO_DONTROUTE
打开或关闭路由查找功能.SO_BROADCAST
(udp专用)允许或禁止发送广播数据.SO_SNDBUF
设置发送缓冲区的大小,其上限为256 * (sizeof(struct sk_buff) + 256),下限为2048字节.SO_RCVBUF
,设置接收缓冲区的大小.上下限分别是:256 * (sizeof(struct sk_buff) + 256)和256字节.SO_KEEPALIVE
套接字保活SO_OOBINLINE
紧急数据放入普通数据流.SO_NO_CHECK
打开或关闭校验和SO_PRIORITY
设置在套接字发送的所有包的协议定义优先权.SO_LINGER
如果选择此选项,close
或shutdown
将等到所有套接字里排队的消息成功发送或到达延迟时间后才会返回. 否则,调用将立即返回.SO_PASSCRED
允许或禁止SCM_CREDENTIALS
控制消息的接收.SO_TIMESTAMP
打开或关闭数据报中的时间戳接收.SO_RCVLOWAT
设置接收数据前的缓冲区内的最小字节数.SO_RCVTIMEO
设置接收超时时间.SO_SNDTIMEO
设置发送超时时间SO_BINDTODEVICE
将套接字绑定到一个特定的设备上
创建一个socket对象
通常创建一个socket是如下过程:
- 创建一个socket对象指明其使用的协议族和协议类型
- 为其绑定地址和端口
- 使用
setsockopt
设置不同level上的设置
比如下面是一个用于多播的udp的socket:
def make_sock(): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind(('', 1900)) group = socket.inet_aton('239.255.255.250') mreq = struct.pack('4sL', group, socket.INADDR_ANY) sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) return sock
1.2. asyncio的socket底层api
asyncio的高级socket接口目前只支持Unix协议族和IP协议族,当然我们可以用底层的接口配合使用sock.setblocking(False)
设置为非阻塞的套接字对象自行封装,不过这就太过专业,本文不细讲,后面所说的内容都是针对IP协议族的
asyncio提供的底层套接字处理接口为:
coroutine AbstractEventLoop.sock_recv(sock, nbytes)->bytes
从套接字接收数据.阻塞socket.socket.recv()方法后建模.返回值是表示接收到的数据的字节对象.一次接收的最大数据量由nbytes指定.
coroutine AbstractEventLoop.sock_sendall(sock, data)->None
将数据发送到套接字,阻塞socket.socket.sendall()方法后建模.必须连接到远端套接字,此方法连续发送数据,直到所有数据已发送或发生错误.出现错误时,会引发异常,并且无法确定连接的接收端成功处理了多少数据
使用SelectorEventLoop事件循环,套接字sock必须是非阻塞的。
coroutine AbstractEventLoop.sock_connect(sock, address)
在地址连接到远程套接字,阻塞
socket.socket.connect()
方法后建模.coroutine AbstractEventLoop.sock_accept(sock)
接受连接.阻塞
socket.socket.accept()
后建模.套接字必须绑定到地址并侦听连接.返回值是一对(conn,地址)
其中conn是socket对象可用于在连接上发送和接收数据,address是连接另一端的套接字的地址。
1.3. Transports(通信)和protocols(协议)类
protocols(协议)类用于定义网络客户端/服务端的行为主要是对读入的字节流做出响应;而 Transports(通信)则是建立连接后对远端的操作,主要是写入操作.一旦连接上,连接就会创建一对Transports(通信)和protocols(协议)类的实例.他们就对应了一次连接,通常来说这俩是protocols(协议)调用Transports(通信)对象,Transports(通信)被protocols(协议)对象调用的关系.
1.3.1. Transports(通信)类
一般来说 Transports(通信)类不需要我们进行实例化,而是根据创建连接时候的设置由create_connection
接口自行生成.
Transports
类通用的方法有:
close(self)
关闭Transport.如果Transport对象具有用于传出数据的缓冲区,则缓冲的数据将被异步刷新.也不再接收数据.在所有缓冲数据被刷新之后,Transport对象的
connection_lost()
方法将None作为其参数被调用.is_closing(self)
如果Transport正在关闭或已经关闭,则返回True.
get_extra_info(name, default=None)
返回可选的Transport对象信息.name是表示要获取的传输特定信息的字符串,如果信息不存在则返回default的值.
这种方法允许transport更容易地暴露通道特定的信息。
套接字属性:
- 'peername':套接字连接的远程地址,socket.socket.getpeername()的结果
- 'socket':socket.socket实例
- 'sockname':套接字自己的地址,socket.socket.getsockname()
SSL套接字属性:
- 'compression':压缩算法用作字符串,或None如果连接未压缩; ssl.SSLSocket.compression()的结果
- 'cipher':包含所使用的密码的名称,定义其使用的SSL协议的版本和正在使用的秘密位的数量的三值元组; ssl.SSLSocket.cipher()的结果
- 'peercert':对等证书; ssl.SSLSocket.getpeercert()的结果
- 'sslcontext':ssl.SSLContext实例
- 'ssl_object':ssl.SSLObject或ssl.SSLSocket实例
管道:
- 'pipe':pipe对象
子过程:
- 'subprocess':subprocess.Popen实例
TCP协议可用的方法:
pause_reading()
transport暂停接收.在调用resume_reading()之前,不会将任何数据传递到协议的data_received()
resume_reading()
恢复数据接收.如果一些数据可用于读取,则将再次调用协议的data_received()方法.
abort()
立即关闭Transport对象,无需等待未知的操作完成.缓冲数据将丢失.数据也不会再被接收.Transport对象的connection_lost()方法最终将以None作为参数进行调用.
can_write_eof()
如果Transport对象支持write_eof()方法,则返回True,否则返回False
get_write_buffer_size()
返回Transport对象所使用的输出缓冲区的当前大小.
get_write_buffer_limits()
得到写缓冲的高位和低位的值.返回元组(低,高)其中低和高是确定的的字节数.
使用set_write_buffer_limits()函数设置写缓冲的高低位限制。
set_write_buffer_limits(high=None, low=None)
设置写入流控制的高和低水限制. 这两个值控制调用协议的
pause_writing()
和resume_writing()
方法的时间.如果指定,低位限制必须小于或等于高位限制.高或低都可以为负. 默认值是特定于实现的.- 如果仅给出了高水位限制,则低水位限制默认为小于或等于高水位限制的实施特定值.
- 将高设置为零强制低为零,并导致每当缓冲区变为非空时调用
pause_writing()
. 将低设置为
0
会导致resume_writing()
仅在缓冲区为空时调用对任一限制使用零通常是次优的,因为它减少了同时进行I/O和计算的机会.
write(data)
将一些数据字节写入Transport对象. 本方法不阻塞;数据将被异步发送.
writelines(list_of_data)
将列表(或任何写迭代对象)写入到Transport对象中.这在功能上等同于对由迭代器产生的每个元素调用write(),但是可以更有效地实现.
write_eof()
在刷新缓冲的数据后关闭transport.但仍可接收数据. 如果transport不支持的话此方法可能引发NotImplementedError异常(例如,SSL)
UDP协议可用的方法
DatagramTransport.sendto(data, addr=None)
将数据字节发送到由addr(传输相关的目标地址)给出的远程对等体.如果addr是None,则将数据发送到在传输创建时给定的目标地址. 这个方法不阻塞;它缓冲数据并安排它异步发送。
DatagramTransport.abort()
立即关闭运输,无需等待待完成的操作。缓冲数据将丢失。不会收到更多的数据。协议的connection_lost()方法最终将以None作为参数进行调用。
1.3.2. protocols(协议)类
协议类一般用于继承,它定义了几个方法需要其子类进行重写实现,可以将其看作通信的蓝图,实例化后收到相应的信息后就会按照这个蓝图规定的行为来执行.
TCP协议使用的protocols(协议)类为asyncio.Protocol
,而UDP则使用asyncio.DatagramProtocol
所有protocols(协议)类都会需要设定的方法有:
connection_made(self,transport)
规定建立连接后的行为,
transport
参数是表示当前产生连接的Transport对象.一般来说会存起来connection_lost(self,exc)
规定链接断开后的行为,参数是异常对象或None。后者意味着接收到正常的EOF,或者连接被连接的这一侧中止或关闭
pause_writing(self)
规定当传输缓冲区超过高水位标记时的行为
resume_writing(self)
规定当Transport缓冲低于低位标记时的行为
TCP协议类需要设定的方法
data_received(data)
规定当接收到数据后的行为
eof_received()
当另一端发出信号时,它将不再发送任何数据(例如通过调用write_eof(),如果另一端也使用asyncio). 此方法可能返回false值(包括None),在这种情况下,传输将关闭自身.相反,如果此方法返回一个真值,关闭传输由协议决定.由于默认实现返回None,它隐式关闭连接. 注意:某些传输(如SSL)不支持半关闭连接,在这种情况下,从此方法返回true将不会阻止关闭连接。
TCP的交互流程如下:
start -> connection_made() [-> data_received() *] [-> eof_received() ?] -> connection_lost() -> end
UDP协议类需要设定的方法
datagram_received(self,data, addr)
规定接收到数据报时的行为.
data
是包含传入数据的字节对象.addr
是发送数据的对方的地址;确切的格式取决于Transport对象.error_received(self,exc)
规定在先前的发送或接收操作引发OSError时的行为.
exc
是OSError实例.该方法通常很少被调用,一般是当检测到数据报不能传递到其收件人时.不过在许多情况下,不可传递的数据报将被静默丢弃.
1.4. 读写流
读写流是针对TCP协议和Unix协议对上面Transports(通信)和protocols(协议)类的进一步包装,这连个协议的读写行为都被包装成了两个类:
- StreamReader 流读取器
- StreamWriter 流写入器
使用asyncio.open_connection
和asyncio.open_unix_connection
都可以获得一对StreamReader 和StreamWriter的实例,而asyncio.start_server
和asyncio.start_unix_server
中的第一参数--回调函数client_connected_cb
的两个输入也是StreamReader 和StreamWriter的实例
本文还是以TCP协议作为标准来看这两个类.
通常如果使用上面提到的这4个接口的话我们就不需要自己实例化流读取器和流写入器了,但也会有很多时候读写流会结合协议类一起工作,这种时候就需要我们手动创建这两个类了.
1.4.1. StreamReader 流读取器
初始化一个StreamReader实例只要两个参数StreamReader(limit=None, loop=None)
,其中limit是缓冲区限制,loop是事件循环实例,StreamReader随时都可以创建实例.结合协议类使用的时候一般在协议类的__init__
方法中创建实例.
其实例方法有:
feed_eof()
确认EOF.
feed_data(data)
在内部缓冲区中输入数据字节.将恢复等待数据的任何操作.
set_transport(transport)
设置对应的transport对象
coroutine read(n=-1)
读取最多n个字节,如果未提供n或设置为-1.则读取直到EOF并返回所有读取的字节.如果接收到EOF并且内部缓冲区为空,则返回一个空的bytes对象.
coroutine readline()
读取一行,其中“line”是以\n结尾的字节序列.如果接收到EOF,并且未找到\n,则该方法将返回部分读取字节.如果接收到EOF并且内部缓冲区为空,则返回一个空的bytes对象.
coroutine readexactly(n)
读取n字节.如果读取n之前到达流的末尾,则引发IncompleteReadError,异常的IncompleteReadError.partial属性包含部分读字节.
coroutine readuntil(separator=b'\n')
从流中读取数据,直到找到separator.成功时,数据和分隔符将从内部缓冲区(消耗)中删除.返回的数据将包括末尾的分隔符.配置流限制用于检查结果.Limit设置可以返回的数据的最大长度,不计算分隔符. 如果发生EOF并且仍未找到完整的分隔符,则会引发IncompleteReadError异常,并且内部缓冲区将被重置.IncompleteReadError.partial属性可能包含部分分隔符. 如果由于超限而无法读取数据,则会引发LimitOverrunError异常,并且数据将保留在内部缓冲区中,因此可以再次读取.
at_eof()
如果缓冲区为空且调用
feed_eof()
,则返回True.
1.4.2. StreamWriter 流写入器
初始化一个StreamWriter实例需要参数StreamReader(transport, protocol, reader, loop),其中第一个是需要建立连接后才能有的,因此一般如果结合协议类使用,会把创建工作放在connection_made
回调函数中.
其实例方法有:
can_write_eof()
如果transport支持write_eof()则返回True,否则False.
close()
关闭transport
coroutine drain()
让底层传输的写缓冲区可以被刷新.用途是:
w.write(data) yield from w.drain()
当传输缓冲区的大小达到高位限值(协议暂停)时,阻塞直到缓冲区的大小下降到低位限值,之后协议恢复.当没有什么要等待时就继续 从drain()得到的结果为循环提供了调度写入操作和刷新缓冲区的机会.尤其应当在可能有大量的数据写入transport时使用,而协程不会从
write()
的调用之间产生. 需要注意的是drain要求StreamReader._protocol
有参数_connection_lost
和_paused
用于标识.可以结合asyncio.stream.FlowControlMixin
来使用get_extra_info(name, default=None)
返回可选的transport信息
write(data)
将一些数据字节写入transport
writelines(data)
将数据字节的列表(或任何可迭代对象)写入transport
write_eof()
刷新缓冲数据后关闭transport
1.4.3. FlowControlMixin
这个Mixin前文提到过是用于为drain
方法服务的,他需要混入Protocol类中,并且会调用pause_writing()
,resume_reading()
和connection_lost()
方法.子类需要重写__init__
方法,并且需要使用super来初始化这个父类,其初始化参数只有一个loop,且可以不设置.
1.4.4. StreamReaderProtocol
TCP协议也可以使用StreamReaderProtocol
来作为应用层协议的基类,这样就不用自己动手实例化StreamReader对象了,同时这个类也被python官方定义为一个帮助类,用于指导用户正确的使用协议和流读写对象,其内容如下:
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): """Helper class to adapt between Protocol and StreamReader. (This is a helper class instead of making StreamReader itself a Protocol subclass, because the StreamReader has other potential uses, and to prevent the user of the StreamReader to accidentally call inappropriate methods of the protocol.) """ def __init__(self, stream_reader, client_connected_cb=None, loop=None): super().__init__(loop=loop) self._stream_reader = stream_reader self._stream_writer = None self._client_connected_cb = client_connected_cb self._over_ssl = False def connection_made(self, transport): self._stream_reader.set_transport(transport) self._over_ssl = transport.get_extra_info('sslcontext') is not None if self._client_connected_cb is not None: self._stream_writer = StreamWriter(transport, self, self._stream_reader, self._loop) res = self._client_connected_cb(self._stream_reader, self._stream_writer) if coroutines.iscoroutine(res): self._loop.create_task(res) def connection_lost(self, exc): if self._stream_reader is not None: if exc is None: self._stream_reader.feed_eof() else: self._stream_reader.set_exception(exc) super().connection_lost(exc) self._stream_reader = None self._stream_writer = None def data_received(self, data): self._stream_reader.feed_data(data) def eof_received(self): self._stream_reader.feed_eof() if self._over_ssl: # Prevent a warning in SSLProtocol.eof_received: # "returning true from eof_received() # has no effect when using ssl" return False return True
这个类给出了一个不错的示范,主要的注意点是:
- 继承
FlowControlMixin
data_received(self, data)
方法需要使用feed_data
方式像stream_reader中喂数据.
1.5. 建立连接和服务器的工具
套接字(socket)也是有几种的,常用的包括
- TCP套接字,最常见的套接字
- UDP套接字,只发送不验证的套接字,可以广播
- unix套接字,用于类Unix系统上同一主机间不同进程通信
1.5.1. 建立连接
coroutine AbstractEventLoop.create_connection(protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None)->(transport, protocol)
事件循环对象中用于创建TCP连接的协程,family
AF_INET
或者AF_INET6
,socket类型SOCK_STREAM
.host
和port
用于指定要连接的对象,会自己解析ip地址来判断使用的v4协议(
AF_INET
)还是v6协议(AF_INET6
),也可以通过family
参数进行指定其他可以设置的包括proto
,flags
都是用于设置连接的参数protocol_factory
参数要求是一个类或者工厂函数,用于生成
protocols
对象ssl
指定使用的ssl证书,可以配合
server_hostname
参数指定证书的服务器地址.如果给定且不为False,则创建SSL/TLS
传输(默认情况下创建纯TCP传输).如果ssl是ssl.SSLContext
对象,则此上下文用于创建传输;如果ssl为True
,则使用默认设置的ssl上下文.sock
用于指定套接字,使用这个参数必须
host
,port
都为None
local_addr
用于指定本地用于连接远端的端口和服务器地址,不设置的话表示自动生成
这个协程一旦执行完成其结果为一个由transport和 protocol实例组成的对,通常用于写客户端
coroutine asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, **kwds)->(StreamReader,StreamWriter)
这个协程是上面
AbstractEventLoop.create_connection
的封装,会建立一个指向指定主机端口的TCP连接,一旦建立成功会返回一组(StreamReader,StreamWriter)对用于交互,而所有操作都是使用返回的两个对象StreamReader,StreamWriter来实现的.这个接口不使用Transports(通信)和protocols(协议)类
而是使用其创建出来的读写对象来实现,相对而言更加适合用于写客户端
coroutine AbstractEventLoop.create_unix_connection(protocol_factory, path, *, ssl=None, sock=None, server_hostname=None)->(transport, protocol)
这个接口用于创建Unix连接,用于同一主机上多进程间通信.socket family
AF_UNIX, socket
,类型SOCK_STREAM
,这个接口只在Unix系统上有用path
是Unix套接字域名,除非sock
参数被指定否咋就没有必要.这个接口比较不常用,本文不做阐述
coroutine asyncio.open_unix_connection(path=None, *, loop=None, limit=None, **kwds)->(StreamReader,StreamWriter)
create_unix_connection()
接口的封装,同样只能用在Unix上.
coroutine AbstractEventLoop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, reuse_address=None, reuse_port=None, allow_broadcast=None, sock=None)->(transport, protocol)
与上面相似,但是是用于建立UDP连接的.socket family
AF_INET
或者AF_INET6
,socket 类型为SOCK_DGRA
.和TCP协议不同,使用UDP协议进行信息的传输之前不需要建议连接.换句话说就是客户端向服务器发送信息,客户端只需要给出服务器的ip地址和端口号即可,然后将信息封装到一个待发送的报文中并且发送出去.至于远端是否存在,本地根本不管.UDP协议更多的是用在p2p通信上UDP因为不用建立连接,所以可以广播,消息发送到一个多播地址,那么所有该地址下的主机都会被发送一份这个要广播的信息.
protocol_factory
参数要求是一个类或者工厂函数,用于生成
protocols
对象local_addr
参数为(local_host, local_port)的形式,设置本地的地址和端口
remote_addr
参数为(remote_host, remote_port)的形式,设置本地的地址和端口
family, proto, flags
和上面一样,由于设置socket.
sock
用于指定套接字,和上面一样.针对UDP的话,如果要让UDP进行多播,那么更好的方式是指定sock对象,比如:
def make_sock(): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind(('', 1900)) group = socket.inet_aton('239.255.255.250') mreq = struct.pack('4sL', group, socket.INADDR_ANY) sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) return sock loop.create_datagram_endpoint(my_udp_protocol, sock=make_sock())
reuse_address
内核在time_wait状态下重用本地套接字,而不必等待其自然超时过期.
reuse_port
告诉内核允许该端点绑定到与其他现有端点绑定到的相同端口,只要它们在创建时都设置了该标志即可.这个选项在windows和一些unix上不支持.如果so_reuseport常量没有定义,那么这个功能是不支持的.
allow_broadcast
允许这个端点发送广播消息,允许广播的话那么局域网内的所有地址都会被发送一遍.
1.5.2. 创建服务器
coroutine AbstractEventLoop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None)->Server
创建一个TCP服务器(socket类型为
SOCK_STREAM
),其返回值为一个Server对象,这个对象包含close
方法用于关闭监听循环,协程wait_closed()
用于等待剩下的执行完毕然后关闭监听循环,sockets
参数用于保存已经建立连接的套接字对象.backlog
系统队列的最大连接数
protocol_factory
为协议对象的工厂函数,
host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, ssl=None, reuse_address=None, reuse_port=None
用于设定socket对象和ssl的设置,和创建连接的部分一致
coroutine asyncio.start_server(client_connected_cb(client_reader, client_writer)->None, host=None, port=None, *, loop=None, limit=None, **kwds)->Server
create_server
的包装,其返回值也是一个Server对象.使用回调函数或者协程函数client_connected_cb
来规定服务器的行为,其参数为client_reader, client_writer
,他们分别是StreamReader
和StreamWriter
的实例.limit
参数用于设置缓冲区的大小限制.coroutine AbstractEventLoop.create_unix_server(protocol_factory, path=None, *, sock=None, backlog=100, ssl=None)->Server
创建一个Unix服务器(socket类型为
AF_UNIX
).参数和前文提到的意义一样.只能用在Unix系统上.coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, **kwds)->Server
创建一个Unix服务器(socket类型为
AF_UNIX
).参数和前文提到的意义一样.只能用在Unix系统上.
1.6. ssl上下文
python的ssl模块提供了ssl支持.asyncio的连接和服务接口都可以指定ssl上下文用于加密.需要做的只是将ssl的上下文作为参数传入即可.
1.6.1. 服务端设置上下文
这段代码参考自sanic的实现:
context = ssl_module.create_default_context(purpose=Purpose.CLIENT_AUTH) context.load_cert_chain(cert, keyfile=key) ssl = context
其中cert
为证书位置字符串,keyfile
为密码位置字符串
1.6.2. 客户端设置上下文
这段代码参考自aioamqp的实现:
context = ssl_module.create_default_context() if not verify_ssl: context.check_hostname = False context.verify_mode = ssl_module.CERT_NONE ssl = context
其中verify_ssl
为bool型,指定是否验证服务器的SSL,默认为True
1.7. 信号量和信号处理接口
标准库的signal模块提供了一套基本跨平台通用的信号量.相对而言对与异步网络编程来说有用的信号量就是SIGINT
和 SIGTERM
,这两个一般用于终止进程,对应的是Ctrl+C
操作.
与信号量处理相关的接口有:
signal.signal(signalnum, handler)
为信号量设置回调函数,一旦触发这个信号就会执行回调函数signal.SIG_IGN
,预设的回调函数,用于屏蔽某一信号,要结合signal.signal
使用.asyncio.AbstractEventLoop.add_signal_handler(signum, callback, *args)
在事件循环中为信号添加处理回调函数,如果信号编号无效或不可复位,则引发ValueError.如果设置处理程序时出现问题,则引发RuntimeError.
asyncio.AbstractEventLoop.remove_signal_handler(sig)
移除事件循环中的信号处理
1.8. Future对象操作
一般来说我们不需要自己定义和操作Future对象,因为多数时候它都是被封装好的或者以Task的形式被使用的,但网络编程略底层了,不可避免的有时候需要手工创建Future对象,之后在一些回调函数中手工设置对应Future对象的结果或者异常.
举个简单的例子,比如我设置了10个任务,我希望可以使用一个接口获取对应任务的结果,而计算的结果无法通过返回值获取,因为它是在函数中使用call_soon
接口丢出去执行的.那么该怎么办呢?一个比较好的办法就是使用Future对象.Future对象本质上是一个占位符,它可以被一直await
挂起直到它被设置了一个result或者被设置了一个异常.
相关的接口是:
AbstractEventLoop.create_future()
/asyncio.Future(loop)
创建一个Future对象,并加入事件循环中轮询,一般用第一个接口.
future.cancel()
取消Future对象,安排回调.如果future已经完成或取消,返回False.否则将future的状态更改为
cancelled
,安排回调并返回True.future.add_done_callback(fn)
添加回调以便在未来完成时运行.回调函数的参数只有一个就是future对象本身,如果未来在调用时已经完成,则使用
call_soon()
计划回调.future.remove_done_callback(fn)
从'调用完成时'列表中删除回调的所有实例.返回已删除的回调的数量.
future.set_result(result)
标记future完成并设置其结果,如果调用此方法时未来已经完成,则引发InvalidStateError.
future.set_exception(exception)
标记future完成并设置异常.如果调用此方法时未来已经完成,则引发InvalidStateError.
下面我们来实现这个例子
import asyncio loop = asyncio.get_event_loop() async def main(loop): tasks = {str(i):loop.create_future() for i in range(10)} def f(i): f = tasks.get(str(i)) asyncio.sleep(1) f.set_result(i**2) [loop.call_soon(f,j) for j in range(10)] result = await tasks.get("5") print(result) loop.run_until_complete(main(loop))
25
以上就是一般异步网络编程会用到的组件了,后面的几节我们就按交互模式来讨论如何组合这些组件.而剩下的应用层协议就是定义如何编码解码这些传输过来的数据让它产生意义的协议.
1.9. 应用层协议
这边我们只讨论TCP协议之上的应用层协议.TCP协议是一种面向流的协议,一次一般来说应用层协议需要规定:
- 服务的提供形式和结构
- 如何将信息分段为一个一个的请求字节串
- 如何将请求字节串编码/解码为协议规定的动作符号
- 规定动作的行为和允许的取值范围,包括错误,响应的类型等
以http协议作为例子,我们知道:
- http采用的是应答结构,也就是一台服务器等待客户端访问自己提出需求,然后挨个儿响应这些需求;
- http协议使用短链接,也就是一次请求一次响应就结束连接,也就是说信息按照连接来分段;
- http协议规定头部用于告诉服务器要访问的uri,使用的method,这就是将特定的字节串编码解码为规定的动作;
- http协议规定了响应的格式和错误码,规定了html作为标准的响应结果(后来也支持使用xml或者json,但需要在头部表明),也规定了响应的编码用于指示状态.
还没有评论,来说两句吧...