9.端口扫描工具
大约 10 分钟学习笔记测试开发测试工具开发
1.0 版本
提示
需求: 1.完成一个端口的扫描
import socket
def scanPort():
    # 1.使用 TCP 协议扫描端口
    sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 2.设置连接时间
    sk.settimeout(0.5)
    scan_ip = input("请输入需要扫描的IP:")
    scan_port = input("请输入需要扫描的port:")
    try:
        # 3.创建连接
        conn = sk.connect_ex((scan_ip, int(scan_port)))
        if conn == 0:
            print(f'主机:{scan_ip},端口:{scan_port} 已开放。')
        # 4.关闭 socket
        sk.close()
    except:
        pass
if __name__ == '__main__':
    scanPort()运行结果:
请输入需要扫描的IP:127.0.0.1
请输入需要扫描的port:9999
主机:127.0.0.1,端口:9999 已开放。1.1 版本
提示
需求:
 1.需要能够扫描全部
 方案:
 1.循环扫描
import socket
def portScan(address, startPort, endPort):
    try:
        for i in range(startPort, endPort):
            # 实现 TCP 连接
            sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # 限制连接时间 0.5s
            sk.settimeout(0.5)
            conn = sk.connect_ex((address, i))
            if conn == 0:
                print(f'主机:{address},端口:{i} 已开放。')
            sk.close()
    except Exception:
        pass
if __name__ == '__main__':
    portScan("127.0.0.1", 9990, 10000)运行结果:
主机:127.0.0.1,端口:9999 已开放。1.2 版本
提示
需求:
 1.需要同时支持 ip 及 域名
 2.需要验证 ip 有效性
方案:
 1.解析域名
 2.判断 ip 有效性
import re
import socket
def checkIp(ip):
    """
    判断 IP 地址 是否正确
    :param ip: 传入的 IP 地址
    :return: 返回 真或假
    """
    addressIp = re.compile('^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$')
    if addressIp.match(ip) and len(ip) != 0:
        return ip
    else:
        return print("ip 错误")
def checkDomainName(domain_name):
    """
    对 域名进行 验证并解析
    :param domain_name: 需要验证的域名
    :return: 解析后的 IP 地址
    """
    # 判断是否为 http 开头,如果是,则进行截取
    if domain_name.startswith("http"):
        domainName = domain_name[domain_name.find("://") + 3:]
    else:
        domainName = domain_name
    # 验证是否为合格的域名
    pattern = re.compile(
        r'^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|'
        r'([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|'
        r'([a-zA-Z0-9][-_.a-zA-Z0-9]{0,61}[a-zA-Z0-9]))\.'
        r'([a-zA-Z]{2,13}|[a-zA-Z0-9-]{2,30}.[a-zA-Z]{2,3})$'
    )
    if pattern.match(domainName):
        # 对域名进行解析并返回
        serverIp = socket.gethostbyname(domainName)
        return serverIp
    else:
        print("域名错误")
# 扫描端口
def portScan(address, startPort, endPort):
    """
    判断是域名还是IP,然后进行扫描
    :param address: 域名或IP(str)
    :param startPort: 端口起始位置(int)
    :param endPort: 端口结束位置(int)
    :return: 返回已经开启的端口
    """
    my_re = re.compile(r'[A-Za-z]', re.S)
    if len(re.findall(my_re, address)):
        add = checkDomainName(address)
    else:
        add = checkIp(address)
    try:
        for i in range(startPort, endPort):
            # 实现 TCP 连接
            sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # 限制连接时间 0.5s
            sk.settimeout(0.5)
            conn = sk.connect_ex((add, i))
            if conn == 0:
                print(f'主机:{address},端口:{i} 已开放。')
            sk.close()
    except:
        pass
if __name__ == '__main__':
    portScan('http://www.baidu.com', 79, 81)1.3 版本
提示
需求:
 1.扫描效率过慢
方案:
 1.使用多线程技术,扫描速度
import re
import socket
import time
import threading
def checkIp(ip):
    """
    判断 IP 地址 是否正确
    :param ip: 传入的 IP 地址
    :return: 返回 True 或 False
    """
    addressIp = re.compile('^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$')
    if addressIp.match(ip) and len(ip) != 0:
        return True
    else:
        return False
def checkDomainName(domain_name):
    """
    对 域名进行 验证
    :param domain_name: 需要验证的域名
    :return: 返回 True 或 False
    """
    if domain_name.startswith("http"):
        domainName = domain_name[domain_name.find("://") + 3:]
    else:
        domainName = domain_name
    # 验证是否为合格的域名
    pattern = re.compile(
        r'^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|'
        r'([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|'
        r'([a-zA-Z0-9][-_.a-zA-Z0-9]{0,61}[a-zA-Z0-9]))\.'
        r'([a-zA-Z]{2,13}|[a-zA-Z0-9-]{2,30}.[a-zA-Z]{2,3})$'
    )
    if pattern.match(domainName) and len(domainName) != 0:
        return True
    else:
        return False
