1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
| import re import os import socket import time import gevent import gevent.pool from multiprocessing import Pool, cpu_count, Process, Manager
from gevent import monkey

# Apply gevent's monkey patches once, at import time, before any of the
# functions below run.
# NOTE(review): patch_all() is presumably what lets the blocking stdlib calls
# used in this file (socket connects) cooperate with the gevent pool in
# geventScan -- confirm against the gevent documentation, which also advises
# patching as early as possible in the module.
monkey.patch_all()
# Dotted-quad IPv4 pattern: four octets in the range 0-255 separated by dots.
# Raw string so that "\d" and "\." are regex escapes rather than (invalid)
# string escapes; re.ASCII restricts "\d" to the digits 0-9.
_IPV4_PATTERN = re.compile(
    r'((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)',
    re.ASCII,
)


def checkIp(ip):
    """Check whether *ip* is a syntactically valid dotted-quad IPv4 address.

    :param ip: candidate IP address string
    :return: True if the whole string is four dot-separated octets in 0-255,
        otherwise False (including for the empty string)
    """
    # fullmatch (instead of match with a trailing "$") also rejects input
    # that ends with a newline, e.g. "127.0.0.1\n".
    return _IPV4_PATTERN.fullmatch(ip) is not None
def checkDomainName(domain_name):
    """Validate the syntax of a domain name.

    A leading ``http...://`` scheme is ignored, so both ``example.com`` and
    ``https://example.com`` are accepted.

    :param domain_name: domain name to validate, with or without a scheme
    :return: True if the host part looks like a domain name, otherwise False
    """
    # Only strip a scheme when the "://" separator is really present; a bare
    # host that merely starts with "http" (e.g. "httpbin.org") is kept whole.
    scheme_end = domain_name.find("://")
    if domain_name.startswith("http") and scheme_end != -1:
        host = domain_name[scheme_end + 3:]
    else:
        host = domain_name

    pattern = re.compile(
        r'(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|'
        r'([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|'
        r'([a-zA-Z0-9][-_.a-zA-Z0-9]{0,61}[a-zA-Z0-9]))\.'
        # The dot inside a two-part suffix such as "co.uk" is escaped so that
        # it matches a literal "." only, not an arbitrary character.
        r'([a-zA-Z]{2,13}|[a-zA-Z0-9-]{2,30}\.[a-zA-Z]{2,3})'
    )
    # fullmatch anchors both ends and, unlike "$", rejects a trailing newline.
    return pattern.fullmatch(host) is not None
def analysisDomainName(domain_name):
    """Resolve a domain name to an IPv4 address.

    A leading ``http...://`` scheme is removed before the lookup.

    :param domain_name: domain name to resolve, with or without a scheme
    :return: the resolved IPv4 address as a string
    :raises socket.gaierror: if the host name cannot be resolved
    """
    # Only strip a scheme when the "://" separator is really present;
    # otherwise a host such as "httpbin.org" would lose its first characters.
    scheme_end = domain_name.find("://")
    if domain_name.startswith("http") and scheme_end != -1:
        host = domain_name[scheme_end + 3:]
    else:
        host = domain_name
    return socket.gethostbyname(host)
def checkPort(startPort, endPort):
    """Validate a port range.

    Both bounds may be given as non-negative integers or as strings of
    decimal digits. A range consisting of a single port
    (``startPort == endPort``) is valid.

    :param startPort: first port of the range
    :param endPort: last port of the range (inclusive)
    :return: ``(startPort, endPort + 1)`` as integers when the range is valid,
        otherwise None (after printing the reason)
    """
    if not (str(startPort).isdecimal() and str(endPort).isdecimal()):
        print("请输入数字字符。")
        return None

    # Normalise to int so that digit strings can be compared numerically.
    first, last = int(str(startPort)), int(str(endPort))

    if not (0 <= first <= 65535 and 0 <= last <= 65535):
        print("端口范围不能超出“0~65535”。")
        return None

    # Only a start strictly greater than the end is an error, as the message
    # says; equal bounds describe a single-port scan.
    if first > last:
        print("端口的起始值不能大于结束值。")
        return None

    return first, last + 1
def portScan(address, port, postList):
    """Probe a single TCP port and record it if it accepts a connection.

    When the port is open, a message is printed, the port number is passed to
    ``writeDate`` and it is appended to *postList*.

    :param address: host name or IP address to probe (str)
    :param port: TCP port number (int)
    :param postList: list that open port numbers are appended to
    :return: None
    """
    try:
        # The context manager closes the socket even if connect_ex raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sk:
            sk.settimeout(0.5)
            is_open = sk.connect_ex((address, port)) == 0
        if is_open:
            print(f'主机:{address},端口:{port} 已开放。')
            writeDate(f"{port}")
            postList.append(port)
    except Exception:
        # Best-effort scan: report the failure and carry on. Catching
        # Exception rather than using a bare "except" lets KeyboardInterrupt
        # and other BaseException-derived signals propagate.
        print("扫描异常。")
