
| import re import os import socket import time import gevent import gevent.pool from multiprocessing import Pool, cpu_count, Process, Manager
from gevent import monkey monkey.patch_all()
def checkIp(ip):
    """
    Check whether a string is a well-formed dotted-decimal IPv4 address.

    :param ip: the candidate IP address
    :return: True if ``ip`` is four dot-separated octets in 0-255, else False
    """
    if not isinstance(ip, str):
        return False
    # Raw string: "\d" and "\." are regex escapes, not string escapes.
    # re.ASCII keeps "\d" from accepting non-ASCII digits (e.g. full-width).
    addressIp = re.compile(
        r'((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)',
        re.ASCII,
    )
    # fullmatch, unlike match with "$", also rejects a trailing newline.
    return addressIp.fullmatch(ip) is not None
def checkDomainName(domain_name):
    """
    Validate a domain name, optionally prefixed with an ``http...://`` scheme.

    :param domain_name: the domain name (or ``scheme://domain``) to validate
    :return: True if the host part matches the domain-name pattern, else False
    """
    if not isinstance(domain_name, str):
        return False

    # Strip the scheme only when "://" is really present; otherwise a name
    # such as "httpbin.org" would lose its first characters.
    if domain_name.startswith("http") and "://" in domain_name:
        domainName = domain_name.split("://", 1)[1]
    else:
        domainName = domain_name

    pattern = re.compile(
        r'(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|'
        r'([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|'
        r'([a-zA-Z0-9][-_.a-zA-Z0-9]{0,61}[a-zA-Z0-9]))\.'
        # The dot before the last label is escaped so that it matches only
        # a literal "." instead of any character.
        r'([a-zA-Z]{2,13}|[a-zA-Z0-9-]{2,30}\.[a-zA-Z]{2,3})'
    )
    # fullmatch, unlike match with "$", also rejects a trailing newline.
    return pattern.fullmatch(domainName) is not None
def analysisDomainName(domain_name):
    """
    Resolve a domain name to an IPv4 address.

    :param domain_name: the domain name to resolve; a leading
        ``http...://`` scheme is removed before the lookup
    :return: the resolved IPv4 address as a string
    :raises socket.gaierror: if the name cannot be resolved
    """
    # Strip the scheme only when "://" is really present; otherwise a name
    # such as "httpbin.org" would be truncated and the wrong host resolved.
    if domain_name.startswith("http") and "://" in domain_name:
        domainName = domain_name.split("://", 1)[1]
    else:
        domainName = domain_name
    return socket.gethostbyname(domainName)
def checkPort(startPort, endPort):
    """
    Validate a port range.

    :param startPort: first port of the range (int or decimal string)
    :param endPort: last port of the range (int or decimal string)
    :return: ``(startPort, endPort + 1)`` as ints when the range is valid;
        otherwise None, after printing the reason
    """
    startText, endText = str(startPort), str(endPort)
    # isdecimal() guarantees that int() below cannot fail.
    if not (startText.isdecimal() and endText.isdecimal()):
        print("请输入数字字符。")
        return None

    # Convert before comparing so digit strings do not raise TypeError.
    first, last = int(startText), int(endText)
    if not (0 <= first <= 65535 and 0 <= last <= 65535):
        print("端口范围不能超出“0~65535”。")
        return None

    # A single-port range (first == last) is valid; only first > last is not.
    if first > last:
        print("端口的起始值不能大于结束值。")
        return None

    return first, last + 1
def portScan(address, port, postList):
    """
    Probe one TCP port and record it when it accepts a connection.

    :param address: domain name or IP address to connect to (str)
    :param port: TCP port to probe (int)
    :param postList: list that receives ``port`` when the port is open
    :return: None; an open port is printed, written out through
        ``writeDate`` and appended to ``postList``
    """
    try:
        # The context manager closes the socket even if connect_ex raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sk:
            sk.settimeout(0.5)
            isOpen = sk.connect_ex((address, port)) == 0
        if isOpen:
            print(f'主机:{address},端口:{port} 已开放。')
            writeDate(f"{port}")
            postList.append(port)
    except Exception:
        # Best-effort probe: one failing port must not abort the whole scan.
        # Unlike a bare "except:", this lets KeyboardInterrupt and SystemExit
        # propagate.
        print("扫描异常。")
