一、 协程(Coroutine)技术 1. 基本用法 线程
:又叫 微线程 、 纤程
协程是一种用户级的轻量级线程 。
协程 拥有自己的寄存器上下文和栈。
协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来时,恢复先前保存的寄存器和栈。
优点: 协程极高的执行效率 。因为子程序切换不是线程切换,而是由程序自身控制,因此没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。 不需要多线程的锁机制 只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就可以,所以执行效率比多线程高很多。 缺点: 无法利用多核资源 协程的本质是个单线程,他不能同时将 单个 CPU 的多个核用上,协程需要和进程配合才能运行在多 CPU 上。(适用于 CPU 密集型 应用) 进行阻塞(Blocking) 操作(如 IO 时)会阻塞掉整个程序 2. gevent gevent
:一个现在很火、支持也会全面的 Python 第三方协程库。
由于切换是在 IO
操作时自动完成,所以 gevent
需要修改 Python 自带的一些标准库,这一过程在启动时通过 monkey patch
完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import timeimport geventfrom gevent import monkeymonkey.patch_all() def f (num ): for i in range (2 ): print (f"f{num} -正在运行 >>> " , i) time.sleep(2 ) if __name__ == '__main__' : a = time.time() g1 = gevent.spawn(f, 1 ) g2 = gevent.spawn(f, 2 ) gevent.joinall([g1, g2]) b = time.time() print ("耗时:" , b - a)
3.端口扫描—协程 函数封装 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 def geventScan (address, startPort, endPort ): """ 协程函数封装 :param address:ip地址 :param startPort: 端口开始值 :param endPort: 端口结束值 :return: """ returnList = [] geventList = [] g = gevent.pool.Pool(500 ) for one in range (startPort, endPort + 1 ): geventList.append(g.spawn(portScan, address, one, returnList)) gevent.joinall(geventList)
二、进程(multiprocessing 库) 一个程序运行起来之后,代码 加 用到的资源 称之为 进程 。
1.进程之间的资源是相互独立的 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import multiprocessingnum = 0 def test1 (): global num for i in range (10 ): num += 1 def test2 (): global num num += 22 if __name__ == '__main__' : p1 = multiprocessing.Process(target=test1) p2 = multiprocessing.Process(target=test2) p1.start() p2.start() p1.join() p2.join() print (num)
运行结果:
2.多进程并发 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import multiprocessingimport timedef test1 (): for i in range (10 ): time.sleep(1 ) print ('test1' ,i) def test2 (): for i in range (10 ): time.sleep(1 ) print ('test2' ,i) if __name__ == '__main__' : p1 = multiprocessing.Process(target=test1) p2 = multiprocessing.Process(target=test2) p1.start() p2.start()
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 test1 0 test2 0 test1 1 test2 1 test1 2 test2 2 test2 3 test1 3 test2 4 test1 4 test2 5 test1 5 test2 6 test1 6 test2 7 test1 7 test1 8 test2 8 test2 9 test1 9
3. 进程池并发 如果要启动大量的子进程,可以用进程池的方式批量创建子进程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import timefrom multiprocessing import Pool def test1 (): time.sleep(1 ) for i in range (10 ): print ('test1' ,i) def test2 (): time.sleep(1 ) for i in range (10 ): print ('test2' ,i) if __name__ == '__main__' : pool = Pool(5 ) pool.apply_async(test1) pool.apply_async(test2) pool.close() pool.join()
执行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 test1 0 test1 1 test1 2 test1 3 test1 4 test1 5 test1 6 test1 7 test1 8 test1 9 test2 0 test2 1 test2 2 test2 3 test2 4 test2 5 test2 6 test2 7 test2 8 test2 9
4. 进程间通信 Process
之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python 的multiprocessing
模块包装了底层的机制,提供了Queue
、Pipes
等多种方式来交换数据。
我们以Queue
为例,在父进程中创建两个子进程,一个往Queue
里写数据,一个从Queue
里读数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 from multiprocessing import Process, Queueimport os, time, randomdef write (q ): print ('Process to write: %s' % os.getpid()) for value in ['A' ,'B' ,'C' ]: print ('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) def read (q ): print ('Process to read: %s' % os.getpid()) while True : value = q.get(True ) print ('Get %s from queue.' % value) if __name__=='__main__' : q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) pw.start() pr.start() pw.join() pr.terminate()
运行结果:
1 2 3 4 5 6 7 8 Process to write: 2024 Put A to queue... Process to read: 4308 Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue.
5. 端口扫描 — 进程 函数封装 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 def processScan (address, startPort=0 , endPort=65535 ): writeDate(address) st = time.time() processList = [] cpuCount = cpu_count() ports = [] if endPort // cpuCount >= 1 : for i in range (cpuCount): if i != cpuCount - 1 : num = endPort // cpuCount newPort = (startPort, startPort + num) ports.append(newPort) startPort += num else : ports.append((endPort // cpuCount * (cpuCount - 1 ), endPort)) else : ports.append((startPort, endPort)) for i in ports: p = Process(target=geventScan, args=(address, i[0 ], i[1 ])) p.start() processList.append(p) for i in processList: i.join() et = time.time() print (f'扫描过程,总计耗时:{(et - st):.2 f} s。' ) writeDate(f"{(et - st):.2 f} " )