1. 并行编程的惯用法
人们通常认为并行等同于多核,但现代计算机在不同层次上都使用了并行技术.比如说,单核的运行速度现今仍能每年不断提升的原因是--单核包含的晶体管数量,如同摩尔定律预测的那样变得越来越多,而单核在位级和指令级两个层次上都能够并行地使用这些晶体管资源.
位级(bit-level)并行
为什么32位计算机的运行速度比8位计算机更快?因为并行.对于两个32位数的加法,8位计算机必须进行多次8位计算,而32位计算机可以一步完成,即并行地处理32位数的4字节.
计算机的发展经历了8位、16位、32位,现在正处于64位时代.然而由位升级带来的性能改善是存在瓶颈的,这也正是短期内我们无法步入128位时代的原因.
指令级(instruction-level)并行
现代CPU的并行度很高,其中使用的技术包括流水线、乱序执行和猜测执行等.
程序员通常可以不关心处理器内部并行的细节,因为尽管处理器内部的并行度很高,但是经过精心设计,从外部看上去所有处理都像是串行的.
而这种"看上去像串行"的设计逐渐变得不适用.处理器的设计者们为单核提升速度变得越来越困难.进入多核时代,我们必须面对的情况是--无论是表面上还是实质上,指令都不再串行执行了.
数据级(data)并行
数据级并行(也称为"单指令多数据",SIMD)架构,可以并行地在大量数据上施加同一操作.这并不适合解决所有问题,但在适合的场景却可以大展身手.
图像处理就是一种适合进行数据级并行的场景.比如为了增加图片亮度就需要增加每一个像素的亮度.现代GPU(图形处理器)也因图像处理的特点而演化成了极其强大的数据并行处理器.
任务级(task-level)并行
终于来到了大家所认为的并行形式——多处理器.从程序员的角度来看,多处理器架构最明显的分类特征是其内存模型(共享内存模型或分布式内存模型).
对于共享内存的多处理器系统,每个处理器都能访问整个内存,处理器之间的通信主要通过内存进行.
对于分布式内存的多处理器系统,每个处理器都有自己的内存,处理器之间的通信主要通过网络进行.
通过内存通信比通过网络通信更简单更快速,所以用共享内存编程往往更容易.然而当处理器个数逐渐增多,共享内存就会遭遇性能瓶颈——此时不得不转向分布式内存.如果要开发一个容错系统,就要使用多台计算机以规避硬件故障对系统的影响,此时也必须借助于分布式内存.
1.1. 并行方式如何选择
首先python的多线程和协程都是共享内存式的,而多进程虽然是在同一机器上,但各个进程间并不共享内存,因此是分布式的.
看起来多进程,多线程,协程都是以并行的方式运行的,那么我们该如何选择使用什么技术呢?
首先我们可以简单的通过分析目标功能来选择,如果我们的项目主要是计算密集型的,比如是并行计算多个数据是否是质数这类,那么没得选,只有多进程才可以做到最大化利用cpu资源,另外两个都只能跑满一个cpu核心.
接着就是主要是io操作的任务了,io密集型任务首选当然是协程,也只有协程可以搞定10k问题,但python的默认I/O多是同步I/O,因此在所需依赖无法满足的情况下只能使用多线程方式替代.
协程和多线程都最多跑满一个核心,但其机制是完全不一样的,协程是用户组织代码,因此是写成顺序执行的异步执行,说白了还是在顺序执行,只是线程运行哪段代码会在协程间跳转执行,打个比方有点像拉链,只要有一个齿坏了,整个过程就会卡住.
但多线程则完全不同,一个线程卡死了并不会影响其他线程.
1.2. 并行编程的常用同步机制(原语)
python包含多种同步机制,这些工具使用思路上是一致的,因此无论是协程,线程还是进程都可以使用,只是使用的模块会有些许不同,用途也会有写不同
1.2.1. 信号量 Semaphore
在并行编程中,为了防止不同的过程(线程/进程/协程)同时对一个公用的资源进行修改,需要进行同时访问的数量(通常是1).信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.计数器的值永远不会小于0.当计数器为0时.acquire()调用被阻塞,直到其他线程来调用release().Semaphore支持上下文管理协议
Semaphore的接口有两个:
acquire()
获取一个信号量,协程中这个方法是一个协程release()
释放一个信号量locked()
协程中独有,用来判断是否被锁定
信号量有两种:
Semaphore
标准信号量
BoundedSemaphore
有界信号量,它会检查内部计数器的值,并保证它不会大于初始值,如果超了,就引发一个
ValueError
多数情况下,semaphore用于守护限制访问(但不限于1)的资源,如果semaphore
被release()
过多次,这意味着存在bug.
信号量在线程,进程,协程中的使用的模块并不一样:
协程--
asynico.Semaphore(value=1, *, loop=None)
线程--
threading.Semaphore(value=1)
进程--
multiprocessing.Semaphore([value])
协程版本
import aiohttp import asyncio NUMBERS = range(12) URL = 'http://httpbin.org/get?a={}' sema = asyncio.Semaphore(3) async def fetch_async(a): async with aiohttp.request('GET', URL.format(a)) as r: data = await r.json() return data['args']['a'] async def print_result(a): async with sema: r = await fetch_async(a) print('fetch({}) = {}'.format(a, r)) #loop = asyncio.new_event_loop() #asyncio.set_event_loop(loop) loop = asyncio.get_event_loop() f = asyncio.wait([print_result(num) for num in NUMBERS]) loop.run_until_complete(f)
fetch(3) = 3 fetch(4) = 4 fetch(9) = 9 fetch(10) = 10 fetch(1) = 1 fetch(6) = 6 fetch(5) = 5 fetch(11) = 11 fetch(0) = 0 fetch(2) = 2 fetch(7) = 7 fetch(8) = 8 ({<Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>}, set())
多线程版本
import time from random import random from threading import Thread, Semaphore sema = Semaphore(3) def foo(tid): with sema: print('{} acquire sema'.format(tid)) wt = random() * 2 time.sleep(wt) print('{} release sema'.format(tid)) threads = [] for i in range(5): t = Thread(target=foo, args=(i,)) threads.append(t) t.start() for t in threads: t.join()
0 acquire sema 1 acquire sema 2 acquire sema 2 release sema3 acquire sema 0 release sema 4 acquire sema 3 release sema 4 release sema 1 release sema
多进程
%%writefile src/semaphore.py from multiprocessing import Process, Semaphore def foo(tid,sema): import time from random import random with sema: print('{} acquire sema'.format(tid)) wt = random() * 2 time.sleep(wt) print('{} release sema'.format(tid)) if __name__ == "__main__": sema = Semaphore(3) processes = [] for i in range(5): t = Process(target=foo, args=(i,sema)) processes.append(t) for t in processes: t.start() for t in processes: t.join()
Overwriting src/semaphore.py
!python src/semaphore.py
0 acquire sema 1 acquire sema 2 acquire sema 2 release sema 3 acquire sema 0 release sema 4 acquire sema 1 release sema 3 release sema 4 release sema
1.2.2. 锁Lock
Lock也可以叫做互斥锁,其实相当于信号量为1.
在多线程中锁的作用是用于锁定读写,以确认同一个资源同一时间只能被一个操作访问.
python中锁有两种
Lock
标准锁
RLock
可重入锁,可以由同一个过程多次获取.在内部,除了原始锁使用的锁定/解锁状态之外,它还使用"拥有过程"和"递归级别"的概念.在锁定状态下,某些过程拥有锁;在解锁状态下,没有线程拥有它.
我们先看一个不加锁的例子:
import time from threading import Thread value = 0 def getlock(): global value new = value + 1 time.sleep(0.001) # 使用sleep让线程有机会切换 value = new threads = [] for i in range(100): t = Thread(target=getlock) t.start() threads.append(t) for t in threads: t.join() print(value)
10
不加锁的情况下,结果会远远的小于100.那我们加上互斥锁看看
fetch(3) = 3 fetch(4) = 4 fetch(9) = 9 fetch(10) = 10 fetch(1) = 1 fetch(6) = 6 fetch(5) = 5 fetch(11) = 11 fetch(0) = 0 fetch(2) = 2 fetch(7) = 7 fetch(8) = 8 ({<Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>}, set())
0
fetch(3) = 3 fetch(4) = 4 fetch(9) = 9 fetch(10) = 10 fetch(1) = 1 fetch(6) = 6 fetch(5) = 5 fetch(11) = 11 fetch(0) = 0 fetch(2) = 2 fetch(7) = 7 fetch(8) = 8 ({<Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>}, set())
1
锁作为一种特殊信号量,它的接口与Semaphore一致.在线程,进程,协程中的使用的模块分别为:
协程--
asynico.Lock(*,loop=None)
线程--
threading.Lock(value=1)
进程--
multiprocessing.Lock([value])
在协程中,实际上协程并没有抢占资源的情况,因此此处的锁更多的是用来作为一个全局的变量锁定一些流程用
fetch(3) = 3 fetch(4) = 4 fetch(9) = 9 fetch(10) = 10 fetch(1) = 1 fetch(6) = 6 fetch(5) = 5 fetch(11) = 11 fetch(0) = 0 fetch(2) = 2 fetch(7) = 7 fetch(8) = 8 ({<Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>}, set())
2
fetch(3) = 3 fetch(4) = 4 fetch(9) = 9 fetch(10) = 10 fetch(1) = 1 fetch(6) = 6 fetch(5) = 5 fetch(11) = 11 fetch(0) = 0 fetch(2) = 2 fetch(7) = 7 fetch(8) = 8 ({<Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>}, set())
3
而针对于多进程,锁同样起到一个全局信号的作用,比如多个进程处理同一个文件,就需要加锁来限制
fetch(3) = 3 fetch(4) = 4 fetch(9) = 9 fetch(10) = 10 fetch(1) = 1 fetch(6) = 6 fetch(5) = 5 fetch(11) = 11 fetch(0) = 0 fetch(2) = 2 fetch(7) = 7 fetch(8) = 8 ({<Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>}, set())
4
fetch(3) = 3 fetch(4) = 4 fetch(9) = 9 fetch(10) = 10 fetch(1) = 1 fetch(6) = 6 fetch(5) = 5 fetch(11) = 11 fetch(0) = 0 fetch(2) = 2 fetch(7) = 7 fetch(8) = 8 ({<Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>}, set())
5
fetch(3) = 3 fetch(4) = 4 fetch(9) = 9 fetch(10) = 10 fetch(1) = 1 fetch(6) = 6 fetch(5) = 5 fetch(11) = 11 fetch(0) = 0 fetch(2) = 2 fetch(7) = 7 fetch(8) = 8 ({<Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>}, set())
6
1.2.3. 事件
一个过程发送/传递事件,所谓事件是指的一个保存标记状态的对象,如果内部标记为True则表示事件发生了,反之就是没发生
事件的接口包括:
- clear()
事件内部标记为False
- is_set()
返回事件的内部标记
- set()
调用则设置内部标记为True
- wait()
等待事件被标记为True,协程中该接口为协程
另外的过程等待事件的触发.我们用「生产者/消费者」模型的例子.
协程
fetch(3) = 3 fetch(4) = 4 fetch(9) = 9 fetch(10) = 10 fetch(1) = 1 fetch(6) = 6 fetch(5) = 5 fetch(11) = 11 fetch(0) = 0 fetch(2) = 2 fetch(7) = 7 fetch(8) = 8 ({<Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>}, set())
7
fetch(3) = 3 fetch(4) = 4 fetch(9) = 9 fetch(10) = 10 fetch(1) = 1 fetch(6) = 6 fetch(5) = 5 fetch(11) = 11 fetch(0) = 0 fetch(2) = 2 fetch(7) = 7 fetch(8) = 8 ({<Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>}, set())
8
fetch(3) = 3 fetch(4) = 4 fetch(9) = 9 fetch(10) = 10 fetch(1) = 1 fetch(6) = 6 fetch(5) = 5 fetch(11) = 11 fetch(0) = 0 fetch(2) = 2 fetch(7) = 7 fetch(8) = 8 ({<Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>, <Task finished coro=<print_result() done, defined at <ipython-input-1-02b773a0950a>:11> result=None>}, set())
9
import time from random import random from threading import Thread, Semaphore sema = Semaphore(3) def foo(tid): with sema: print('{} acquire sema'.format(tid)) wt = random() * 2 time.sleep(wt) print('{} release sema'.format(tid)) threads = [] for i in range(5): t = Thread(target=foo, args=(i,)) threads.append(t) t.start() for t in threads: t.join()
0
线程
import time from random import random from threading import Thread, Semaphore sema = Semaphore(3) def foo(tid): with sema: print('{} acquire sema'.format(tid)) wt = random() * 2 time.sleep(wt) print('{} release sema'.format(tid)) threads = [] for i in range(5): t = Thread(target=foo, args=(i,)) threads.append(t) t.start() for t in threads: t.join()
1
import time from random import random from threading import Thread, Semaphore sema = Semaphore(3) def foo(tid): with sema: print('{} acquire sema'.format(tid)) wt = random() * 2 time.sleep(wt) print('{} release sema'.format(tid)) threads = [] for i in range(5): t = Thread(target=foo, args=(i,)) threads.append(t) t.start() for t in threads: t.join()
2
1.2.4. 条件Condition
条件用于信号通信,它的除了拥有锁的所有接口外,还有接口:
notify(n=1)
释放出通知,让使用相同Condition对象的几个过程知道这个条件已被激活
notify_all()
释放出通知,让使用相同Condition对象的所有过程知道这个条件已被激活
wait()
等待使用相同Condition对象的过程的通知.
wait_for(predicate)
相当于
import time from random import random from threading import Thread, Semaphore sema = Semaphore(3) def foo(tid): with sema: print('{} acquire sema'.format(tid)) wt = random() * 2 time.sleep(wt) print('{} release sema'.format(tid)) threads = [] for i in range(5): t = Thread(target=foo, args=(i,)) threads.append(t) t.start() for t in threads: t.join()
3
一个过程等待特定条件,而另一个过程发出特定条件满足的信号.最好说明的例子就是「生产者/消费者」模型:
协程方式
import time from random import random from threading import Thread, Semaphore sema = Semaphore(3) def foo(tid): with sema: print('{} acquire sema'.format(tid)) wt = random() * 2 time.sleep(wt) print('{} release sema'.format(tid)) threads = [] for i in range(5): t = Thread(target=foo, args=(i,)) threads.append(t) t.start() for t in threads: t.join()
4
import time from random import random from threading import Thread, Semaphore sema = Semaphore(3) def foo(tid): with sema: print('{} acquire sema'.format(tid)) wt = random() * 2 time.sleep(wt) print('{} release sema'.format(tid)) threads = [] for i in range(5): t = Thread(target=foo, args=(i,)) threads.append(t) t.start() for t in threads: t.join()
5
线程方式
import time from random import random from threading import Thread, Semaphore sema = Semaphore(3) def foo(tid): with sema: print('{} acquire sema'.format(tid)) wt = random() * 2 time.sleep(wt) print('{} release sema'.format(tid)) threads = [] for i in range(5): t = Thread(target=foo, args=(i,)) threads.append(t) t.start() for t in threads: t.join()
6
import time from random import random from threading import Thread, Semaphore sema = Semaphore(3) def foo(tid): with sema: print('{} acquire sema'.format(tid)) wt = random() * 2 time.sleep(wt) print('{} release sema'.format(tid)) threads = [] for i in range(5): t = Thread(target=foo, args=(i,)) threads.append(t) t.start() for t in threads: t.join()
7
进程方式
import time from random import random from threading import Thread, Semaphore sema = Semaphore(3) def foo(tid): with sema: print('{} acquire sema'.format(tid)) wt = random() * 2 time.sleep(wt) print('{} release sema'.format(tid)) threads = [] for i in range(5): t = Thread(target=foo, args=(i,)) threads.append(t) t.start() for t in threads: t.join()
8
import time from random import random from threading import Thread, Semaphore sema = Semaphore(3) def foo(tid): with sema: print('{} acquire sema'.format(tid)) wt = random() * 2 time.sleep(wt) print('{} release sema'.format(tid)) threads = [] for i in range(5): t = Thread(target=foo, args=(i,)) threads.append(t) t.start() for t in threads: t.join()
9
0 acquire sema 1 acquire sema 2 acquire sema 2 release sema3 acquire sema 0 release sema 4 acquire sema 3 release sema 4 release sema 1 release sema
0
import time from random import random from threading import Thread, Semaphore sema = Semaphore(3) def foo(tid): with sema: print('{} acquire sema'.format(tid)) wt = random() * 2 time.sleep(wt) print('{} release sema'.format(tid)) threads = [] for i in range(5): t = Thread(target=foo, args=(i,)) threads.append(t) t.start() for t in threads: t.join()
7
1.3. 队列
使用队列是最常见的同步方式.也是生产者消费者模式最常见使用的工具
队列的接口有:
qsize()
返回队列的大致大小
empty()
如果队列为空返回True
full()
如果队列满了,则返回空
put(item, block=True, timeout=None)
将元素放入队列,协程中是协程
put_nowait(item)
立即将元素放入队列
get(block=True, timeout=None)
获取元素,并且在队列中删除该元素,协程中是协程
get_nowait()
立即获取元素,并且在队列中删除该元素
task_done()
表明以前入队的任务是否已经完成。
join()
阻塞直到队列中的所有项目都被获取和处理.协程中是协程
常见的队列有两种:
queue
先进先出队列
LifoQueue
先进后出队列
PriorityQueue
优先权队列,放入的元素必须是Tuple[int,Any],第一位就是权重
对不同方式使用的队列为:
协程--
asyncio.Queue(maxsize)
线程--
queue.Queue(maxsize)
进程--
multiprocessing.Queue(maxsize)
依然用生产消费模式做例子
协程
0 acquire sema 1 acquire sema 2 acquire sema 2 release sema3 acquire sema 0 release sema 4 acquire sema 3 release sema 4 release sema 1 release sema
2
0 acquire sema 1 acquire sema 2 acquire sema 2 release sema3 acquire sema 0 release sema 4 acquire sema 3 release sema 4 release sema 1 release sema
3
线程
0 acquire sema 1 acquire sema 2 acquire sema 2 release sema3 acquire sema 0 release sema 4 acquire sema 3 release sema 4 release sema 1 release sema
4
0 acquire sema 1 acquire sema 2 acquire sema 2 release sema3 acquire sema 0 release sema 4 acquire sema 3 release sema 4 release sema 1 release sema
5
进程
0 acquire sema 1 acquire sema 2 acquire sema 2 release sema3 acquire sema 0 release sema 4 acquire sema 3 release sema 4 release sema 1 release sema
6
0 acquire sema 1 acquire sema 2 acquire sema 2 release sema3 acquire sema 0 release sema 4 acquire sema 3 release sema 4 release sema 1 release sema
7
还没有评论,来说两句吧...