5.协程与进程
大约 5 分钟学习笔记测试开发测试工具开发
一、 协程(Coroutine)技术
1. 基本用法
线程
:又叫 微线程 、 纤程
协程是一种用户级的轻量级线程 。
协程 拥有自己的寄存器上下文和栈。
协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来时,恢复先前保存的寄存器和栈。
优点:
- 协程极高的执行效率。
- 因为子程序切换不是线程切换,而是由程序自身控制,因此没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
- 不需要多线程的锁机制
- 只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就可以,所以执行效率比多线程高很多。
缺点:
- 无法利用多核资源
- 协程的本质是个单线程,他不能同时将 单个 CPU 的多个核用上,协程需要和进程配合才能运行在多 CPU 上。(适用于 CPU 密集型 应用)
- 进行阻塞(Blocking) 操作(如 IO 时)会阻塞掉整个程序
2. gevent
gevent
:一个现在很火、支持也会全面的 Python 第三方协程库。
由于切换是在 IO
操作时自动完成,所以 gevent
需要修改 Python 自带的一些标准库,这一过程在启动时通过 monkey patch
完成。
import time
import gevent
# 改变 Python标准库,将协程变更为 非阻塞
from gevent import monkey
monkey.patch_all()
def f(num):
for i in range(2):
print(f"f{num}-正在运行 >>> ", i)
time.sleep(2)
if __name__ == '__main__':
a = time.time()
# 创建协程
g1 = gevent.spawn(f, 1)
g2 = gevent.spawn(f, 2)
# 阻塞
gevent.joinall([g1, g2])
b = time.time()
print("耗时:", b - a)
3.端口扫描--协程 函数封装
def geventScan(address, startPort, endPort):
"""
协程函数封装
:param address:ip地址
:param startPort: 端口开始值
:param endPort: 端口结束值
:return:
"""
returnList = []
geventList = [] # 存放线程对象
g = gevent.pool.Pool(500) # 协程池
for one in range(startPort, endPort + 1):
# 创建协程对象
geventList.append(g.spawn(portScan, address, one, returnList))
# 阻塞
gevent.joinall(geventList)
二、进程(multiprocessing 库)
一个程序运行起来之后,代码 加 用到的资源 称之为 进程 。
1.进程之间的资源是相互独立的
import multiprocessing
num = 0
def test1():
global num
for i in range(10):
num += 1
def test2():
global num
num += 22
if __name__ == '__main__':
p1 = multiprocessing.Process(target=test1)
p2 = multiprocessing.Process(target=test2)
p1.start()
p2.start()
p1.join() #join()方法可以等子进程结束后再继续往下运行,通常用于进程间的同步
p2.join()
print(num)
运行结果:
0
2.多进程并发
import multiprocessing
import time
def test1():
for i in range(10):
time.sleep(1)
print('test1',i)
def test2():
for i in range(10):
time.sleep(1)
print('test2',i)
if __name__ == '__main__':
p1 = multiprocessing.Process(target=test1)
p2 = multiprocessing.Process(target=test2)
p1.start()
p2.start()
运行结果:
test1 0
test2 0
test1 1
test2 1
test1 2
test2 2
test2 3
test1 3
test2 4
test1 4
test2 5
test1 5
test2 6
test1 6
test2 7
test1 7
test1 8
test2 8
test2 9
test1 9
3. 进程池并发
如果要启动大量的子进程,可以用进程池的方式批量创建子进程
import time
from multiprocessing import Pool #Pool中的P要大写的
def test1():
time.sleep(1)
for i in range(10):
print('test1',i)
def test2():
time.sleep(1)
for i in range(10):
print('test2',i)
if __name__ == '__main__':
pool = Pool(5) #Pool的默认大小是CPU的核数,进程数可以手动更改
pool.apply_async(test1)
pool.apply_async(test2)
pool.close()
pool.join() #调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的process了。
执行结果:
test1 0
test1 1
test1 2
test1 3
test1 4
test1 5
test1 6
test1 7
test1 8
test1 9
test2 0
test2 1
test2 2
test2 3
test2 4
test2 5
test2 6
test2 7
test2 8
test2 9
4. 进程间通信
Process
之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python 的multiprocessing
模块包装了底层的机制,提供了Queue
、Pipes
等多种方式来交换数据。
我们以Queue
为例,在父进程中创建两个子进程,一个往Queue
里写数据,一个从Queue
里读数据:
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A','B','C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()
运行结果:
Process to write: 2024
Put A to queue...
Process to read: 4308
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
5. 端口扫描 -- 进程 函数封装
def processScan(address, startPort=0, endPort=65535):
writeDate(address)
st = time.time()
processList = []
cpuCount = cpu_count() # 获取CPU核数
ports = []
# 参数封装
if endPort // cpuCount >= 1:
for i in range(cpuCount):
if i != cpuCount - 1:
num = endPort // cpuCount
newPort = (startPort, startPort + num)
ports.append(newPort)
startPort += num
else:
ports.append((endPort // cpuCount * (cpuCount - 1), endPort))
else:
ports.append((startPort, endPort))
for i in ports:
p = Process(target=geventScan, args=(address, i[0], i[1]))
p.start()
processList.append(p)
for i in processList:
i.join()
et = time.time()
print(f'扫描过程,总计耗时:{(et - st):.2f} s。')
writeDate(f"{(et - st):.2f}")