def geventScan(address, startPort, endPort):
    """Scan a range of ports concurrently with a gevent pool.

    One ``portScan`` task is spawned per port through a ``gevent.pool.Pool``
    created with size 500, and the call returns once all tasks have finished.

    :param address: IP address (or host name) to scan
    :param startPort: first port to scan
    :param endPort: last port to scan (inclusive)
    :return: sorted list of the ports that ``portScan`` reported as open
    """
    openPorts = []
    pool = gevent.pool.Pool(500)

    tasks = [
        pool.spawn(portScan, address, port, openPorts)
        for port in range(startPort, endPort + 1)
    ]
    gevent.joinall(tasks)

    # Previously the collected ports were discarded; hand them to the caller.
    return sorted(openPorts)
def _splitPortRange(startPort, endPort, parts):
    """Split startPort..endPort (inclusive) into contiguous inclusive chunks.

    :return: list of at most *parts* ``(first, last)`` tuples that together
        cover every port exactly once; empty if the range is empty
    """
    total = endPort - startPort + 1
    if total <= 0:
        return []

    # Never create more chunks than there are ports.
    parts = max(1, min(parts, total))
    size, extra = divmod(total, parts)

    ranges = []
    first = startPort
    for index in range(parts):
        # The first `extra` chunks take one additional port each.
        last = first + size - 1 + (1 if index < extra else 0)
        ranges.append((first, last))
        first = last + 1
    return ranges


def processScan(address, startPort=0, endPort=65535):
    """Scan a port range using one worker process per CPU core.

    The address is written to the temporary result file first, the range is
    split into one chunk per core, each chunk is scanned by ``geventScan`` in
    its own process, and finally the elapsed time is printed and written to
    the temporary result file.

    :param address: IP address to scan
    :param startPort: first port to scan
    :param endPort: last port to scan (inclusive)
    :return: None
    """
    writeDate(address)
    st = time.time()

    # geventScan treats both bounds as inclusive, so the chunks must neither
    # overlap nor extend beyond [startPort, endPort].
    workers = []
    for first, last in _splitPortRange(startPort, endPort, cpu_count()):
        p = Process(target=geventScan, args=(address, first, last))
        p.start()
        workers.append(p)

    for p in workers:
        p.join()

    et = time.time()
    print(f'扫描过程,总计耗时:{(et - st):.2f} s。')
    writeDate(f"{(et - st):.2f}")
def writeDate(info, path='temporaryFile.txt'):
    """Append one line of scan data to the temporary result file.

    :param info: text to record (str); a newline is appended
    :param path: file to append to; defaults to ``temporaryFile.txt`` in the
        current working directory, which is the file ``writeLog`` reads
    :return: None
    """
    # A relative default replaces the former machine-specific absolute path,
    # so that the file written here is the one writeLog later opens.
    with open(file=path, mode='a', encoding='utf-8') as f:
        f.write(info + '\n')
def writeLog():
    """Turn the temporary result file into the final ``log.txt`` report.

    ``temporaryFile.txt`` is expected to hold the scanned host on its first
    line, the elapsed time on its last line and one open port per line in
    between. The report lists the open ports in ascending order followed by a
    summary, and replaces any existing ``log.txt``. The temporary file is
    deleted afterwards. If the temporary file does not exist, nothing is done.

    :return: None
    """
    try:
        with open(file='temporaryFile.txt', mode='r', encoding='utf-8') as f1:
            dataList = [line.rstrip('\n') for line in f1]
    except FileNotFoundError:
        # No scan has produced any data, so there is nothing to report.
        return

    if dataList:
        host = dataList[0]
        elapsed = dataList[-1]
        # A set drops ports that were recorded more than once, so the total
        # below counts each open port a single time.
        portList = sorted({int(line) for line in dataList[1:-1]})

        # Mode 'w' creates the file or empties an existing one.
        with open(file='log.txt', mode='w', encoding='utf-8') as f2:
            f2.write(time.strftime("%Y-%m-%d %H:%M:%S\n\n"))
            for port in portList:
                f2.write(f'主机:{host},端口:{port} 已开放。\n')
            f2.write(f'\n主机:{host},开放端口共计:{len(portList)} 个。\n')
            f2.write(f'扫描过程,总计耗时:{elapsed} s。')

    os.remove('temporaryFile.txt')
if __name__ == '__main__':
    # Scan target (IP address or domain name) and inclusive port range.
    ip = "127.0.0.1"
    start = 0
    end = 65535

    if checkPort(start, end):
        # Work out the address to scan: use an IP as-is, resolve a domain.
        if checkIp(ip):
            target = ip
        elif checkDomainName(ip):
            target = analysisDomainName(ip)
        else:
            target = None

        # Only build the log when a scan actually ran; otherwise there is no
        # temporary result file for writeLog to read.
        if target is not None:
            processScan(target, start, end)
            writeLog()
|