吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 4227|回复: 23
收起左侧

[Python 原创] Python内网终端开放端口扫描程序

  [复制链接]
xiaomingtt 发表于 2023-3-29 10:18
上次做了个端口扫描程序【新提醒】局域网开放高危端口自动扫描报警程序——语音+飞秋报警——附端口封闭工具 - 『原创发布区』 - 吾爱破解 - LCG - LSG |安卓破解|病毒分析|[url]www.52pojie.cn[/url]
其中的端口扫描程序用的是shadow1ng/fscan: 一款内网综合扫描工具,方便一键自动化、全方位漏扫扫描。 (github.com)
由于不是自己的扫描程序,使用总是有不方便的地方。 于是决定用最近正在学习的Python写了一段端口扫描程序。
[Python] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import socket
import concurrent.futures
import ipaddress
 
# 定义要扫描的网段
subnet = "192.168.112.0/20"
# 定义要扫描的端口范围
port_range = [135]
# 定义线程池大小
thread_pool_size = 200
 
def scan_port(ip, port):
    #print("正在扫描" + str(ip) + "的端口:" + str(port) +"\n")
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        result = sock.connect_ex((ip, port))
        if result == 0:
            return str(ip) + ":" + str(port)
    except:
        pass
    finally:
        if sock:
            sock.close()
 
def scan_subnet(subnet):
    ips = [str(ip) for ip in list(ipaddress.IPv4Network(subnet).hosts())]
    print(ips)
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
        futures = [executor.submit(scan_port, ip, port) for ip in ips for port in port_range]
        concurrent.futures.wait(futures)
        for future in concurrent.futures.as_completed(futures):
            port = future.result()
            if port is not None:
                print(port)
 
if __name__ == "__main__":
    scan_subnet(subnet)

我们内网网段有4096个地址,实际使用不超过10%,上面代码却要扫描整个网段,感觉比较浪费资源,又做了下面这个,先用Ping做存活检测,然后对存活终端做端口扫描。
[Python] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import subprocess
import os
import sys
import re
import concurrent.futures
import ipaddress
import socket
 
# 定义要扫描的网段
subnet = "192.168.112.0/20"
# 定义线程池大小
thread_pool_size = 200
# 定义要扫描的端口范围
port_range = [135]
 
def scan_port(ip, port):
    #print("正在扫描" + str(ip) + "的端口:" + str(port) +"\n")
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        result = sock.connect_ex((ip, port))
        if result == 0:
            return str(ip) + ":" + str(port)
    except:
        pass
    finally:
        if sock:
            sock.close()
 
 
def scan_subnet(subnet):
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
        futures = [executor.submit(scan_port, ip, port) for ip in subnet for port in port_range]
        concurrent.futures.wait(futures)
 
        for future in concurrent.futures.as_completed(futures):
            port = future.result()
            if port is not None:
                print(port)
 
def PingIP(ip):
    try:
        p = subprocess.Popen(['ping','-n','1','-w','20',ip],
                    stdout=subprocess.PIPE,
                    stdin = subprocess.PIPE,
                    stderr = subprocess.PIPE,
                    shell = True)
        output = p.stdout.read().decode("gbk").upper()
        if "TTL" in output:
            return(ip)
        else:
            pass
    except:
        pass
 
def checkLive(subnet):
    ips = [str(ip) for ip in list(ipaddress.IPv4Network(subnet).hosts())]
    iplist=[]
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
        futures = [executor.submit(PingIP, ip) for ip in ips]
        concurrent.futures.wait(futures)
 
        for future in concurrent.futures.as_completed(futures):
            ip = future.result()
            if ip is not None:
                iplist.append(ip)
        print(iplist)
        scan_subnet(iplist)
 
 
if __name__ == "__main__":
    checkLive(subnets)

但这程序运行后CPU直接拉满,检测速度也比不做活检慢了好几倍。于是想到用ARP做活检。
[Python] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import os
import sys
import time
from scapy.all import ARP, Ether, srp
import concurrent.futures
import socket
 
# 定义要扫描的网段
subnet = "192.168.118.0/24"
# 定义要扫描的端口范围
port_range = [135,445,3306,3389,6379,22]
# 定义线程池大小
thread_pool_size = 200
     
 
def scan_port(ip, port):
    #print("正在扫描" + str(ip) + "的端口:" + str(port) +"\n")
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        result = sock.connect_ex((ip, port))
        if result == 0:
            return str(ip) + ":" + str(port)
    except:
        pass
    finally:
        if sock:
            sock.close()
 
