1. python多进程编程
python受GIL限制,无法利用到多核,要使用多核提高cpu的利用率这种时候最简单的方式就是使用多进程实现突破GIL限制.
换言之python多进程的价值体现在CPU密集型作业上.
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础.在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器.程序是指令、数据及其组织形式的描述,进程是程序的实体.
1.1. python调用系统fork
Unix/Linux操作系统提供了一个fork()
系统调用.普通的函数调用,调用一次,返回一次,但是fork()
调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后分别在父进程和子进程内返回.
子进程永远返回0,而父进程返回子进程的ID.这样做的理由是,一个父进程可以fork出很多子进程,所以父进程要记下每个子进程的ID,而子进程只需要调用getppid()
就可以拿到父进程的ID.
pythonos
模块封装了fork()
(unix-like系统).事实上在类unix系统下,python的多进程都是基于fork的,而windows下情况就不一样了.
1.2. windows下的多进程局限性
在windows下,由于没有fork,python的多进程模块multiprocessing
必须要有if __name__=='__main__':
也就是说它无法在非入口模块下使用
import os print('Process ({}) 开始...'.format(os.getpid())) # Only works on Unix/Linux/Mac: pid = os.fork() if pid == 0: print('子进程: ({}) 它的父进程是: ({}).'.format(os.getpid(), os.getppid())) else: print('父进程 ({}) 产生了子进程: ({}).'.format(os.getpid(), pid))
Process (36236) 开始... 父进程 (36236) 产生了子进程: (36401). 子进程: (36401) 它的父进程是: (36236).
1.3. 使用concurrent.futures进行高层抽象的多进程操作
在python3中,模块concurrent.futures
提供了一些更加简单易用的多进程操作,它主要利用进程池.
这个库支持多线程和多进程,接口一样,只是使用的对象不同而已.
concurrent.futures
提供两种编程模型:
并行任务模型
单独任务独立使用自己的过程和数据,多任务独立并行计算
MapReduce模型
为各个进程分发数据执行相同的过程
1.3.1. 并行任务模型
这个模型使用submit
提交任务到上下文管理器,之后使用返回对象的result()
方法阻塞io等待任务完成
from concurrent.futures import ProcessPoolExecutor,as_completed from random import randrange from time import time
def arcfour(key, in_bytes, loops=20): """rc4算法""" kbox = bytearray(256) # create key box for i, car in enumerate(key): # copy key and vector kbox[i] = car j = len(key) for i in range(j, 256): # repeat until full kbox[i] = kbox[i-j] # [1] initialize sbox sbox = bytearray(range(256)) # repeat sbox mixing loop, as recommened in CipherSaber-2 # http://ciphersaber.gurus.com/faq.html#cs2 j = 0 for k in range(loops): for i in range(256): j = (j + sbox[i] + kbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # main loop i = 0 j = 0 out_bytes = bytearray() for car in in_bytes: i = (i + 1) % 256 # [2] shuffle sbox j = (j + sbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # [3] compute t t = (sbox[i] + sbox[j]) % 256 k = sbox[t] car = car ^ k out_bytes.append(car) return out_bytes
clear = bytearray(b'1234567890' * 100000) t0 = time() cipher = arcfour(b'key', clear) print('elapsed time: %.2fs' % (time() - t0)) result = arcfour(b'key', cipher) assert result == clear, '%r != %r' % (result, clear) print('elapsed time: %.2fs' % (time() - t0)) print('OK')
elapsed time: 0.50s elapsed time: 1.02s OK
def crypto_process(size, key): in_text = bytearray(randrange(256) for i in range(size)) cypher_text = arcfour(key, in_text) out_text = arcfour(key, cypher_text) assert in_text == out_text, 'Failed arcfour_test' return size def main(workers=None): JOBS = 12 SIZE = 2**18 KEY = b"'Twas brillig, and the slithy toves\nDid gyre" STATUS = '{} workers, elapsed time: {:.2f}s' if workers: workers = int(workers) t0 = time() with ProcessPoolExecutor(workers) as executor: actual_workers = executor._max_workers to_do = [] for i in range(JOBS, 0, -1): size = SIZE + int(SIZE / JOBS * (i - JOBS/2)) job = executor.submit(crypto_process, size, KEY) to_do.append(job) for future in as_completed(to_do): res = future.result() print('{:.1f} KB'.format(res/2**10)) print(STATUS.format(actual_workers, time() - t0))
main(1)
384.0 KB 362.7 KB 341.3 KB 320.0 KB 298.7 KB 277.3 KB 256.0 KB 234.7 KB 213.3 KB 192.0 KB 170.7 KB 149.3 KB 1 workers, elapsed time: 6.50s
main(2)
Process (36236) 开始... 父进程 (36236) 产生了子进程: (36401). 子进程: (36401) 它的父进程是: (36236).
0
Process (36236) 开始... 父进程 (36236) 产生了子进程: (36401). 子进程: (36401) 它的父进程是: (36236).
1
Process (36236) 开始... 父进程 (36236) 产生了子进程: (36401). 子进程: (36401) 它的父进程是: (36236).
2
MapReduce模型
这种模式可能更加被大家熟悉,同一个流程,将容器中的数据一条一脚放入子进程运算,最终也结果也会被放入容器中.最后可以将收集来的数据在主进程中进行处理
Process (36236) 开始... 父进程 (36236) 产生了子进程: (36401). 子进程: (36401) 它的父进程是: (36236).
3
Process (36236) 开始... 父进程 (36236) 产生了子进程: (36401). 子进程: (36401) 它的父进程是: (36236).
4
Process (36236) 开始... 父进程 (36236) 产生了子进程: (36401). 子进程: (36401) 它的父进程是: (36236).
5
Process (36236) 开始... 父进程 (36236) 产生了子进程: (36401). 子进程: (36401) 它的父进程是: (36236).
6
Process (36236) 开始... 父进程 (36236) 产生了子进程: (36401). 子进程: (36401) 它的父进程是: (36236).
7
Process (36236) 开始... 父进程 (36236) 产生了子进程: (36401). 子进程: (36401) 它的父进程是: (36236).
8
Process (36236) 开始... 父进程 (36236) 产生了子进程: (36401). 子进程: (36401) 它的父进程是: (36236).
9
Process (36236) 开始... 父进程 (36236) 产生了子进程: (36401). 子进程: (36401) 它的父进程是: (36236).
5
from concurrent.futures import ProcessPoolExecutor,as_completed from random import randrange from time import time
1
from concurrent.futures import ProcessPoolExecutor,as_completed from random import randrange from time import time
2
1.4. 使用进程池进行相对底层的多进程操作
进程池的方式很适合批量创建子进程.
对Pool对象调用join()
方法会等待所有子进程执行完毕,调用join()
之前必须先调用close()
,调用close()
之后就不能继续添加新的Process了.
请注意输出的结果,task 0,1,2,3是立刻执行的,而task 4要等待前面某个task完成后才执行,这是因为Pool的默认大小在我的电脑上是4,因此最多同时执行4个进程.这是Pool有意设计的限制,并不是操作系统的限制.如果改成p = Pool(5)
就可以同时跑5个进程.
由于Pool的默认大小是CPU的核数,如果你不幸拥有8核CPU,你要提交至少9个子进程才能看到上面的等待效果.
除了使用apply_async方法外,还有apply,map和map_async可以用于线程池的计算,编程模型也是如concurrent.futures
一样分为两类
- 并行任务模型
apply
单一任务布置apply_async
非阻塞单一任务布置
- MapReduce模型
map
同系统的map方法map_async
非阻塞的map
apply_async
from concurrent.futures import ProcessPoolExecutor,as_completed from random import randrange from time import time
3
from concurrent.futures import ProcessPoolExecutor,as_completed from random import randrange from time import time
4
from concurrent.futures import ProcessPoolExecutor,as_completed from random import randrange from time import time
5
map_async
from concurrent.futures import ProcessPoolExecutor,as_completed from random import randrange from time import time
6
from concurrent.futures import ProcessPoolExecutor,as_completed from random import randrange from time import time
7
获取进程池中的运算结果
from concurrent.futures import ProcessPoolExecutor,as_completed from random import randrange from time import time
8
from concurrent.futures import ProcessPoolExecutor,as_completed from random import randrange from time import time
9
1.5. 更底层的多进程编程
标准库中的multiprocessing模块就是跨平台版本的多进程模块.
multiprocessing模块提供了一个Process类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:
def arcfour(key, in_bytes, loops=20): """rc4算法""" kbox = bytearray(256) # create key box for i, car in enumerate(key): # copy key and vector kbox[i] = car j = len(key) for i in range(j, 256): # repeat until full kbox[i] = kbox[i-j] # [1] initialize sbox sbox = bytearray(range(256)) # repeat sbox mixing loop, as recommened in CipherSaber-2 # http://ciphersaber.gurus.com/faq.html#cs2 j = 0 for k in range(loops): for i in range(256): j = (j + sbox[i] + kbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # main loop i = 0 j = 0 out_bytes = bytearray() for car in in_bytes: i = (i + 1) % 256 # [2] shuffle sbox j = (j + sbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # [3] compute t t = (sbox[i] + sbox[j]) % 256 k = sbox[t] car = car ^ k out_bytes.append(car) return out_bytes
0
def arcfour(key, in_bytes, loops=20): """rc4算法""" kbox = bytearray(256) # create key box for i, car in enumerate(key): # copy key and vector kbox[i] = car j = len(key) for i in range(j, 256): # repeat until full kbox[i] = kbox[i-j] # [1] initialize sbox sbox = bytearray(range(256)) # repeat sbox mixing loop, as recommened in CipherSaber-2 # http://ciphersaber.gurus.com/faq.html#cs2 j = 0 for k in range(loops): for i in range(256): j = (j + sbox[i] + kbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # main loop i = 0 j = 0 out_bytes = bytearray() for car in in_bytes: i = (i + 1) % 256 # [2] shuffle sbox j = (j + sbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # [3] compute t t = (sbox[i] + sbox[j]) % 256 k = sbox[t] car = car ^ k out_bytes.append(car) return out_bytes
1
1.5.1. 使用Process作为父类自定义子进程
Process的子类需要重写run
方法.
def arcfour(key, in_bytes, loops=20): """rc4算法""" kbox = bytearray(256) # create key box for i, car in enumerate(key): # copy key and vector kbox[i] = car j = len(key) for i in range(j, 256): # repeat until full kbox[i] = kbox[i-j] # [1] initialize sbox sbox = bytearray(range(256)) # repeat sbox mixing loop, as recommened in CipherSaber-2 # http://ciphersaber.gurus.com/faq.html#cs2 j = 0 for k in range(loops): for i in range(256): j = (j + sbox[i] + kbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # main loop i = 0 j = 0 out_bytes = bytearray() for car in in_bytes: i = (i + 1) % 256 # [2] shuffle sbox j = (j + sbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # [3] compute t t = (sbox[i] + sbox[j]) % 256 k = sbox[t] car = car ^ k out_bytes.append(car) return out_bytes
2
def arcfour(key, in_bytes, loops=20): """rc4算法""" kbox = bytearray(256) # create key box for i, car in enumerate(key): # copy key and vector kbox[i] = car j = len(key) for i in range(j, 256): # repeat until full kbox[i] = kbox[i-j] # [1] initialize sbox sbox = bytearray(range(256)) # repeat sbox mixing loop, as recommened in CipherSaber-2 # http://ciphersaber.gurus.com/faq.html#cs2 j = 0 for k in range(loops): for i in range(256): j = (j + sbox[i] + kbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # main loop i = 0 j = 0 out_bytes = bytearray() for car in in_bytes: i = (i + 1) % 256 # [2] shuffle sbox j = (j + sbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # [3] compute t t = (sbox[i] + sbox[j]) % 256 k = sbox[t] car = car ^ k out_bytes.append(car) return out_bytes
3
def arcfour(key, in_bytes, loops=20): """rc4算法""" kbox = bytearray(256) # create key box for i, car in enumerate(key): # copy key and vector kbox[i] = car j = len(key) for i in range(j, 256): # repeat until full kbox[i] = kbox[i-j] # [1] initialize sbox sbox = bytearray(range(256)) # repeat sbox mixing loop, as recommened in CipherSaber-2 # http://ciphersaber.gurus.com/faq.html#cs2 j = 0 for k in range(loops): for i in range(256): j = (j + sbox[i] + kbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # main loop i = 0 j = 0 out_bytes = bytearray() for car in in_bytes: i = (i + 1) % 256 # [2] shuffle sbox j = (j + sbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # [3] compute t t = (sbox[i] + sbox[j]) % 256 k = sbox[t] car = car ^ k out_bytes.append(car) return out_bytes
4
创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()
方法启动,这样创建进程比fork()
简单.
join()
方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步.
可以看到我们的父进程进行完了子进程才进行.其实当执行start方法的时候我们就已经把进程创建好并给他任务了.虽然进程启动了,但我们并不能知道它啥时候运算完成.这时候用join方法来确认是否执行完了(通过阻塞主进程),也就是起个等待结果的作用.
需要注意这个类中run
方法其实并不在定义和调用它的进程中,因此在调用它的进程看来,其中定义的变量是不变的.要想在调用它的进程中获取到子进程执行的内容,需要了解如何做进程间通信
def arcfour(key, in_bytes, loops=20): """rc4算法""" kbox = bytearray(256) # create key box for i, car in enumerate(key): # copy key and vector kbox[i] = car j = len(key) for i in range(j, 256): # repeat until full kbox[i] = kbox[i-j] # [1] initialize sbox sbox = bytearray(range(256)) # repeat sbox mixing loop, as recommened in CipherSaber-2 # http://ciphersaber.gurus.com/faq.html#cs2 j = 0 for k in range(loops): for i in range(256): j = (j + sbox[i] + kbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # main loop i = 0 j = 0 out_bytes = bytearray() for car in in_bytes: i = (i + 1) % 256 # [2] shuffle sbox j = (j + sbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # [3] compute t t = (sbox[i] + sbox[j]) % 256 k = sbox[t] car = car ^ k out_bytes.append(car) return out_bytes
5
def arcfour(key, in_bytes, loops=20): """rc4算法""" kbox = bytearray(256) # create key box for i, car in enumerate(key): # copy key and vector kbox[i] = car j = len(key) for i in range(j, 256): # repeat until full kbox[i] = kbox[i-j] # [1] initialize sbox sbox = bytearray(range(256)) # repeat sbox mixing loop, as recommened in CipherSaber-2 # http://ciphersaber.gurus.com/faq.html#cs2 j = 0 for k in range(loops): for i in range(256): j = (j + sbox[i] + kbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # main loop i = 0 j = 0 out_bytes = bytearray() for car in in_bytes: i = (i + 1) % 256 # [2] shuffle sbox j = (j + sbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # [3] compute t t = (sbox[i] + sbox[j]) % 256 k = sbox[t] car = car ^ k out_bytes.append(car) return out_bytes
6
def arcfour(key, in_bytes, loops=20): """rc4算法""" kbox = bytearray(256) # create key box for i, car in enumerate(key): # copy key and vector kbox[i] = car j = len(key) for i in range(j, 256): # repeat until full kbox[i] = kbox[i-j] # [1] initialize sbox sbox = bytearray(range(256)) # repeat sbox mixing loop, as recommened in CipherSaber-2 # http://ciphersaber.gurus.com/faq.html#cs2 j = 0 for k in range(loops): for i in range(256): j = (j + sbox[i] + kbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # main loop i = 0 j = 0 out_bytes = bytearray() for car in in_bytes: i = (i + 1) % 256 # [2] shuffle sbox j = (j + sbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # [3] compute t t = (sbox[i] + sbox[j]) % 256 k = sbox[t] car = car ^ k out_bytes.append(car) return out_bytes
7
1.6. 进程间通信
如何让进程间通信呢,其实原理上来讲就是构造一个独立的数据结构来存放结果来参与通信
有两种方式,最常用的一种是用队列
1.6.1. 先进先出队列Queue
def arcfour(key, in_bytes, loops=20): """rc4算法""" kbox = bytearray(256) # create key box for i, car in enumerate(key): # copy key and vector kbox[i] = car j = len(key) for i in range(j, 256): # repeat until full kbox[i] = kbox[i-j] # [1] initialize sbox sbox = bytearray(range(256)) # repeat sbox mixing loop, as recommened in CipherSaber-2 # http://ciphersaber.gurus.com/faq.html#cs2 j = 0 for k in range(loops): for i in range(256): j = (j + sbox[i] + kbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # main loop i = 0 j = 0 out_bytes = bytearray() for car in in_bytes: i = (i + 1) % 256 # [2] shuffle sbox j = (j + sbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # [3] compute t t = (sbox[i] + sbox[j]) % 256 k = sbox[t] car = car ^ k out_bytes.append(car) return out_bytes
8
def arcfour(key, in_bytes, loops=20): """rc4算法""" kbox = bytearray(256) # create key box for i, car in enumerate(key): # copy key and vector kbox[i] = car j = len(key) for i in range(j, 256): # repeat until full kbox[i] = kbox[i-j] # [1] initialize sbox sbox = bytearray(range(256)) # repeat sbox mixing loop, as recommened in CipherSaber-2 # http://ciphersaber.gurus.com/faq.html#cs2 j = 0 for k in range(loops): for i in range(256): j = (j + sbox[i] + kbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # main loop i = 0 j = 0 out_bytes = bytearray() for car in in_bytes: i = (i + 1) % 256 # [2] shuffle sbox j = (j + sbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # [3] compute t t = (sbox[i] + sbox[j]) % 256 k = sbox[t] car = car ^ k out_bytes.append(car) return out_bytes
9
个稍微复杂一点的例子:
clear = bytearray(b'1234567890' * 100000) t0 = time() cipher = arcfour(b'key', clear) print('elapsed time: %.2fs' % (time() - t0)) result = arcfour(b'key', cipher) assert result == clear, '%r != %r' % (result, clear) print('elapsed time: %.2fs' % (time() - t0)) print('OK')
0
clear = bytearray(b'1234567890' * 100000) t0 = time() cipher = arcfour(b'key', clear) print('elapsed time: %.2fs' % (time() - t0)) result = arcfour(b'key', cipher) assert result == clear, '%r != %r' % (result, clear) print('elapsed time: %.2fs' % (time() - t0)) print('OK')
1
两个进程间,父进程创建一个队列给各个子进程,子进程接收父进程的队列作为参数运行.运行过程中将结果存入队列最后运行完后将”done!”存入队列,由父进程接收.
1.6.2. 生产者消费者模型
队列最常见的用处就是在生产者消费者模式中作为数据缓冲区.以下就是一个生产者消费者模式的例子
clear = bytearray(b'1234567890' * 100000) t0 = time() cipher = arcfour(b'key', clear) print('elapsed time: %.2fs' % (time() - t0)) result = arcfour(b'key', cipher) assert result == clear, '%r != %r' % (result, clear) print('elapsed time: %.2fs' % (time() - t0)) print('OK')
2
clear = bytearray(b'1234567890' * 100000) t0 = time() cipher = arcfour(b'key', clear) print('elapsed time: %.2fs' % (time() - t0)) result = arcfour(b'key', cipher) assert result == clear, '%r != %r' % (result, clear) print('elapsed time: %.2fs' % (time() - t0)) print('OK')
3
clear = bytearray(b'1234567890' * 100000) t0 = time() cipher = arcfour(b'key', clear) print('elapsed time: %.2fs' % (time() - t0)) result = arcfour(b'key', cipher) assert result == clear, '%r != %r' % (result, clear) print('elapsed time: %.2fs' % (time() - t0)) print('OK')
4
clear = bytearray(b'1234567890' * 100000) t0 = time() cipher = arcfour(b'key', clear) print('elapsed time: %.2fs' % (time() - t0)) result = arcfour(b'key', cipher) assert result == clear, '%r != %r' % (result, clear) print('elapsed time: %.2fs' % (time() - t0)) print('OK')
5
clear = bytearray(b'1234567890' * 100000) t0 = time() cipher = arcfour(b'key', clear) print('elapsed time: %.2fs' % (time() - t0)) result = arcfour(b'key', cipher) assert result == clear, '%r != %r' % (result, clear) print('elapsed time: %.2fs' % (time() - t0)) print('OK')
6
1.6.3. 管道Pipes
既然是管道,那就肯定有两端,有方向,分成单向管道和双向管道了.
看一个最简单的双向管道
clear = bytearray(b'1234567890' * 100000) t0 = time() cipher = arcfour(b'key', clear) print('elapsed time: %.2fs' % (time() - t0)) result = arcfour(b'key', cipher) assert result == clear, '%r != %r' % (result, clear) print('elapsed time: %.2fs' % (time() - t0)) print('OK')
7
def arcfour(key, in_bytes, loops=20): """rc4算法""" kbox = bytearray(256) # create key box for i, car in enumerate(key): # copy key and vector kbox[i] = car j = len(key) for i in range(j, 256): # repeat until full kbox[i] = kbox[i-j] # [1] initialize sbox sbox = bytearray(range(256)) # repeat sbox mixing loop, as recommened in CipherSaber-2 # http://ciphersaber.gurus.com/faq.html#cs2 j = 0 for k in range(loops): for i in range(256): j = (j + sbox[i] + kbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # main loop i = 0 j = 0 out_bytes = bytearray() for car in in_bytes: i = (i + 1) % 256 # [2] shuffle sbox j = (j + sbox[i]) % 256 sbox[i], sbox[j] = sbox[j], sbox[i] # [3] compute t t = (sbox[i] + sbox[j]) % 256 k = sbox[t] car = car ^ k out_bytes.append(car) return out_bytes
9
稍微复杂的例子:
clear = bytearray(b'1234567890' * 100000) t0 = time() cipher = arcfour(b'key', clear) print('elapsed time: %.2fs' % (time() - t0)) result = arcfour(b'key', cipher) assert result == clear, '%r != %r' % (result, clear) print('elapsed time: %.2fs' % (time() - t0)) print('OK')
9
elapsed time: 0.50s elapsed time: 1.02s OK
0
可以看出管道的限制相对多些,必须要建立连接才能交换数据,一出一进这样子,这也是为啥队列用的比较多.
1.7. 静态数据共享
python里面的全局变量也只管到他自己的进程,如果要让一个静态的数据在每个子进程中都可以调用.那么需要用到模块中的几个方法:
Value, Array
静态数据位共享,静态数组共享,本质就是在内存中开辟一块用于共享的空间,Value和Array都必须使用C类型保存数据,具体可以使用:
- 标准库中的
struct
模块定义的字符串符号.如"d"
- 使用标准库
ctypes
中定义的c语言类型,如c_double
- 继承标准库
ctypes
其中的Structure
对象创建的自定义结构体, c_double
- 标准库中的
elapsed time: 0.50s elapsed time: 1.02s OK
1
elapsed time: 0.50s elapsed time: 1.02s OK
2
为了更加方便的在主进程获取到子进程中的数据,我们可以,下例就是一个例子.
elapsed time: 0.50s elapsed time: 1.02s OK
3
elapsed time: 0.50s elapsed time: 1.02s OK
4
1.7.1. 高级共享multiprocessing.Manager
之前介绍了queue,pipe,array和value,这些都太具体底层,有没有什么方法可以像处理python容器一样简单地处理数据共享的问题呢?multiprocess提供一个manager模块.
Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问.从而达到多进程间数据通信且安全。
Manager支持的类型有
- list
- dict
- Namespace
- Lock
- RLock
- Semaphore
- BoundedSemaphore
- Condition
- Event
- Queue
- Value
- Array。
elapsed time: 0.50s elapsed time: 1.02s OK
5
elapsed time: 0.50s elapsed time: 1.02s OK
6
namespace对象没有公共的方法,但是有可写的属性
elapsed time: 0.50s elapsed time: 1.02s OK
7
elapsed time: 0.50s elapsed time: 1.02s OK
8
1.8. 使用Unix信号管理子进程
类似在多线程中控制子线程,我们使用os.kill(pid,sig)
来控制子进程.
elapsed time: 0.50s elapsed time: 1.02s OK
9
def crypto_process(size, key): in_text = bytearray(randrange(256) for i in range(size)) cypher_text = arcfour(key, in_text) out_text = arcfour(key, cypher_text) assert in_text == out_text, 'Failed arcfour_test' return size def main(workers=None): JOBS = 12 SIZE = 2**18 KEY = b"'Twas brillig, and the slithy toves\nDid gyre" STATUS = '{} workers, elapsed time: {:.2f}s' if workers: workers = int(workers) t0 = time() with ProcessPoolExecutor(workers) as executor: actual_workers = executor._max_workers to_do = [] for i in range(JOBS, 0, -1): size = SIZE + int(SIZE / JOBS * (i - JOBS/2)) job = executor.submit(crypto_process, size, KEY) to_do.append(job) for future in as_completed(to_do): res = future.result() print('{:.1f} KB'.format(res/2**10)) print(STATUS.format(actual_workers, time() - t0))
0
还没有评论,来说两句吧...