协程与进程

一、 协程(Coroutine)技术

1. 基本用法

线程 :又叫 微线程纤程

协程是一种用户级的轻量级线程

协程 拥有自己的寄存器上下文和栈。

协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来时,恢复先前保存的寄存器和栈。

优点:

  • 协程极高的执行效率
    • 因为子程序切换不是线程切换,而是由程序自身控制,因此没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
  • 不需要多线程的锁机制
    • 只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就可以,所以执行效率比多线程高很多。

缺点:

  • 无法利用多核资源
    • 协程的本质是个单线程,他不能同时将 单个 CPU 的多个核用上,协程需要和进程配合才能运行在多 CPU上。(适用于 CPU 密集型 应用)
  • 进行阻塞(Blocking) 操作(如 IO 时)会阻塞掉整个程序

2. gevent

gevent :一个现在很火、支持也会全面的 Python 第三方协程库。

由于切换是在 IO 操作时自动完成,所以 gevent 需要修改 Python 自带的一些标准库,这一过程在启动时通过 monkey patch 完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import time
import gevent

# 改变 Python标准库,将协程变更为 非阻塞
from gevent import monkey
monkey.patch_all()


def f(num):
for i in range(2):
print(f"f{num}-正在运行 >>> ", i)
time.sleep(2)


if __name__ == '__main__':
a = time.time()

# 创建协程
g1 = gevent.spawn(f, 1)
g2 = gevent.spawn(f, 2)

# 阻塞
gevent.joinall([g1, g2])

b = time.time()
print("耗时:", b - a)

3.端口扫描—协程 函数封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def geventScan(address, startPort, endPort):
"""
协程函数封装
:param address:ip地址
:param startPort: 端口开始值
:param endPort: 端口结束值
:return:
"""
returnList = []
geventList = [] # 存放线程对象
g = gevent.pool.Pool(500) # 协程池

for one in range(startPort, endPort + 1):
# 创建协程对象
geventList.append(g.spawn(portScan, address, one, returnList))

# 阻塞
gevent.joinall(geventList)

二、进程(multiprocessing 库)

一个程序运行起来之后,代码 加 用到的资源 称之为 进程 。

1.进程之间的资源是相互独立的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import multiprocessing
num = 0
def test1():
global num
for i in range(10):
num += 1
def test2():
global num
num += 22

if __name__ == '__main__':
p1 = multiprocessing.Process(target=test1)
p2 = multiprocessing.Process(target=test2)

p1.start()
p2.start()

p1.join() #join()方法可以等子进程结束后再继续往下运行,通常用于进程间的同步
p2.join()

print(num)

运行结果:

1
0

2.多进程并发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import multiprocessing
import time

def test1():
for i in range(10):
time.sleep(1)
print('test1',i)

def test2():
for i in range(10):
time.sleep(1)
print('test2',i)

if __name__ == '__main__':
p1 = multiprocessing.Process(target=test1)
p2 = multiprocessing.Process(target=test2)

p1.start()
p2.start()

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
test1 0
test2 0
test1 1
test2 1
test1 2
test2 2
test2 3
test1 3
test2 4
test1 4
test2 5
test1 5
test2 6
test1 6
test2 7
test1 7
test1 8
test2 8
test2 9
test1 9

3. 进程池并发

如果要启动大量的子进程,可以用进程池的方式批量创建子进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import time
from multiprocessing import Pool #Pool中的P要大写的
def test1():
time.sleep(1)
for i in range(10):
print('test1',i)
def test2():
time.sleep(1)
for i in range(10):
print('test2',i)

if __name__ == '__main__':
pool = Pool(5) #Pool的默认大小是CPU的核数,进程数可以手动更改
pool.apply_async(test1)
pool.apply_async(test2)

pool.close()
pool.join() #调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的process了。

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
test1 0
test1 1
test1 2
test1 3
test1 4
test1 5
test1 6
test1 7
test1 8
test1 9
test2 0
test2 1
test2 2
test2 3
test2 4
test2 5
test2 6
test2 7
test2 8
test2 9

4. 进程间通信

Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了QueuePipes等多种方式来交换数据。

我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A','B','C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)

if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()

# 启动子进程pr,读取:
pr.start()

# 等待pw结束:
pw.join()

# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()

运行结果:

1
2
3
4
5
6
7
8
Process to write: 2024
Put A to queue...
Process to read: 4308
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

5. 端口扫描 — 进程 函数封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def processScan(address, startPort=0, endPort=65535):
writeDate(address)
st = time.time()
processList = []
cpuCount = cpu_count() # 获取CPU核数
ports = []

# 参数封装
if endPort // cpuCount >= 1:
for i in range(cpuCount):
if i != cpuCount - 1:
num = endPort // cpuCount
newPort = (startPort, startPort + num)
ports.append(newPort)
startPort += num
else:
ports.append((endPort // cpuCount * (cpuCount - 1), endPort))
else:
ports.append((startPort, endPort))


for i in ports:
p = Process(target=geventScan, args=(address, i[0], i[1]))
p.start()
processList.append(p)

for i in processList:
i.join()
et = time.time()
print(f'扫描过程,总计耗时:{(et - st):.2f} s。')
writeDate(f"{(et - st):.2f}")