def analysisDomainName(domain_name):
    """
    对 域名 进行解析
    :param domain_name: 需要解析的域名
    :return: 返回 解析后的 IP 地址
    """
    # 判断是否为 http 开头,如果是,则进行截取
    if domain_name.startswith("http"):
        domainName = domain_name[domain_name.find("://") + 3:]
    else:
        domainName = domain_name
    serverIp = socket.gethostbyname(domainName)
    return serverIp
def checkPort(startPort, endPort):
    """
    验证端口是否真确
    :param startPort: 开始值
    :param endPort: 解除值
    :return: 返回检测合格的端口
    """
    if str(startPort).isdigit() and str(endPort).isdigit():
        if 0 <= startPort <= 65535 and 0 <= endPort <= 65535:
            if startPort < endPort:
                return startPort, endPort + 1
            else:
                print("端口的起始值不能大于结束值。")
        else:
            print("端口范围不能超出“0~65535”。")
    else:
        print("请输入数字字符。")
def portScan(address, port, postList):
    """
    扫描函数---线程扫描
    :param postList: 空列表
    :param port: 端口(int)
    :param address: 域名或IP(str)
    :return: 返回已经开启的端口
    """
    try:
        # 实现 TCP 连接
        sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # 限制连接时间 0.5s
        sk.settimeout(0.5)
        conn = sk.connect_ex((address, port))
        if conn == 0:
            print(f'主机:{address},端口:{port} 已开放。')
            postList.append(port)       # 将开放的端口添加到列表
        sk.close()
    except:
        print("扫描异常。")
def threadsScan(address, startPort=0, endPort=65535, portList=None):
    """
    线程函数封装
    :param address: IP地址
    :param startPort: 端口开始值
    :param endPort: 端口结束值
    :param portList: 空列表
    :return:
    """
    if portList is None:
        portList = []
    threadList = []  # 存放线程对象
    startTime = time.time()
    for one in range(startPort, endPort + 1):
        t = threading.Thread(target=portScan, args=(address, one, portList))
        threadList.append(t)  # 增加到列表
    # 启动线程
    for one in threadList:
        one.start()
    # 阻塞线程
    for one in threadList:
        one.join()
    endTime = time.time()
    print(f'扫描过程,总计耗时:{(endTime - startTime):.2f} s')
    print(f'开放端口为:{portList}')
    print(f'主机:{address},总计开放端口:{len(portList)} 个。')
    exportLogo(time.strftime("\n %Y-%m-%d %H:%M:%S \n"))
    for one in portList:
        exportLogo(f'主机:{address},端口:{one} 已开放。')
    exportLogo(f'扫描过程,总计耗时:{(endTime - startTime):.2f} s。')
    exportLogo(f'主机:{address},总计开放端口:{len(portList)} 个。')
def exportLogo(info):
    """
    日志编写
    :param info: 要写入日志的内容
    :return:
    """
    with open(file=r'D:\Study\TestDevelopment\PortScanTools\logo.txt', mode='a+', encoding='utf-8') as f:
        f.truncate(0)    # 清空文件,方便调试
        f.write(info + '\n')
if __name__ == '__main__':
    # ip = "127.0.0.1"
    ip = "47.96.181.17"
    start = 0
    end = 65535
    if checkPort(start, end):
        if checkIp(ip):
            threadsScan(ip, start, end)
        elif checkDomainName(ip):
            threadsScan(analysisDomainName(ip), start, end)1.4 版本
提示
需求:
 1.多线程扫描时不稳定
方案:
 1.使用协程基础解决
import re
import socket
import time
import gevent
import gevent.pool
from gevent import monkey
monkey.patch_all()
def checkIp(ip):
    """
    判断 IP 地址 是否正确
    :param ip: 传入的 IP 地址
    :return: 返回 True 或 False
    """
    addressIp = re.compile('^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$')
    if addressIp.match(ip) and len(ip) != 0:
        return True
    else:
        return False
def checkDomainName(domain_name):
    """
    对 域名进行 验证
    :param domain_name: 需要验证的域名
    :return: 返回 True 或 False
    """
    if domain_name.startswith("http"):
        domainName = domain_name[domain_name.find("://") + 3:]
    else:
        domainName = domain_name
    # 验证是否为合格的域名
    pattern = re.compile(
        r'^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|'
        r'([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|'
        r'([a-zA-Z0-9][-_.a-zA-Z0-9]{0,61}[a-zA-Z0-9]))\.'
        r'([a-zA-Z]{2,13}|[a-zA-Z0-9-]{2,30}.[a-zA-Z]{2,3})$'
    )
    if pattern.match(domainName) and len(domainName) != 0:
        return True
    else:
        return False