def scan_subnet(subnet):
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
        futures = [executor.submit(scan_port, ip, port) for ip in subnet for port in port_range]
        concurrent.futures.wait(futures)
 
        for future in concurrent.futures.as_completed(futures):
            port = future.result()
            if port is not None:
                print(port)
                 
def arpscan(subnet):
    arp_request = ARP(pdst=subnet)
    ether = Ether(dst="ff:ff:ff:ff:ff:ff")
    arp_request_broadcast = ether / arp_request
    answered_list = srp(arp_request_broadcast, timeout=1, verbose=False)[0]
    clients = []
    for packet in answered_list:
        ip = packet[1].psrc
        clients.append(ip)
    scan_subnet(clients)   
                 
if __name__ == "__main__":
    arpscan(subnet)
v

ARP做本网段存活检测确实快,但研究半天才发现ARP不支持跨网段,活检还得想办法用Ping,直到找到了https://blog.csdn.net/Small_Teenager/article/details/122123299
[Python] 纯文本查看 复制代码
001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# encoding:utf-8
import time
import struct
import socket
import select
import concurrent.futures
import ipaddress
 
  
#Ping程序代码来自[url]https://blog.csdn.net/Small_Teenager/article/details/122123299[/url]
# 定义要扫描的网段
subnet = "192.168.112.0/20"
# 定义线程池大小
thread_pool_size = 200
# 定义要扫描的端口范围
port_range = [135,445,3306,3389,6379,22]
 
def scan_port(ip, port):
    #print("正在扫描" + str(ip) + "的端口:" + str(port) +"\n")
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        result = sock.connect_ex((ip, port))
        if result == 0:
            return str(ip) + ":" + str(port)
    except:
        pass
    finally:
        if sock:
            sock.close()
 
def scan_subnet(subnet):
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
        futures = [executor.submit(scan_port, ip, port) for ip in subnet for port in port_range]
        concurrent.futures.wait(futures)
         
        for future in concurrent.futures.as_completed(futures):
            port = future.result()
            if port is not None:
                print(port)
  
def chesksum(data):
    n = len(data)
    m = n % 2
    sum = 0
    for i in range(0, n - m ,2):
        sum += (data[i]) + ((data[i+1]) << 8)#传入data以每两个字节(十六进制)通过ord转十进制,第一字节在低位,第二个字节在高位
    if m:
        sum += (data[-1])
    #将高于16位与低16位相加
    sum = (sum >> 16) + (sum & 0xffff)
    sum += (sum >> 16) #如果还有高于16位,将继续与低16位相加
    answer = ~sum & 0xffff
    #  主机字节序转网络字节序列(参考小端序转大端序)
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return answer
  
def request_ping(data_type,data_code,data_checksum,data_ID,data_Sequence,payload_body):
    #  把字节打包成二进制数据
    icmp_packet = struct.pack('>BBHHH32s',data_type,data_code,data_checksum,data_ID,data_Sequence,payload_body)
    icmp_chesksum = chesksum(icmp_packet)  #获取校验和
    #  把校验和传入,再次打包
    icmp_packet = struct.pack('>BBHHH32s',data_type,data_code,icmp_chesksum,data_ID,data_Sequence,payload_body)
    return icmp_packet
  
  
def raw_socket(dst_addr,icmp_packet):
    '''
       连接套接字,并将数据发送到套接字
    '''
    #实例化一个socket对象,ipv4,原套接字,分配协议端口
    rawsocket = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket.getprotobyname("icmp"))
    #记录当前请求时间
    send_request_ping_time = time.time()
    #发送数据到网络
    rawsocket.sendto(icmp_packet,(dst_addr,80))
    #返回数据
    return send_request_ping_time,rawsocket,dst_addr
  
  
def reply_ping(send_request_ping_time,rawsocket,data_Sequence,timeout = 2):
    while True:
        #开始时间
        started_select = time.time()
        #实例化select对象,可读rawsocket,可写为空,可执行为空,超时时间
        what_ready = select.select([rawsocket], [], [], timeout)
        #等待时间
        wait_for_time = (time.time() - started_select)
        #没有返回可读的内容,判断超时
        if what_ready[0] == []:  # Timeout
            return -1
        #记录接收时间
        time_received = time.time()
        #设置接收的包的字节为1024
        received_packet, addr = rawsocket.recvfrom(1024)
        #获取接收包的icmp头
        #print(icmpHeader)
        icmpHeader = received_packet[20:28]
        #反转编码
        type, code, checksum, packet_id, sequence = struct.unpack(
            ">BBHHH", icmpHeader
        )
  
        if type == 0 and sequence == data_Sequence:
            return time_received - send_request_ping_time
  
        #数据包的超时时间判断
        timeout = timeout - wait_for_time
        if timeout <= 0:
            return -1
 
