如何探测网络设备ACL规则

时间:2021-06-23 00:28:55

 

探测网络设备ACL规则

 

背景:在互联网企业的生产网络中,往往在网络入口处的网络设备上会有成千上万条ACL策略,这么多的ACL导致了网络管理员很难彻底梳理清楚其中的逻辑关系,从而不知道到底对外开放了哪些IP和哪些端口。

 

解决手段:编写ACL规则探测程序,从公网扫描该网络设备的ACL规则

 

工作原理:不管是交换机还是路由器或防火墙,在处理数据包时ACL规则总是优先于ICMP规则。即:当网络设备收到一个TTL为0的报文时会先匹配ACL规则之后再向发送者发送 ICMP time exceeded消息,基于此原理就可以在公网发送以IDC内地址为目的IP且TTL到被探测设备时刚好减为0的数据包,如果被探测设备返回了ICMP time exceeded消息则说明它的ACL策略针对此IP及port开放,如果没有返回包则说明数据包被它的ACL阻拦

 

图示:

如何探测网络设备ACL规则

程序实现语言:python3

 

源码:

 1 # coding:utf-8
 2 
 3 from itertools import groupby  4 from scapy.all import *
 5 import re  6 import sys  7 import IPy  8 
 9 
 10 class RangeException(Exception):  11     pass
 12 
 13 
 14 class InputType(Exception):  15     pass
 16 
 17 
 18 class TargetNotSupport(Exception):  19     pass
 20 
 21 
 22 class OptionError(Exception):  23     pass
 24 
 25 
 26 class PortScan(object):  27     def __init__(self, speed=3):  28         self.open_port = []  29         self.speed = speed  30 
 31     def __str__(self):  32         speed_statement = '使用PortScan(*)创建对象时可以在*处指定扫描速率,默认为3,数值越小扫描速度越快\n' \  33                           '注意:随着扫描速度的增加准确率会相应降低!'
 34         return speed_statement  35 
 36     # 从本地文件读取IP资源
 37     def __target(self):  38         try:  39             open_file = input('请输入要导入资源的文件名字:')  40             address_file = open(open_file, 'r')  41             address_list = []  42             for i in address_file.readlines():  43                 i = i.replace('\n', '')  44  address_list.append(i)  45         except FileNotFoundError:  46             print('\n')  47             print('请先在本地创建对应名字的IP列表文本文件!!!')  48             print('\n')  49  self.scan()  50 
 51         except KeyboardInterrupt:  52             print('')  53  sys.exit()  54 
 55         except Exception as error:  56             print('打开本地文件有误!!!')  57             print(error)  58  self.scan()  59         else:  60             return address_list  61 
 62     # 获取IP资源
 63     # 输入1从一个文件读取IP,输入2从屏幕输入获取IP
 64     # 获取的IP信息可以是单个IP地址(例:220.12.12.12),也可以是一个地址段(例:192.168.1.0/24)
 65     # 最终返回一个IP地址列表,此列表包含了输入的所有单个IP地址以及地址段中的可用IP
 66     def get_ip(self, option, string):  67 
 68         address_store = []  # IP资源存储
 69 
 70         # 如果选1则从文件读取IP资源
 71         if option == 1:  72             # 得到打开IP表文件名字及其IP表
 73             address_list = self.__target()  74 
 75         # 如果选2则手动输入IP资源
 76         if option == 2:  77             # 接收IP数据
 78             address_list = input(string)   # 1.1.1.1,2.2.2.0/24
 79             address_list = address_list.split(',')  80 
 81         # 1.1.1.1/24 的正则
 82         ip_range_re = r'( *(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \  83                       r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \  84                       r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \  85                       r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d))/' \  86                       r'(3[012]|[12][0-9]|[1-9]) *'
 87         # 1.1.1.1,2.2.2.2,3.3.3.3 的正则
 88         ip_address_re = r'( *(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \  89                         r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \  90                         r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \  91                         r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d))'
 92 
 93         # 对输入的值进行便利,提取其中的IP地址
 94         for i in address_list:  95             range_re_result = re.match(ip_range_re, i)    # 1.1.1.1/24的正则匹配结果
 96             ip_re_result = re.match(ip_address_re, i)    # 1.1.1.1,2.2.2.2,3.3.3.3 的正则匹配结果
 97             if range_re_result:  98                 subnet_mask = range_re_result.group(6)  99                 network_number = re.sub(r'^0+', '', range_re_result.group(1)) 100                 address_string = network_number + '/' + subnet_mask   # 如果输入1.1.1.1/24类型则address_store为字符串
