9.端口扫描工具
大约 10 分钟学习笔记测试开发测试工具开发
1.0 版本
提示
需求: 1.完成一个端口的扫描
import socket
def scanPort():
# 1.使用 TCP 协议扫描端口
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 2.设置连接时间
sk.settimeout(0.5)
scan_ip = input("请输入需要扫描的IP:")
scan_port = input("请输入需要扫描的port:")
try:
# 3.创建连接
conn = sk.connect_ex((scan_ip, int(scan_port)))
if conn == 0:
print(f'主机:{scan_ip},端口:{scan_port} 已开放。')
# 4.关闭 socket
sk.close()
except:
pass
if __name__ == '__main__':
scanPort()
运行结果:
请输入需要扫描的IP:127.0.0.1
请输入需要扫描的port:9999
主机:127.0.0.1,端口:9999 已开放。
1.1 版本
提示
需求:
1.需要能够扫描全部
方案:
1.循环扫描
import socket
def portScan(address, startPort, endPort):
try:
for i in range(startPort, endPort):
# 实现 TCP 连接
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 限制连接时间 0.5s
sk.settimeout(0.5)
conn = sk.connect_ex((address, i))
if conn == 0:
print(f'主机:{address},端口:{i} 已开放。')
sk.close()
except Exception:
pass
if __name__ == '__main__':
portScan("127.0.0.1", 9990, 10000)
运行结果:
主机:127.0.0.1,端口:9999 已开放。
1.2 版本
提示
需求:
1.需要同时支持 ip 及 域名
2.需要验证 ip 有效性
方案:
1.解析域名
2.判断 ip 有效性
import re
import socket
def checkIp(ip):
"""
判断 IP 地址 是否正确
:param ip: 传入的 IP 地址
:return: 返回 真或假
"""
addressIp = re.compile('^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$')
if addressIp.match(ip) and len(ip) != 0:
return ip
else:
return print("ip 错误")
def checkDomainName(domain_name):
"""
对 域名进行 验证并解析
:param domain_name: 需要验证的域名
:return: 解析后的 IP 地址
"""
# 判断是否为 http 开头,如果是,则进行截取
if domain_name.startswith("http"):
domainName = domain_name[domain_name.find("://") + 3:]
else:
domainName = domain_name
# 验证是否为合格的域名
pattern = re.compile(
r'^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|'
r'([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|'
r'([a-zA-Z0-9][-_.a-zA-Z0-9]{0,61}[a-zA-Z0-9]))\.'
r'([a-zA-Z]{2,13}|[a-zA-Z0-9-]{2,30}.[a-zA-Z]{2,3})$'
)
if pattern.match(domainName):
# 对域名进行解析并返回
serverIp = socket.gethostbyname(domainName)
return serverIp
else:
print("域名错误")
# 扫描端口
def portScan(address, startPort, endPort):
"""
判断是域名还是IP,然后进行扫描
:param address: 域名或IP(str)
:param startPort: 端口起始位置(int)
:param endPort: 端口结束位置(int)
:return: 返回已经开启的端口
"""
my_re = re.compile(r'[A-Za-z]', re.S)
if len(re.findall(my_re, address)):
add = checkDomainName(address)
else:
add = checkIp(address)
try:
for i in range(startPort, endPort):
# 实现 TCP 连接
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 限制连接时间 0.5s
sk.settimeout(0.5)
conn = sk.connect_ex((add, i))
if conn == 0:
print(f'主机:{address},端口:{i} 已开放。')
sk.close()
except:
pass
if __name__ == '__main__':
portScan('http://www.baidu.com', 79, 81)
1.3 版本
提示
需求:
1.扫描效率过慢
方案:
1.使用多线程技术,扫描速度
import re
import socket
import time
import threading
def checkIp(ip):
"""
判断 IP 地址 是否正确
:param ip: 传入的 IP 地址
:return: 返回 True 或 False
"""
addressIp = re.compile('^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$')
if addressIp.match(ip) and len(ip) != 0:
return True
else:
return False
def checkDomainName(domain_name):
"""
对 域名进行 验证
:param domain_name: 需要验证的域名
:return: 返回 True 或 False
"""
if domain_name.startswith("http"):
domainName = domain_name[domain_name.find("://") + 3:]
else:
domainName = domain_name
# 验证是否为合格的域名
pattern = re.compile(
r'^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|'
r'([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|'
r'([a-zA-Z0-9][-_.a-zA-Z0-9]{0,61}[a-zA-Z0-9]))\.'
r'([a-zA-Z]{2,13}|[a-zA-Z0-9-]{2,30}.[a-zA-Z]{2,3})$'
)
if pattern.match(domainName) and len(domainName) != 0:
return True
else:
return False
def analysisDomainName(domain_name):
"""
对 域名 进行解析
:param domain_name: 需要解析的域名
:return: 返回 解析后的 IP 地址
"""
# 判断是否为 http 开头,如果是,则进行截取
if domain_name.startswith("http"):
domainName = domain_name[domain_name.find("://") + 3:]
else:
domainName = domain_name
serverIp = socket.gethostbyname(domainName)
return serverIp
def checkPort(startPort, endPort):
"""
验证端口是否真确
:param startPort: 开始值
:param endPort: 解除值
:return: 返回检测合格的端口
"""
if str(startPort).isdigit() and str(endPort).isdigit():
if 0 <= startPort <= 65535 and 0 <= endPort <= 65535:
if startPort < endPort:
return startPort, endPort + 1
else:
print("端口的起始值不能大于结束值。")
else:
print("端口范围不能超出“0~65535”。")
else:
print("请输入数字字符。")
def portScan(address, port, postList):
"""
扫描函数---线程扫描
:param postList: 空列表
:param port: 端口(int)
:param address: 域名或IP(str)
:return: 返回已经开启的端口
"""
try:
# 实现 TCP 连接
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 限制连接时间 0.5s
sk.settimeout(0.5)
conn = sk.connect_ex((address, port))
if conn == 0:
print(f'主机:{address},端口:{port} 已开放。')
postList.append(port) # 将开放的端口添加到列表
sk.close()
except:
print("扫描异常。")
def threadsScan(address, startPort=0, endPort=65535, portList=None):
"""
线程函数封装
:param address: IP地址
:param startPort: 端口开始值
:param endPort: 端口结束值
:param portList: 空列表
:return:
"""
if portList is None:
portList = []
threadList = [] # 存放线程对象
startTime = time.time()
for one in range(startPort, endPort + 1):
t = threading.Thread(target=portScan, args=(address, one, portList))
threadList.append(t) # 增加到列表
# 启动线程
for one in threadList:
one.start()
# 阻塞线程
for one in threadList:
one.join()
endTime = time.time()
print(f'扫描过程,总计耗时:{(endTime - startTime):.2f} s')
print(f'开放端口为:{portList}')
print(f'主机:{address},总计开放端口:{len(portList)} 个。')
exportLogo(time.strftime("\n %Y-%m-%d %H:%M:%S \n"))
for one in portList:
exportLogo(f'主机:{address},端口:{one} 已开放。')
exportLogo(f'扫描过程,总计耗时:{(endTime - startTime):.2f} s。')
exportLogo(f'主机:{address},总计开放端口:{len(portList)} 个。')
def exportLogo(info):
"""
日志编写
:param info: 要写入日志的内容
:return:
"""
with open(file=r'D:\Study\TestDevelopment\PortScanTools\logo.txt', mode='a+', encoding='utf-8') as f:
f.truncate(0) # 清空文件,方便调试
f.write(info + '\n')
if __name__ == '__main__':
# ip = "127.0.0.1"
ip = "47.96.181.17"
start = 0
end = 65535
if checkPort(start, end):
if checkIp(ip):
threadsScan(ip, start, end)
elif checkDomainName(ip):
threadsScan(analysisDomainName(ip), start, end)
1.4 版本
提示
需求:
1.多线程扫描时不稳定
方案:
1.使用协程基础解决
import re
import socket
import time
import gevent
import gevent.pool
from gevent import monkey
monkey.patch_all()
def checkIp(ip):
"""
判断 IP 地址 是否正确
:param ip: 传入的 IP 地址
:return: 返回 True 或 False
"""
addressIp = re.compile('^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$')
if addressIp.match(ip) and len(ip) != 0:
return True
else:
return False
def checkDomainName(domain_name):
"""
对 域名进行 验证
:param domain_name: 需要验证的域名
:return: 返回 True 或 False
"""
if domain_name.startswith("http"):
domainName = domain_name[domain_name.find("://") + 3:]
else:
domainName = domain_name
# 验证是否为合格的域名
pattern = re.compile(
r'^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|'
r'([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|'
r'([a-zA-Z0-9][-_.a-zA-Z0-9]{0,61}[a-zA-Z0-9]))\.'
r'([a-zA-Z]{2,13}|[a-zA-Z0-9-]{2,30}.[a-zA-Z]{2,3})$'
)
if pattern.match(domainName) and len(domainName) != 0:
return True
else:
return False
def analysisDomainName(domain_name):
"""
对 域名 进行解析
:param domain_name: 需要解析的域名
:return: 返回 解析后的 IP 地址
"""
# 判断是否为 http 开头,如果是,则进行截取
if domain_name.startswith("http"):
domainName = domain_name[domain_name.find("://") + 3:]
else:
domainName = domain_name
serverIp = socket.gethostbyname(domainName)
return serverIp
def checkPort(startPort, endPort):
"""
验证端口是否真确
:param startPort: 开始值
:param endPort: 解除值
:return: 返回检测合格的端口
"""
if str(startPort).isdigit() and str(endPort).isdigit():
if 0 <= startPort <= 65535 and 0 <= endPort <= 65535:
if startPort < endPort:
return startPort, endPort + 1
else:
print("端口的起始值不能大于结束值。")
else:
print("端口范围不能超出“0~65535”。")
else:
print("请输入数字字符。")
def portScan(address, port, postList):
"""
扫描函数---线程扫描
:param postList: 空列表
:param port: 端口(int)
:param address: 域名或IP(str)
:return: 返回已经开启的端口
"""
try:
# 实现 TCP 连接
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 限制连接时间 0.5s
sk.settimeout(0.5)
conn = sk.connect_ex((address, port))
if conn == 0:
print(f'主机:{address},端口:{port} 已开放。')
postList.append(port) # 将开放的端口添加到列表
sk.close()
except:
print("扫描异常。")
def geventScan(address, startPort=0, endPort=65535, portList=None):
"""
协程函数封装
:param address:ip地址
:param startPort: 端口开始值
:param endPort: 端口结束值
:param portList: 空列表
:return:
"""
if portList is None:
portList = []
geventList = [] # 存放线程对象
g = gevent.pool.Pool(200) # 协程池
startTime = time.time()
for one in range(startPort, endPort + 1):
# 创建协程对象
geventList.append(g.spawn(portScan, address, one, portList))
# 阻塞
gevent.joinall(geventList)
endTime = time.time()
print(f'扫描过程,总计耗时:{(endTime - startTime):.2f} s')
print(f'开放端口为:{portList}')
print(f'主机:{address},总计开放端口:{len(portList)} 个。')
exportLogo(time.strftime("\n %Y-%m-%d %H:%M:%S \n"))
for one in portList:
exportLogo(f'主机:{address},端口:{one} 已开放。')
exportLogo(f'扫描过程,总计耗时:{(endTime - startTime):.2f} s。')
exportLogo(f'主机:{address},总计开放端口:{len(portList)} 个。')
def exportLogo(info):
with open(file=r'D:\Study\TestDevelopment\PortScanTools\logo.txt', mode='a+', encoding='utf-8') as f:
f.truncate(0) # 清空文件,方便调试
f.write(info + '\n')
if __name__ == '__main__':
# ip = "127.0.0.1"
ip = "47.96.181.17"
start = 0
end = 65535
if checkPort(start, end):
if checkIp(ip):
geventScan(ip, start, end)
elif checkDomainName(ip):
geventScan(analysisDomainName(ip), start, end)
1.5 版本
提示
需求:
1.协程扫描速度过慢
方案:
1.使用多进程和协程配合解决
import re
import os
import socket
import time
import gevent
import gevent.pool
from multiprocessing import Pool, cpu_count, Process, Manager
from gevent import monkey
monkey.patch_all()
# ip检查
def checkIp(ip):
"""
判断 IP 地址 是否正确
:param ip: 传入的 IP 地址
:return: 返回 True 或 False
"""
addressIp = re.compile('^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$')
if addressIp.match(ip) and len(ip) != 0:
return True
else:
return False
# 域名检查
def checkDomainName(domain_name):
"""
对 域名进行 验证
:param domain_name: 需要验证的域名
:return: 返回 True 或 False
"""
if domain_name.startswith("http"):
domainName = domain_name[domain_name.find("://") + 3:]
else:
domainName = domain_name
# 验证是否为合格的域名
pattern = re.compile(
r'^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|'
r'([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|'
r'([a-zA-Z0-9][-_.a-zA-Z0-9]{0,61}[a-zA-Z0-9]))\.'
r'([a-zA-Z]{2,13}|[a-zA-Z0-9-]{2,30}.[a-zA-Z]{2,3})$'
)
if pattern.match(domainName) and len(domainName) != 0:
return True
else:
return False
# 域名解析
def analysisDomainName(domain_name):
"""
对 域名 进行解析
:param domain_name: 需要解析的域名
:return: 返回 解析后的 IP 地址
"""
# 判断是否为 http 开头,如果是,则进行截取
if domain_name.startswith("http"):
domainName = domain_name[domain_name.find("://") + 3:]
else:
domainName = domain_name
serverIp = socket.gethostbyname(domainName)
return serverIp
# 端口检查
def checkPort(startPort, endPort):
"""
验证端口是否真确
:param startPort: 开始值
:param endPort: 解除值
:return: 返回检测合格的端口
"""
if str(startPort).isdigit() and str(endPort).isdigit():
if 0 <= startPort <= 65535 and 0 <= endPort <= 65535:
if startPort < endPort:
return startPort, endPort + 1
else:
print("端口的起始值不能大于结束值。")
else:
print("端口范围不能超出“0~65535”。")
else:
print("请输入数字字符。")
# 端口扫描
def portScan(address, port, postList):
"""
扫描函数---线程扫描
:param postList: 空列表
:param port: 端口(int)
:param address: 域名或IP(str)
:return: 返回已经开启的端口
"""
try:
# 实现 TCP 连接
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 限制连接时间 0.5s
sk.settimeout(0.5)
conn = sk.connect_ex((address, port))
if conn == 0:
print(f'主机:{address},端口:{port} 已开放。')
writeDate(f"{port}")
postList.append(port) # 将开放的端口添加到列表
sk.close()
except:
print("扫描异常。")
# 协程封装
def geventScan(address, startPort, endPort):
"""
协程函数封装
:param address:ip地址
:param startPort: 端口开始值
:param endPort: 端口结束值
:return:
"""
returnList = []
geventList = [] # 存放线程对象
g = gevent.pool.Pool(500) # 协程池
for one in range(startPort, endPort + 1):
# 创建协程对象
geventList.append(g.spawn(portScan, address, one, returnList))
# 阻塞
gevent.joinall(geventList)
# 进程封装
def processScan(address, startPort=0, endPort=65535):
writeDate(address)
st = time.time()
processList = []
cpuCount = cpu_count() # 获取CPU核数
ports = []
if endPort // cpuCount >= 1:
for i in range(cpuCount):
if i != cpuCount - 1:
num = endPort // cpuCount
newPort = (startPort, startPort + num)
ports.append(newPort)
startPort += num
else:
ports.append((endPort // cpuCount * (cpuCount - 1), endPort))
else:
ports.append((startPort, endPort))
for i in ports:
p = Process(target=geventScan, args=(address, i[0], i[1]))
p.start()
processList.append(p)
for i in processList:
i.join()
et = time.time()
print(f'扫描过程,总计耗时:{(et - st):.2f} s。')
writeDate(f"{(et - st):.2f}")
# 写入临时文件
def writeDate(info):
with open(file=r'D:\Study\TestDevelopment\PortScanTools\temporaryFile.txt', mode='a+', encoding='utf-8') as f:
f.write(info + '\n')
# 文件写入
def writeLog():
portList = []
with open(file='temporaryFile.txt', mode='r', encoding='utf-8') as f1:
dataList = f1.readlines()
if os.path.exists('log.txt'):
os.remove('log.txt')
with open(file='log.txt', mode='a+', encoding='utf-8') as f2:
f2.truncate(0)
f2.write(time.strftime("%Y-%m-%d %H:%M:%S\n\n"))
for i in dataList[1:-1]:
portList.append(int(i[:-1]))
portList.sort()
for i in portList:
f2.write(f'主机:{dataList[0][:-1]},端口:{i} 已开放。\n')
f2.write(f'\n主机:{dataList[0][:-1]},开放端口共计:{len(portList)} 个。\n')
f2.write(f'扫描过程,总计耗时:{dataList[-1][:-1]} s。')
os.remove('temporaryFile.txt')
if __name__ == '__main__':
ip = "127.0.0.1"
# ip = "47.96.181.17"
start = 0
end = 65535
if checkPort(start, end):
if checkIp(ip):
processScan(ip, start, end)
elif checkDomainName(ip):
processScan(analysisDomainName(ip), start, end)
writeLog()