端口扫描工具

1.0 版本

需求: 1.完成一个端口的扫描

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import socket

def scanPort():
# 1.使用 TCP 协议扫描端口
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 2.设置连接时间
sk.settimeout(0.5)

scan_ip = input("请输入需要扫描的IP:")
scan_port = input("请输入需要扫描的port:")

try:
# 3.创建连接
conn = sk.connect_ex((scan_ip, int(scan_port)))
if conn == 0:
print(f'主机:{scan_ip},端口:{scan_port} 已开放。')

# 4.关闭 socket
sk.close()

except:
pass


if __name__ == '__main__':
scanPort()

运行结果:

1
2
3
请输入需要扫描的IP:127.0.0.1
请输入需要扫描的port:9999
主机:127.0.0.1,端口:9999 已开放。

1.1 版本

需求:
1.需要能够扫描全部
方案:
1.循环扫描

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import socket

def portScan(address, startPort, endPort):
try:
for i in range(startPort, endPort):
# 实现 TCP 连接
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 限制连接时间 0.5s
sk.settimeout(0.5)
conn = sk.connect_ex((address, i))
if conn == 0:
print(f'主机:{address},端口:{i} 已开放。')
sk.close()
except Exception:
pass

if __name__ == '__main__':
portScan("127.0.0.1", 9990, 10000)

运行结果:

1
主机:127.0.0.1,端口:9999 已开放。

1.2 版本

需求:
1.需要同时支持 ip 及 域名
2.需要验证 ip 有效性

方案:
1.解析域名
2.判断 ip 有效性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import re
import socket


def checkIp(ip):
"""
判断 IP 地址 是否正确
:param ip: 传入的 IP 地址
:return: 返回 真或假
"""
addressIp = re.compile('^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$')
if addressIp.match(ip) and len(ip) != 0:
return ip
else:
return print("ip 错误")


def checkDomainName(domain_name):
"""
对 域名进行 验证并解析
:param domain_name: 需要验证的域名
:return: 解析后的 IP 地址
"""
# 判断是否为 http 开头,如果是,则进行截取
if domain_name.startswith("http"):
domainName = domain_name[domain_name.find("://") + 3:]
else:
domainName = domain_name

# 验证是否为合格的域名
pattern = re.compile(
r'^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|'
r'([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|'
r'([a-zA-Z0-9][-_.a-zA-Z0-9]{0,61}[a-zA-Z0-9]))\.'
r'([a-zA-Z]{2,13}|[a-zA-Z0-9-]{2,30}.[a-zA-Z]{2,3})$'
)
if pattern.match(domainName):

# 对域名进行解析并返回
serverIp = socket.gethostbyname(domainName)
return serverIp
else:
print("域名错误")


# 扫描端口
def portScan(address, startPort, endPort):
"""
判断是域名还是IP,然后进行扫描
:param address: 域名或IP(str)
:param startPort: 端口起始位置(int)
:param endPort: 端口结束位置(int)
:return: 返回已经开启的端口
"""
my_re = re.compile(r'[A-Za-z]', re.S)
if len(re.findall(my_re, address)):
add = checkDomainName(address)
else:
add = checkIp(address)

try:
for i in range(startPort, endPort):
# 实现 TCP 连接
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 限制连接时间 0.5s
sk.settimeout(0.5)
conn = sk.connect_ex((add, i))
if conn == 0:
print(f'主机:{address},端口:{i} 已开放。')
sk.close()
except:
pass


if __name__ == '__main__':
portScan('http://www.baidu.com', 79, 81)

1.3 版本

需求:
1.扫描效率过慢

方案:
1.使用多线程技术,扫描速度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import re
import socket
import time
import threading


def checkIp(ip):
"""
判断 IP 地址 是否正确
:param ip: 传入的 IP 地址
:return: 返回 True 或 False
"""
addressIp = re.compile('^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$')
if addressIp.match(ip) and len(ip) != 0:
return True
else:
return False


def checkDomainName(domain_name):
"""
对 域名进行 验证
:param domain_name: 需要验证的域名
:return: 返回 True 或 False
"""
if domain_name.startswith("http"):
domainName = domain_name[domain_name.find("://") + 3:]
else:
domainName = domain_name