def analysisDomainName(domain_name):
    """
    对 域名 进行解析
    :param domain_name: 需要解析的域名
    :return: 返回 解析后的 IP 地址
    """
    # 判断是否为 http 开头,如果是,则进行截取
    if domain_name.startswith("http"):
        domainName = domain_name[domain_name.find("://") + 3:]
    else:
        domainName = domain_name
    serverIp = socket.gethostbyname(domainName)
    return serverIp
def checkPort(startPort, endPort):
    """
    验证端口是否真确
    :param startPort: 开始值
    :param endPort: 解除值
    :return: 返回检测合格的端口
    """
    if str(startPort).isdigit() and str(endPort).isdigit():
        if 0 <= startPort <= 65535 and 0 <= endPort <= 65535:
            if startPort < endPort:
                return startPort, endPort + 1
            else:
                print("端口的起始值不能大于结束值。")
        else:
            print("端口范围不能超出“0~65535”。")
    else:
        print("请输入数字字符。")
def portScan(address, port, postList):
    """
    扫描函数---线程扫描
    :param postList: 空列表
    :param port: 端口(int)
    :param address: 域名或IP(str)
    :return: 返回已经开启的端口
    """
    try:
        # 实现 TCP 连接
        sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # 限制连接时间 0.5s
        sk.settimeout(0.5)
        conn = sk.connect_ex((address, port))
        if conn == 0:
            print(f'主机:{address},端口:{port} 已开放。')
            postList.append(port)       # 将开放的端口添加到列表
        sk.close()
    except:
        print("扫描异常。")
def geventScan(address, startPort=0, endPort=65535, portList=None):
    """
    协程函数封装
    :param address:ip地址
    :param startPort: 端口开始值
    :param endPort: 端口结束值
    :param portList: 空列表
    :return:
    """
    if portList is None:
        portList = []
    geventList = []  # 存放线程对象
    g = gevent.pool.Pool(200)   # 协程池
    startTime = time.time()
    for one in range(startPort, endPort + 1):
        # 创建协程对象
        geventList.append(g.spawn(portScan, address, one, portList))
        # 阻塞
    gevent.joinall(geventList)
    endTime = time.time()
    print(f'扫描过程,总计耗时:{(endTime - startTime):.2f} s')
    print(f'开放端口为:{portList}')
    print(f'主机:{address},总计开放端口:{len(portList)} 个。')
    exportLogo(time.strftime("\n %Y-%m-%d %H:%M:%S \n"))
    for one in portList:
        exportLogo(f'主机:{address},端口:{one} 已开放。')
    exportLogo(f'扫描过程,总计耗时:{(endTime - startTime):.2f} s。')
    exportLogo(f'主机:{address},总计开放端口:{len(portList)} 个。')
def exportLogo(info):
    with open(file=r'D:\Study\TestDevelopment\PortScanTools\logo.txt', mode='a+', encoding='utf-8') as f:
        f.truncate(0)    # 清空文件,方便调试
        f.write(info + '\n')
if __name__ == '__main__':
    # ip = "127.0.0.1"
    ip = "47.96.181.17"
    start = 0
    end = 65535
    if checkPort(start, end):
        if checkIp(ip):
            geventScan(ip, start, end)
        elif checkDomainName(ip):
            geventScan(analysisDomainName(ip), start, end)1.5 版本
提示
需求:
 1.协程扫描速度过慢
方案:
 1.使用多进程和协程配合解决
import re
import os
import socket
import time
import gevent
import gevent.pool
from multiprocessing import Pool, cpu_count, Process, Manager
from gevent import monkey
monkey.patch_all()
# ip检查
def checkIp(ip):
    """
    判断 IP 地址 是否正确
    :param ip: 传入的 IP 地址
    :return: 返回 True 或 False
    """
    addressIp = re.compile('^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$')
    if addressIp.match(ip) and len(ip) != 0:
        return True
    else:
        return False
# 域名检查
def checkDomainName(domain_name):
    """
    对 域名进行 验证
    :param domain_name: 需要验证的域名
    :return: 返回 True 或 False
    """
    if domain_name.startswith("http"):
        domainName = domain_name[domain_name.find("://") + 3:]
    else:
        domainName = domain_name
    # 验证是否为合格的域名
    pattern = re.compile(
        r'^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|'
        r'([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|'
        r'([a-zA-Z0-9][-_.a-zA-Z0-9]{0,61}[a-zA-Z0-9]))\.'
        r'([a-zA-Z]{2,13}|[a-zA-Z0-9-]{2,30}.[a-zA-Z]{2,3})$'
    )
    if pattern.match(domainName) and len(domainName) != 0:
        return True
    else:
        return False