101                 try: 102                     # 提取网段内所有可用IP地址并加表
103                     address_subset = IPy.IP(address_string) 104                     for i in address_subset: 105                         if i == address_subset[len(address_subset)-1] or i == address_subset[0]: 106                             continue
107                         else: 108  address_store.append(str(i)) 109                 except ValueError: 110                     print('输入有误,请按"网络号/掩码"或"IP地址"格式输入') 111  self.scan() 112 
113             elif ip_re_result: 114                 ii = re.sub(r'^0+', '', ip_re_result.group()) 115                 address_store.append(ii)      #对单个IP地址形式的输入直接加表
116             else: 117                 print('输入有误,请输入正确的IP地址(e.g:1.1.1.1,192.168.1.0/24)!!!') 118  self.get_ip(option) 119         return address_store 120 
121     # 通过从屏幕输入获取端口资源
122     # 输入形式可以为单个端口号(例:3389),也可以是一个端口范围(例:22-25)
123     # 返回数据为一个列表,其中每个元素都以元组形式存在. 每个元组包含两个整数元素,第一个为端口范围的最小值,第二个为端口范围的最大值
124     # 注意: 单个端口号形式的输入最后也将以范围形式输出,其最大值与最小值都为他本身
125     # 返回数据举例: [(22-25),(3389,3389)]
126     def get_port(self): 127         port_range = input('请输入要扫描端口范围(e.g: 3389,20-25):') 128         target_port = []  # 端口资源存储
129         try: 130             port_range = port_range.split(',')   # 例:['1', '2', '3-10', '11-20']
131             for i in port_range: 132                 if re.match(' *(\d+)-(\d+).*', i): 133                     # for ii in range(len(open_port_list)):
134                     low_port = int(re.match(' *(\d+)-(\d+).*', i).group(1)) 135                     high_port = int(re.match(' *(\d+)-(\d+).*', i).group(2)) 136                     if low_port >= high_port or low_port <= 0 or low_port > 65535 or high_port <= 0 or high_port > 65535: 137                         raise RangeException 138                     else: 139                         target_port.append((low_port, high_port))   # 如果是范围则把最小值和最大值以元组形式加表
140                 elif re.match(' *\d+ *', i): 141                     singular = int(re.match(' *(\d+) *', i).group(1)) 142                     if 0 < singular <= 65535: 143                         target_port.append((singular, singular))   # 如果是单整数则把它当作范围一样处理,最大值和最小值均为它自己
144                     else: 145                         raise RangeException 146                 else: 147                     raise InputType 148         except RangeException: 149             print('端口应为1-65535之间的整数,且输入范围格式应当为从小到大') 150  self.get_port() 151         except InputType: 152             print('端口类型应为整数') 153  self.get_port() 154         except KeyboardInterrupt: 155             print('') 156  sys.exit() 157         except Exception as unusual: 158             print('输入有误!') 159             print(unusual) 160  self.get_port() 161         return target_port  # 返回经过处理的目标端口列表
162 
163     # 对纯数字的列表进行排序且范围切块
164     # 例:导入[11,22,33,1,2,3,4,5]----->导出[1-5,11,22,33]
165  @staticmethod 166     def int_single_to_range(original): 167         original.sort()  # 先排序
168         open_port_range = [] 169         fun = lambda x: x[1] - x[0] 170         for k, g in groupby(enumerate(original), fun): 171             l1 = [j for i, j in g]  # 连续数字的列表
172             if len(l1) > 1: 173                 scop = str(min(l1)) + '-' + str(max(l1))  # 将连续数字范围用"-"连接
174             else: 175                 scop = l1[0] 176             open_port_range.append("{}".format(scop)) 177         return open_port_range 178 
179     # TTL自动检测
180     # 导入一个被探测设备IP列表,返回一个被探测设备IP与相应TTL的字典,例:{'220.2.2.2':15}
181     def ttl_check(self, address_list): 182         print('准备中...') 183         probe_device_ttl = {} 184         # switch = 0 # 检测返回数据包的源IP是否为被探测设备
185         try: 186             for i in address_list: 187                 for ii in range(1, 129): 188                     print(i, ii) 189                     scan_packet = IP(dst=i, ttl=ii) / TCP(dport=8080, flags='S') 190                     ttl_source = sr1(scan_packet, timeout=3, verbose=False) 191                     #while 1:
192                     # time.sleep(0.001)
193                     if ttl_source: 194                         try: 195                             if ttl_source['IP'].fields['src'] == i: 196                                 probe_device_ttl[i] = ii 197                                 # switch = 1
198                                 break
199                             else: 200                                 continue
201                         except Exception as receive_error: 202                             print(receive_error) 203                             raise
204                     # if switch == 1:
205                     # break
206                 else: 207                     print('TTL超时!!!') 208 
209         except KeyboardInterrupt: 210             print('') 211  sys.exit() 212 
213         except Exception as error: 214             print('程序出现错误!!!') 215             print(error) 216  self.scan() 217         else: 218             print('准备完毕') 219             return probe_device_ttl 220 
221  @staticmethod 222     def option(): 223         print('请选择导入被扫描信息方式:\n'
224               '1 从文件导入\n'
225               '2 在程序中手动输入\n') 226 
227     def scan(self): 228         # 功能选择
229  self.option() 230         try: 231             option = int(input('我选择: ')) 232             print(option) 233             if option != 2 and option != 1: 234                 raise OptionError 235         except OptionError: 236             print('请输入功能标号!') 237  self.scan() 238 
239         # 获取要扫描IP列表
240         address_store = self.get_ip(option, '请输入被探测IP资源:') 241 
242         # 获取要扫描的端口列表
243         port_range = self.get_port() 244 
245         probe_device = self.get_ip(2, '请输入被探测的安全设备IP地址:') 246 
247         # 自动检测到探测设备的TTL值,该值为一个字典,key为被探测安全设备IP,value为到该设备的TTL值
248         ttl = self.ttl_check(probe_device) 249 
250         count = 0  # 用作进度百分比的分子. 以每个IP的每个端口为单位进行计数,总数为IP个数*端口个数
251 
252         if ttl: 253             # 挨个儿朝被探测设备发送端口探测包
254             for probe_device_ip, ttl in ttl.items(): 255 
256                 print(probe_device_ip + '端口开放情况:') 257 
258                 # 创建一个新文件,准备导入结果
259                 write_file = open(probe_device_ip + '-result.txt', 'w') 260 
261                 try: 262                     # 为每个被探测设备计算IP资源池中所有的IP资源
263                     for i in address_store: 264 
265                         # 为每个IP计算各个输入IP端口范围开放情况
266                         for port in port_range: 267                             (low_port, high_port) = port 268                             scan_packets = IP(dst=i, ttl=ttl) / TCP(dport=(low_port, high_port), flags='S')  # 构造检测包
269                             replay_packets_total = sr(scan_packets, timeout=self.speed, verbose=False)  # 发送检测包及接收返回包
270                             open_port_list = replay_packets_total[0].res  # 开放端口原始对象列表(一个IP不同端口范围回包的集合)
271 
272                             # 一个IP有几个端口开放就有几个回包(如果端口被ACL干掉则不会回包),以下遍历回包来读取开放的端口
273                             for ii in range(len(open_port_list)): 274                                 try: 275                                     if open_port_list[ii][1]['ICMP'].fields['type'] == 11:  # ICMP类型为11时为TTL超时包
276                                         self.open_port.append(open_port_list[ii][0]['TCP'].fields['dport'])  # TTL超时则为开放端口,将开放端口进行加表
277                                         continue
278                                     else: 279                                         if open_port_list[ii][1]['ICMP'].fields['type'] == 3:  # 不知为啥有时候会返回类型为3的ICMP包(即:端口不可达包)
280                                             continue
281                                         else: 282                                             # 除11和3外其他类型的ICMP回包,需进行人工排查
283                                             print('ICMP返回类型不对') 284                                             print(open_port_list[ii][1]['ICMP'].fields) 285                                             print(open_port_list[ii][0]['TCP'].fields) 286                                 except IndexError: 287 
288                                     # 如果探测设备IP刚好为要扫描的IP时,开放端口会返回SYN,ACK包
289                                     if open_port_list[ii][1]['TCP'].fields['flags'] == 'SA': 290                                         self.open_port.append(open_port_list[ii][0]['TCP'].fields['dport']) 291                                         continue
292 
293                                     # 不知为啥有时候交换机会返回RST ACK的包
294                                     if open_port_list[ii][1]['TCP'].fields['flags'] == 'RA': 295                                         continue
296                                     else: 297                                         print('返回未知TCP包,需人工分析') 298                                         print(open_port_list[ii][1]['TCP'].fields, 299                                               open_port_list[ii][1]['TCP'].fields['flags']) 300                                         print(open_port_list[ii]) 301 
302                                 count += 1  # 执行进度+1(每计算完一个IP进度+1)
303                                 print(count) 304 
305                                 # 进度统计
306                                 speed_to_progress = count / len(address_list) * len(port_range) * len(ttl) * 100
307                                 print('\r已完成:%.2f%% ' % speed_to_progress, end='') 308 
309                         self.open_port = self.int_single_to_range(self.open_port)  # 对开放端口列表进行排序和范围化
310                         print('针对' + i + '开放端口: ', self.open_port) 311                         write_file.write(str(i) + ':' + str(self.open_port) + '\n')  # 每扫描完一个IP就把该IP结果写入文件
312                         self.open_port = []  # 扫尾工作,为下个IP扫描准备一个干净的开放端口列表
313 
314  write_file.close() 315 
316                 except KeyboardInterrupt: 317                     print('') 318  write_file.close() 319  sys.exit() 320                 except Exception as error: 321  write_file.close() 322                     print('程序异常退出!') 323                     print(error) 324                 else: 325  write_file.close() 326                     print('') 327                     if option == 1: 328                         print('被探测设备%s已完成,结果已导入当前路径''\'%s\'''文件中' % (probe_device_ip, probe_device_ip + '-result.txt')) 329                     if option == 2: 330                         print('扫描已完成!') 331 
332 
333 if __name__ == '__main__': 334 
335     def banner(): 336         print('\n') 337         print('============================================') 338         print('\n') 339         print('\n') 340         print(' ACL有效性探测系统v1.0 ') 341         print('\n') 342         print('\n') 343         print('============================================') 344         print('\n') 345 
346     def main(): 347  banner() 348         a = PortScan() 349  a.scan() 350 
351     main()