def geventScan(address, startPort, endPort):
    """
    Scan an inclusive port range concurrently with gevent greenlets.

    :param address: IP address to scan
    :param startPort: first port to scan
    :param endPort: last port to scan (inclusive)
    :return: sorted list of the ports that ``portScan`` reported as open
    """
    openPorts = []
    # The pool size caps how many probes are in flight at once.
    pool = gevent.pool.Pool(500)

    greenlets = [
        pool.spawn(portScan, address, port, openPorts)
        for port in range(startPort, endPort + 1)
    ]
    gevent.joinall(greenlets)

    # Previously the collected ports were discarded; return them to callers.
    return sorted(openPorts)
def _splitPortRange(startPort, endPort, parts):
    """Split inclusive [startPort, endPort] into at most ``parts`` contiguous, non-overlapping inclusive ranges."""
    total = endPort - startPort + 1
    if total <= 0:
        return []
    # Fewer ports than workers: not worth splitting.
    if parts < 1 or total < parts:
        return [(startPort, endPort)]

    size, extra = divmod(total, parts)
    ranges = []
    first = startPort
    for index in range(parts):
        # The first ``extra`` chunks take one additional port each.
        last = first + size - 1 + (1 if index < extra else 0)
        ranges.append((first, last))
        first = last + 1
    return ranges


def processScan(address, startPort=0, endPort=65535):
    """
    Scan a port range using one worker process per CPU core.

    The host is written through ``writeDate`` before the scan and the elapsed
    time after it; each worker runs ``geventScan`` on its own sub-range.

    :param address: IP address to scan
    :param startPort: first port to scan (default 0)
    :param endPort: last port to scan, inclusive (default 65535)
    :return: None
    """
    writeDate(address)
    st = time.time()

    # geventScan treats both bounds as inclusive, so the sub-ranges must not
    # share boundary ports, and they must be sized from the range length
    # rather than from endPort alone.
    processList = []
    for first, last in _splitPortRange(startPort, endPort, cpu_count()):
        p = Process(target=geventScan, args=(address, first, last))
        p.start()
        processList.append(p)

    for p in processList:
        p.join()

    elapsed = time.time() - st
    print(f'扫描过程,总计耗时:{elapsed:.2f} s。')
    writeDate(f"{elapsed:.2f}")
def writeDate(info, path='temporaryFile.txt'):
    """
    Append one line to the temporary scan-result file.

    :param info: text to record; a newline is appended
    :param path: file to append to. Defaults to ``temporaryFile.txt`` in the
        current working directory, the file ``writeLog`` reads, instead of a
        machine-specific absolute path.
    :return: None
    """
    with open(file=path, mode='a', encoding='utf-8') as f:
        f.write(f'{info}\n')
def writeLog():
    """
    Turn the temporary scan results into ``log.txt`` and delete the temp file.

    ``temporaryFile.txt`` is read as: the scanned host on the first line, one
    open port per line after it, and the elapsed seconds on the last line.
    If the file is missing or has fewer than two lines, a message is printed
    and nothing is written.

    :return: None
    """
    tempFile = 'temporaryFile.txt'
    if not os.path.exists(tempFile):
        print("未找到扫描结果,无法生成日志。")
        return

    with open(file=tempFile, mode='r', encoding='utf-8') as f1:
        dataList = [line.strip() for line in f1 if line.strip()]

    # Both the host line and the elapsed-time line are required.
    if len(dataList) < 2:
        print("未找到扫描结果,无法生成日志。")
        return

    host, elapsed = dataList[0], dataList[-1]
    # A set drops ports that were recorded more than once.
    portList = sorted({int(item) for item in dataList[1:-1]})

    # Mode 'w' replaces any previous log in one step.
    with open(file='log.txt', mode='w', encoding='utf-8') as f2:
        f2.write(time.strftime("%Y-%m-%d %H:%M:%S\n\n"))
        f2.writelines(f'主机:{host},端口:{port} 已开放。\n' for port in portList)
        f2.write(f'\n主机:{host},开放端口共计:{len(portList)} 个。\n')
        f2.write(f'扫描过程,总计耗时:{elapsed} s。')

    os.remove(tempFile)
if __name__ == '__main__':
    ip = "127.0.0.1"
    start = 0
    end = 65535

    # Work out the scan target first, so that the log is only produced after
    # a scan has really run.
    target = None
    if checkPort(start, end):
        if checkIp(ip):
            target = ip
        elif checkDomainName(ip):
            try:
                target = analysisDomainName(ip)
            except socket.gaierror:
                print("域名解析失败。")
        else:
            print("请输入正确的 IP 地址或域名。")

    if target is not None:
        processScan(target, start, end)
        writeLog()
|