5.协程与进程
大约 5 分钟学习笔记测试开发测试工具开发
一、 协程(Coroutine)技术
1. 基本用法
线程 :又叫 微线程 、 纤程
协程是一种用户级的轻量级线程 。
协程 拥有自己的寄存器上下文和栈。
协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来时,恢复先前保存的寄存器和栈。
优点:
- 协程极高的执行效率。 - 因为子程序切换不是线程切换,而是由程序自身控制,因此没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
 
- 不需要多线程的锁机制- 只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就可以,所以执行效率比多线程高很多。
 
缺点:
- 无法利用多核资源- 协程的本质是个单线程,他不能同时将 单个 CPU 的多个核用上,协程需要和进程配合才能运行在多 CPU 上。(适用于 CPU 密集型 应用)
 
- 进行阻塞(Blocking) 操作(如 IO 时)会阻塞掉整个程序
2. gevent
gevent :一个现在很火、支持也会全面的 Python 第三方协程库。
由于切换是在 IO 操作时自动完成,所以 gevent 需要修改 Python 自带的一些标准库,这一过程在启动时通过 monkey patch 完成。
import time
import gevent
# 改变 Python标准库,将协程变更为 非阻塞
from gevent import monkey
monkey.patch_all()
def f(num):
    for i in range(2):
        print(f"f{num}-正在运行 >>> ", i)
        time.sleep(2)
if __name__ == '__main__':
    a = time.time()
    # 创建协程
    g1 = gevent.spawn(f, 1)
    g2 = gevent.spawn(f, 2)
    # 阻塞
    gevent.joinall([g1, g2])
    b = time.time()
    print("耗时:", b - a)3.端口扫描--协程 函数封装
def geventScan(address, startPort, endPort):
    """
    协程函数封装
    :param address:ip地址
    :param startPort: 端口开始值
    :param endPort: 端口结束值
    :return:
    """
    returnList = []
    geventList = []  # 存放线程对象
    g = gevent.pool.Pool(500)   # 协程池
    for one in range(startPort, endPort + 1):
        # 创建协程对象
        geventList.append(g.spawn(portScan, address, one, returnList))
        # 阻塞
    gevent.joinall(geventList)二、进程(multiprocessing 库)
一个程序运行起来之后,代码 加 用到的资源 称之为 进程 。
1.进程之间的资源是相互独立的
import multiprocessing
num = 0
def test1():
	global num
	for i in range(10):
		num += 1
def test2():
	global num
	num += 22
if __name__ == '__main__':
	p1 = multiprocessing.Process(target=test1)
	p2 = multiprocessing.Process(target=test2)
	p1.start()
	p2.start()
    p1.join()    #join()方法可以等子进程结束后再继续往下运行,通常用于进程间的同步
	p2.join()
	print(num)运行结果:
02.多进程并发
import multiprocessing
import time
def test1():
	for i in range(10):
        time.sleep(1)
        print('test1',i)
def test2():
    for i in range(10):
        time.sleep(1)
        print('test2',i)
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=test1)
    p2 = multiprocessing.Process(target=test2)
    p1.start()
    p2.start()运行结果:
test1 0
test2 0
test1 1
test2 1
test1 2
test2 2
test2 3
test1 3
test2 4
test1 4
test2 5
test1 5
test2 6
test1 6
test2 7
test1 7
test1 8
test2 8
test2 9
test1 93. 进程池并发
如果要启动大量的子进程,可以用进程池的方式批量创建子进程
import time
from multiprocessing import Pool #Pool中的P要大写的
def test1():
    time.sleep(1)
    for i in range(10):
        print('test1',i)
def test2():
    time.sleep(1)
    for i in range(10):
        print('test2',i)
if __name__ == '__main__':
    pool = Pool(5)    #Pool的默认大小是CPU的核数,进程数可以手动更改
    pool.apply_async(test1)
    pool.apply_async(test2)
    pool.close()
    pool.join()    #调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的process了。执行结果:
test1 0
test1 1
test1 2
test1 3
test1 4
test1 5
test1 6
test1 7
test1 8
test1 9
test2 0
test2 1
test2 2
test2 3
test2 4
test2 5
test2 6
test2 7
test2 8
test2 94. 进程间通信
Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python 的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。
我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A','B','C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)
if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
 	pw.start()
    # 启动子进程pr,读取:
 	pr.start()
    # 等待pw结束:
 	pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
 	pr.terminate()运行结果:
Process to write: 2024
Put A to queue...
Process to read: 4308
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.5. 端口扫描 -- 进程 函数封装
def processScan(address, startPort=0, endPort=65535):
    writeDate(address)
    st = time.time()
    processList = []
    cpuCount = cpu_count()  # 获取CPU核数
    ports = []
    # 参数封装
    if endPort // cpuCount >= 1:
        for i in range(cpuCount):
            if i != cpuCount - 1:
                num = endPort // cpuCount
                newPort = (startPort, startPort + num)
                ports.append(newPort)
                startPort += num
            else:
                ports.append((endPort // cpuCount * (cpuCount - 1), endPort))
    else:
        ports.append((startPort, endPort))
    for i in ports:
        p = Process(target=geventScan, args=(address, i[0], i[1]))
        p.start()
        processList.append(p)
    for i in processList:
        i.join()
    et = time.time()
    print(f'扫描过程,总计耗时:{(et - st):.2f} s。')
    writeDate(f"{(et - st):.2f}")