# 域名解析
def analysisDomainName(domain_name):
    """
    对 域名 进行解析
    :param domain_name: 需要解析的域名
    :return: 返回 解析后的 IP 地址
    """
    # 判断是否为 http 开头,如果是,则进行截取
    if domain_name.startswith("http"):
        domainName = domain_name[domain_name.find("://") + 3:]
    else:
        domainName = domain_name
    serverIp = socket.gethostbyname(domainName)
    return serverIp
# 端口检查
def checkPort(startPort, endPort):
    """
    验证端口是否真确
    :param startPort: 开始值
    :param endPort: 解除值
    :return: 返回检测合格的端口
    """
    if str(startPort).isdigit() and str(endPort).isdigit():
        if 0 <= startPort <= 65535 and 0 <= endPort <= 65535:
            if startPort < endPort:
                return startPort, endPort + 1
            else:
                print("端口的起始值不能大于结束值。")
        else:
            print("端口范围不能超出“0~65535”。")
    else:
        print("请输入数字字符。")
# 端口扫描
def portScan(address, port, postList):
    """
    扫描函数---线程扫描
    :param postList: 空列表
    :param port: 端口(int)
    :param address: 域名或IP(str)
    :return: 返回已经开启的端口
    """
    try:
        # 实现 TCP 连接
        sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # 限制连接时间 0.5s
        sk.settimeout(0.5)
        conn = sk.connect_ex((address, port))
        if conn == 0:
            print(f'主机:{address},端口:{port} 已开放。')
            writeDate(f"{port}")
            postList.append(port)       # 将开放的端口添加到列表
        sk.close()
    except:
        print("扫描异常。")
# 协程封装
def geventScan(address, startPort, endPort):
    """
    协程函数封装
    :param address:ip地址
    :param startPort: 端口开始值
    :param endPort: 端口结束值
    :return:
    """
    returnList = []
    geventList = []  # 存放线程对象
    g = gevent.pool.Pool(500)   # 协程池
    for one in range(startPort, endPort + 1):
        # 创建协程对象
        geventList.append(g.spawn(portScan, address, one, returnList))
        # 阻塞
    gevent.joinall(geventList)
# 进程封装
def processScan(address, startPort=0, endPort=65535):
    writeDate(address)
    st = time.time()
    processList = []
    cpuCount = cpu_count()  # 获取CPU核数
    ports = []
    if endPort // cpuCount >= 1:
        for i in range(cpuCount):
            if i != cpuCount - 1:
                num = endPort // cpuCount
                newPort = (startPort, startPort + num)
                ports.append(newPort)
                startPort += num
            else:
                ports.append((endPort // cpuCount * (cpuCount - 1), endPort))
    else:
        ports.append((startPort, endPort))
    for i in ports:
        p = Process(target=geventScan, args=(address, i[0], i[1]))
        p.start()
        processList.append(p)
    for i in processList:
        i.join()
    et = time.time()
    print(f'扫描过程,总计耗时:{(et - st):.2f} s。')
    writeDate(f"{(et - st):.2f}")
# 写入临时文件
def writeDate(info):
    with open(file=r'D:\Study\TestDevelopment\PortScanTools\temporaryFile.txt', mode='a+', encoding='utf-8') as f:
        f.write(info + '\n')
# 文件写入
def writeLog():
    portList = []
    with open(file='temporaryFile.txt', mode='r', encoding='utf-8') as f1:
        dataList = f1.readlines()
    if os.path.exists('log.txt'):
        os.remove('log.txt')
    with open(file='log.txt', mode='a+', encoding='utf-8') as f2:
        f2.truncate(0)
        f2.write(time.strftime("%Y-%m-%d %H:%M:%S\n\n"))
        for i in dataList[1:-1]:
            portList.append(int(i[:-1]))
        portList.sort()
        for i in portList:
            f2.write(f'主机:{dataList[0][:-1]},端口:{i} 已开放。\n')
        f2.write(f'\n主机:{dataList[0][:-1]},开放端口共计:{len(portList)} 个。\n')
        f2.write(f'扫描过程,总计耗时:{dataList[-1][:-1]} s。')
    os.remove('temporaryFile.txt')
if __name__ == '__main__':
    ip = "127.0.0.1"
    # ip = "47.96.181.17"
    start = 0
    end = 65535
    if checkPort(start, end):
        if checkIp(ip):
            processScan(ip, start, end)
        elif checkDomainName(ip):
            processScan(analysisDomainName(ip), start, end)
    writeLog()