def ping(host):
    #TODO icmp数据包的构建
    data_type = 8 # ICMP Echo Request
    data_code = 0 # must be zero
    data_checksum = 0 # "...with value 0 substituted for this field..."
    data_ID = 0 #Identifier
    data_Sequence = 1 #Sequence number
    payload_body = b'abcdefghijklmnopqrstuvwabcdefghi' #data
  
    # 将主机名转ipv4地址格式,返回以ipv4地址格式的字符串,如果主机名称是ipv4地址,则它将保持不变
    #dst_addr = socket.gethostbyname(host)
    #print("正在 Ping {0} [{1}] 具有 32 字节的数据:".format(host,dst_addr))
    #请求ping数据包的二进制转换
    icmp_packet = request_ping(data_type,data_code,data_checksum,data_ID,data_Sequence,payload_body)
    #连接套接字,并将数据发送到套接字
    send_request_ping_time,rawsocket,addr = raw_socket(host,icmp_packet)
    #数据包传输时间
    times = reply_ping(send_request_ping_time,rawsocket,data_Sequence)
    if times > 0:
        #print("来自 {0} 的回复: 字节=32 时间={1}ms".format(addr,int(times*1000)))
        return host
    else:
        #print("请求超时。")
        pass
 
def StartPing(subnet):
    # 将网段转换为IP地址列表
    ips = [str(ip) for ip in list(ipaddress.IPv4Network(subnet).hosts())]
    print(ips)
    # 创建线程池
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
        # 对于每个IP地址和端口,提交扫描任务到线程池
        futures = [executor.submit(ping, ip) for ip in ips]
        # 等待所有扫描任务完成
        concurrent.futures.wait(futures)
 
        # 打印开放的端口号
        iplist = []
        for future in concurrent.futures.as_completed(futures):
            ip = future.result()
            if ip is not None:
                iplist.append(ip)
        scan_subnet(iplist)
                 
if __name__ == "__main__":
    StartPing(subnet)

利用Python实现Ping后,资源占用明显降低,扫描一个端口不如不做活检直接扫描,但扫描多个端口速度优势就体现出来了。

免费评分

参与人数 10吾爱币 +18 热心值 +9 收起 理由
jonepengcn + 1 + 1 我很赞同!
jekooma + 1 + 1 谢谢@Thanks!
刘甲乙 + 2 + 1 热心回复!
skywalker0123 + 2 + 1 我很赞同!
苏紫方璇 + 7 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!
jihedian + 1 鼓励转贴优秀软件安全工具和文档!
aka1Andrew + 1 + 1 热心回复!
嘚瑟挨顿揍 + 1 + 1 谢谢@Thanks!
jlzoe + 1 + 1 谢谢@Thanks!
vnightray + 1 + 1 谢谢@Thanks!

查看全部评分

本帖被以下淘专辑推荐:

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

 楼主| xiaomingtt 发表于 2023-3-29 12:14
wukequdai 发表于 2023-3-29 11:13
nmap 现成的  不要重复造轮子

现成程序确实很多,但我正在学习pyrhon,总要拿一些东西来练手。而且现成的东西不一定完全满足我自己的需求,比如我以前用的fscan,必须把扫描结果保存到文件,再读取文件中的扫描结果,我希望扫描出来一条就马上自动处理一条,而不是全部结果出来以后再去处理。你认为这是轮子,但这是我进步的阶梯。
dcsyzx123 发表于 2023-3-29 10:41
jkl5322203 发表于 2023-3-29 10:34
其实还可以升级成扫描外网IP的端口。内网外网都可以扫,指定IP开放的端口也可以扫就完美了。我有这种工具,但不是PY写的。我感觉PY写会更好
anson1599 发表于 2023-3-29 10:38
那如果机器做了防PING,能检测吗?
whw0623 发表于 2023-3-29 10:44
可以学习nmap的源码
Easonll 发表于 2023-3-29 10:51
能否升级端口,内外网都能扫呢
liulaolao 发表于 2023-3-29 11:01
感谢分享。
wukequdai 发表于 2023-3-29 11:13
nmap 现成的  不要重复造轮子
zheng8542 发表于 2023-3-29 11:13
防火墙如果防PING是不是就不能扫描端口了?
wangL0 发表于 2023-3-29 11:59
感谢分享
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-5-21 18:41

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表