# 验证是否为合格的域名
pattern = re.compile(
r'^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|'
r'([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|'
r'([a-zA-Z0-9][-_.a-zA-Z0-9]{0,61}[a-zA-Z0-9]))\.'
r'([a-zA-Z]{2,13}|[a-zA-Z0-9-]{2,30}.[a-zA-Z]{2,3})$'
)
if pattern.match(domainName) and len(domainName) != 0:
return True
else:
return False


def analysisDomainName(domain_name):
"""
对 域名 进行解析
:param domain_name: 需要解析的域名
:return: 返回 解析后的 IP 地址
"""
# 判断是否为 http 开头,如果是,则进行截取
if domain_name.startswith("http"):
domainName = domain_name[domain_name.find("://") + 3:]
else:
domainName = domain_name
serverIp = socket.gethostbyname(domainName)
return serverIp


def checkPort(startPort, endPort):
"""
验证端口是否真确
:param startPort: 开始值
:param endPort: 解除值
:return: 返回检测合格的端口
"""
if str(startPort).isdigit() and str(endPort).isdigit():
if 0 <= startPort <= 65535 and 0 <= endPort <= 65535:
if startPort < endPort:
return startPort, endPort + 1
else:
print("端口的起始值不能大于结束值。")
else:
print("端口范围不能超出“0~65535”。")
else:
print("请输入数字字符。")


def portScan(address, port, postList):
"""
扫描函数---线程扫描
:param postList: 空列表
:param port: 端口(int)
:param address: 域名或IP(str)
:return: 返回已经开启的端口
"""
try:
# 实现 TCP 连接
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 限制连接时间 0.5s
sk.settimeout(0.5)
conn = sk.connect_ex((address, port))
if conn == 0:
print(f'主机:{address},端口:{port} 已开放。')
postList.append(port) # 将开放的端口添加到列表
sk.close()
except:
print("扫描异常。")


def threadsScan(address, startPort=0, endPort=65535, portList=None):
"""
线程函数封装
:param address: IP地址
:param startPort: 端口开始值
:param endPort: 端口结束值
:param portList: 空列表
:return:
"""
if portList is None:
portList = []
threadList = [] # 存放线程对象
startTime = time.time()
for one in range(startPort, endPort + 1):
t = threading.Thread(target=portScan, args=(address, one, portList))
threadList.append(t) # 增加到列表

# 启动线程
for one in threadList:
one.start()

# 阻塞线程
for one in threadList:
one.join()

endTime = time.time()
print(f'扫描过程,总计耗时:{(endTime - startTime):.2f} s')
print(f'开放端口为:{portList}')
print(f'主机:{address},总计开放端口:{len(portList)} 个。')

exportLogo(time.strftime("\n %Y-%m-%d %H:%M:%S \n"))
for one in portList:
exportLogo(f'主机:{address},端口:{one} 已开放。')
exportLogo(f'扫描过程,总计耗时:{(endTime - startTime):.2f} s。')
exportLogo(f'主机:{address},总计开放端口:{len(portList)} 个。')


def exportLogo(info):
"""
日志编写
:param info: 要写入日志的内容
:return:
"""
with open(file=r'D:\Study\TestDevelopment\PortScanTools\logo.txt', mode='a+', encoding='utf-8') as f:
f.truncate(0) # 清空文件,方便调试
f.write(info + '\n')


if __name__ == '__main__':
# ip = "127.0.0.1"
ip = "47.96.181.17"
start = 0
end = 65535
if checkPort(start, end):
if checkIp(ip):
threadsScan(ip, start, end)
elif checkDomainName(ip):
threadsScan(analysisDomainName(ip), start, end)

1.4 版本

需求:
1.多线程扫描时不稳定

方案:
1.使用协程基础解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import re
import socket
import time
import gevent
import gevent.pool

from gevent import monkey
monkey.patch_all()


def checkIp(ip):
"""
判断 IP 地址 是否正确
:param ip: 传入的 IP 地址
:return: 返回 True 或 False
"""
addressIp = re.compile('^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$')
if addressIp.match(ip) and len(ip) != 0:
return True
else:
return False


def checkDomainName(domain_name):
"""
对 域名进行 验证
:param domain_name: 需要验证的域名
:return: 返回 True 或 False
"""
if domain_name.startswith("http"):
domainName = domain_name[domain_name.find("://") + 3:]
else:
domainName = domain_name

# 验证是否为合格的域名
pattern = re.compile(
r'^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|'
r'([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|'
r'([a-zA-Z0-9][-_.a-zA-Z0-9]{0,61}[a-zA-Z0-9]))\.'
r'([a-zA-Z]{2,13}|[a-zA-Z0-9-]{2,30}.[a-zA-Z]{2,3})$'
)
if pattern.match(domainName) and len(domainName) != 0:
return True
else:
return False


def analysisDomainName(domain_name):
"""
对 域名 进行解析
:param domain_name: 需要解析的域名
:return: 返回 解析后的 IP 地址
"""
# 判断是否为 http 开头,如果是,则进行截取
if domain_name.startswith("http"):
domainName = domain_name[domain_name.find("://") + 3:]
else:
domainName = domain_name
serverIp = socket.gethostbyname(domainName)
return serverIp


def checkPort(startPort, endPort):
"""
验证端口是否真确
:param startPort: 开始值
:param endPort: 解除值
:return: 返回检测合格的端口
"""
if str(startPort).isdigit() and str(endPort).isdigit():
if 0 <= startPort <= 65535 and 0 <= endPort <= 65535:
if startPort < endPort:
return startPort, endPort + 1
else:
print("端口的起始值不能大于结束值。")
else:
print("端口范围不能超出“0~65535”。")
else:
print("请输入数字字符。")


def portScan(address, port, postList):
"""
扫描函数---线程扫描
:param postList: 空列表
:param port: 端口(int)
:param address: 域名或IP(str)
:return: 返回已经开启的端口
"""
try:
# 实现 TCP 连接
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 限制连接时间 0.5s
sk.settimeout(0.5)
conn = sk.connect_ex((address, port))
if conn == 0:
print(f'主机:{address},端口:{port} 已开放。')
postList.append(port) # 将开放的端口添加到列表
sk.close()
except:
print("扫描异常。")


def geventScan(address, startPort=0, endPort=65535, portList=None):
"""
协程函数封装
:param address:ip地址
:param startPort: 端口开始值
:param endPort: 端口结束值
:param portList: 空列表
:return:
"""
if portList is None:
portList = []
geventList = [] # 存放线程对象
g = gevent.pool.Pool(200) # 协程池

startTime = time.time()
for one in range(startPort, endPort + 1):
# 创建协程对象
geventList.append(g.spawn(portScan, address, one, portList))

# 阻塞
gevent.joinall(geventList)

endTime = time.time()
print(f'扫描过程,总计耗时:{(endTime - startTime):.2f} s')
print(f'开放端口为:{portList}')
print(f'主机:{address},总计开放端口:{len(portList)} 个。')

exportLogo(time.strftime("\n %Y-%m-%d %H:%M:%S \n"))
for one in portList:
exportLogo(f'主机:{address},端口:{one} 已开放。')
exportLogo(f'扫描过程,总计耗时:{(endTime - startTime):.2f} s。')
exportLogo(f'主机:{address},总计开放端口:{len(portList)} 个。')


def exportLogo(info):
with open(file=r'D:\Study\TestDevelopment\PortScanTools\logo.txt', mode='a+', encoding='utf-8') as f:
f.truncate(0) # 清空文件,方便调试
f.write(info + '\n')


if __name__ == '__main__':
# ip = "127.0.0.1"
ip = "47.96.181.17"
start = 0
end = 65535
if checkPort(start, end):
if checkIp(ip):
geventScan(ip, start, end)
elif checkDomainName(ip):
geventScan(analysisDomainName(ip), start, end)

1.5 版本

需求:
1.协程扫描速度过慢

方案:
1.使用多进程和协程配合解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import re
import os
import socket
import time
import gevent
import gevent.pool
from multiprocessing import Pool, cpu_count, Process, Manager

from gevent import monkey
monkey.patch_all()


# ip检查
def checkIp(ip):
"""
判断 IP 地址 是否正确
:param ip: 传入的 IP 地址
:return: 返回 True 或 False
"""
addressIp = re.compile('^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$')
if addressIp.match(ip) and len(ip) != 0:
return True
else:
return False


# 域名检查
def checkDomainName(domain_name):
"""
对 域名进行 验证
:param domain_name: 需要验证的域名
:return: 返回 True 或 False
"""
if domain_name.startswith("http"):
domainName = domain_name[domain_name.find("://") + 3:]
else:
domainName = domain_name

# 验证是否为合格的域名
pattern = re.compile(
r'^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|'
r'([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|'
r'([a-zA-Z0-9][-_.a-zA-Z0-9]{0,61}[a-zA-Z0-9]))\.'
r'([a-zA-Z]{2,13}|[a-zA-Z0-9-]{2,30}.[a-zA-Z]{2,3})$'
)
if pattern.match(domainName) and len(domainName) != 0:
return True
else:
return False


# 域名解析
def analysisDomainName(domain_name):
"""
对 域名 进行解析
:param domain_name: 需要解析的域名
:return: 返回 解析后的 IP 地址
"""
# 判断是否为 http 开头,如果是,则进行截取
if domain_name.startswith("http"):
domainName = domain_name[domain_name.find("://") + 3:]
else:
domainName = domain_name
serverIp = socket.gethostbyname(domainName)
return serverIp


# 端口检查
def checkPort(startPort, endPort):
"""
验证端口是否真确
:param startPort: 开始值
:param endPort: 解除值
:return: 返回检测合格的端口
"""
if str(startPort).isdigit() and str(endPort).isdigit():
if 0 <= startPort <= 65535 and 0 <= endPort <= 65535:
if startPort < endPort:
return startPort, endPort + 1
else:
print("端口的起始值不能大于结束值。")
else:
print("端口范围不能超出“0~65535”。")
else:
print("请输入数字字符。")


# 端口扫描
def portScan(address, port, postList):
"""
扫描函数---线程扫描
:param postList: 空列表
:param port: 端口(int)
:param address: 域名或IP(str)
:return: 返回已经开启的端口
"""
try:
# 实现 TCP 连接
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 限制连接时间 0.5s
sk.settimeout(0.5)
conn = sk.connect_ex((address, port))
if conn == 0:
print(f'主机:{address},端口:{port} 已开放。')
writeDate(f"{port}")
postList.append(port) # 将开放的端口添加到列表
sk.close()
except:
print("扫描异常。")


# 协程封装
def geventScan(address, startPort, endPort):
"""
协程函数封装
:param address:ip地址
:param startPort: 端口开始值
:param endPort: 端口结束值
:return:
"""
returnList = []
geventList = [] # 存放线程对象
g = gevent.pool.Pool(500) # 协程池

for one in range(startPort, endPort + 1):
# 创建协程对象
geventList.append(g.spawn(portScan, address, one, returnList))

# 阻塞
gevent.joinall(geventList)


# 进程封装
def processScan(address, startPort=0, endPort=65535):
writeDate(address)
st = time.time()
processList = []
cpuCount = cpu_count() # 获取CPU核数
ports = []
if endPort // cpuCount >= 1:
for i in range(cpuCount):
if i != cpuCount - 1:
num = endPort // cpuCount
newPort = (startPort, startPort + num)
ports.append(newPort)
startPort += num
else:
ports.append((endPort // cpuCount * (cpuCount - 1), endPort))
else:
ports.append((startPort, endPort))

for i in ports:
p = Process(target=geventScan, args=(address, i[0], i[1]))
p.start()
processList.append(p)

for i in processList:
i.join()
et = time.time()
print(f'扫描过程,总计耗时:{(et - st):.2f} s。')
writeDate(f"{(et - st):.2f}")


# 写入临时文件
def writeDate(info):
with open(file=r'D:\Study\TestDevelopment\PortScanTools\temporaryFile.txt', mode='a+', encoding='utf-8') as f:
f.write(info + '\n')


# 文件写入
def writeLog():
portList = []
with open(file='temporaryFile.txt', mode='r', encoding='utf-8') as f1:
dataList = f1.readlines()

if os.path.exists('log.txt'):
os.remove('log.txt')

with open(file='log.txt', mode='a+', encoding='utf-8') as f2:
f2.truncate(0)
f2.write(time.strftime("%Y-%m-%d %H:%M:%S\n\n"))
for i in dataList[1:-1]:
portList.append(int(i[:-1]))
portList.sort()
for i in portList:
f2.write(f'主机:{dataList[0][:-1]},端口:{i} 已开放。\n')
f2.write(f'\n主机:{dataList[0][:-1]},开放端口共计:{len(portList)} 个。\n')
f2.write(f'扫描过程,总计耗时:{dataList[-1][:-1]} s。')

os.remove('temporaryFile.txt')


if __name__ == '__main__':
ip = "127.0.0.1"
# ip = "47.96.181.17"
start = 0
end = 65535
if checkPort(start, end):
if checkIp(ip):
processScan(ip, start, end)
elif checkDomainName(ip):
processScan(analysisDomainName(ip), start, end)

writeLog()