『Python』 Scraping the details of every vulnerability entry on the WooYun forum

Posted: 2021-11-24 23:42:02

Each vulnerability entry contains:

WooYun ID, vulnerability title, affected vendor, white hat (reporter), vulnerability type, and the Rank value assigned by the vendor or the platform.

The data is mainly intended for analysis:
for example, per-vendor statistics broken down by vulnerability type,
or an assessment of individual white hats' output (see the counting sketch after the stats below).

Data last updated: 2016-05-27
Vulnerability entries: 104,796
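
Since each record in the dump is just the '----'-separated line produced by the script's save2file() below, simple counting already answers both questions. A minimal standard-library sketch (assuming the id_details.txt output file described below):

# coding:utf-8
# Minimal analysis sketch over id_details.txt (the save2file() output below).
from collections import Counter

by_vendor, by_type, by_author = Counter(), Counter(), Counter()

with open('id_details.txt', encoding='utf-8') as f:
    for line in f:
        parts = line.rstrip('\n').split('----')
        if len(parts) != 6:
            continue  # skip malformed records
        bug_id, title, comp, author, bug_type, rank = parts
        by_vendor[comp] += 1
        by_type[bug_type] += 1
        by_author[author] += 1

print('Top 10 vendors    :', by_vendor.most_common(10))
print('Top 10 bug types  :', by_type.most_common(10))
print('Top 10 white hats :', by_author.most_common(10))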

A screenshot of the data:

[screenshot: sample rows of the scraped vulnerability data]

Download link (Baidu Pan):

Link: http://pan.baidu.com/s/1bpDNKOv  Password: 6y57

The crawler script:

# coding:utf-8
# author: anka9080
# version: 1.0 py3

import re
import socket
import time
from queue import Queue, Empty
from threading import Thread

from requests import get  # third-party: pip install requests

# Global state
COUNT = 1                             # progress counter (only used for logging)
START_URL = 'http://wooyun.org/bugs'  # entry-list URL (unused in this version)
ID_DETAILS = []                       # successfully parsed records
ALL_ID = []                           # (unused in this version)
Failed_ID = []                        # IDs that failed to parse or to be logged
PROXIES = []                          # (unused in this version)

HEADERS = {
    "Accept": "text/html,application/xhtml+xml,application/xml,application/json;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Encoding": "gzip, deflate, sdch",
    "Accept-Language": "zh-CN,zh;q=0.8",
    "Cache-Control": "max-age=0",
    "Connection": "keep-alive",
    "DNT": "1",
    "Host": "wooyun.org",
    "Upgrade-Insecure-Requests": "1",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2716.0 Safari/537.36"
}


class WooYunSpider(Thread):
"""docstring for WooYunSpider"""
def __init__(self,queue):
Thread.__init__(self)
self.pattern1 = re.compile(r'title>(.*?)\| WooYun.*?keywords" content="(.*?),(.*?),(.*?),wooyun',re.S) # 匹配模式在 compile 的时候指定
self.pattern2 = re.compile(r"漏洞Rank:(\d{1,3})")
self.queue = queue
self.start() # 执行 run() def run(self):
"每次读取 queue 的一条"
global COUNT,RES_LOG,ERR_LOG
while(1):
try:
id = self.queue.get(block = False)
r = get('http://wooyun.org/bugs/' + id,headers = HEADERS)
html = r.text
except Empty:
break
except Exception as e:
msg = '[ - Socket_Excpt ] 链接被拒绝,再次添加到队列:' + id
print(msg)
ERR_LOG.write(msg+'\n')
self.queue.put(id) # 访问失败则把这个 URL从新加入队列
else:
title,comp,author,bug_type,rank = self.get_detail(html,id)
detail = id+'----'+title+'----'+comp+'----'+author+'----'+bug_type+'----'+rank
try: # 写入文件可能会诱发 gbk 编码异常,这里保存 id 到 failed
RES_LOG.write(detail + '\n')
except Exception as e:
Failed_ID.append(id)
msg = '[ - Encode_Excpt ] 字符编码异常:' + id
print(msg)
ERR_LOG.write(msg+'\n')
ID_DETAILS.append(detail)
# time.sleep(1) print('[ - info ] id: {} count: {} time: {:.2f}s'.format(id,COUNT,time.time() - start))
COUNT += 1 # 由 缺陷编号 获得对应的 厂商 和 漏洞类型信息
    def get_detail(self, html, bug_id):
        global ERR_LOG
        try:
            res = self.pattern1.search(html)
            title = res.group(1).strip()
            comp = res.group(2).strip()
            author = res.group(3).strip()
            bug_type = res.group(4).strip()
        except Exception:
            msg = '[ - Detail_Excpt ] could not parse title/vendor details: ' + bug_id
            print(msg)
            ERR_LOG.write(msg + '\n')
            Failed_ID.append(bug_id)
            title, comp, author, bug_type, rank = 'Null', 'Null', 'Null', 'Null', 'Null'
        else:
            try:
                # If the vendor has not responded yet, the page has no rank.
                res2 = self.pattern2.search(html)
                rank = res2.group(1).strip()
            except Exception:
                msg = '[ - Rank_Excpt ] could not parse Rank: ' + bug_id
                print(msg)
                ERR_LOG.write(msg + '\n')
                rank = 'Null'
        finally:
            try:
                print(title, comp, author, bug_type, rank)
            except Exception as e:
                msg = '[ - Print_Excpt ] console encoding error: ' + bug_id + '::' + str(e)
                print(msg)
                ERR_LOG.write(msg + '\n')
        return title, comp, author, bug_type, rank


class ThreadPool(object):
    def __init__(self, thread_num, id_file):
        self.queue = Queue()  # IDs waiting to be crawled
        self.threads = []     # the spider threads
        self.add_task(id_file)
        self.init_threads(thread_num)

    def add_task(self, id_file):
        # One WooYun bug slug per line, e.g. wooyun-2016-0177647
        with open(id_file) as f:
            for bug_id in f:
                self.queue.put(bug_id.strip())

    def init_threads(self, thread_num):
        for i in range(thread_num):
            print('[ - info ] loading thread --->', i)
            # time.sleep(1)
            self.threads.append(WooYunSpider(self.queue))

    def wait(self):
        for t in self.threads:
            if t.is_alive():  # isAlive() was removed in Python 3.9
                t.join()


def test():
    # Crude benchmark: 500 runs of the three separate regexes on one page.
    url = 'http://wooyun.org/bugs/wooyun-2016-0177647'
    r = get(url, headers=HEADERS)
    html = r.text
    # keywords" content="(.*?),(.*?),(.*?),wooyun ====> vendor, white hat, type
    pattern1 = re.compile(r'title>(.*?)\| WooYun')
    pattern2 = re.compile(r'keywords" content="(.*?),(.*?),(.*?),wooyun')
    pattern3 = re.compile(r'漏洞Rank:(\d{1,3})')
    for x in range(500):
        res = pattern1.search(html)
        # print(res.group(1))
        res = pattern2.search(html)
        # print(res.group(1), res.group(2), res.group(3))
        res = pattern3.search(html)
        # print(res.group(1))
        print(x + 1)


def test2():
    # Same benchmark with the single combined pattern the spider uses.
    url = 'http://wooyun.org/bugs/wooyun-2016-0177647'
    r = get(url, headers=HEADERS)
    html = r.text
    pattern = re.compile(r'title>(.*?)\| WooYun.*?keywords" content="(.*?),(.*?),(.*?),wooyun.*?漏洞Rank:(\d{1,3})', re.S)
    for x in range(500):
        res = pattern.search(html)
        # print(res.group(1), res.group(2), res.group(3), res.group(4), res.group(5))
        print(x + 1)
# Save results
def save2file(filename, filename_failed_id):
    with open(filename, 'w', encoding='utf-8') as output:
        for item in ID_DETAILS:
            try:  # ignore any record that still fails to encode
                output.write(item + '\n')
            except Exception:
                pass
    with open(filename_failed_id, 'w', encoding='utf-8') as output:
        output.write('\n'.join(Failed_ID))


if __name__ == '__main__':
    socket.setdefaulttimeout(1)
    start = time.time()
    # test()
    # Log files; UTF-8 avoids the gbk encode errors the except blocks guard against
    ERR_LOG = open('err_log.txt', 'w', encoding='utf-8')
    RES_LOG = open('res_log.txt', 'w', encoding='utf-8')
    id_file = 'id_0526.txt'
    # id_file = 'id_test.txt'
    tp = ThreadPool(20, id_file)
    tp.wait()
    save2file('id_details.txt', 'failed_id.txt')
    ERR_LOG.close()
    RES_LOG.close()
    end = time.time()
    print('[ - info ] cost time: {:.2f}s'.format(end - start))
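
Design note: the pool spawns 20 WooYunSpider threads sharing one Queue; any request that fails is pushed back for a retry, and the queue's Empty exception is what ends each worker. The script expects id_0526.txt to hold one bug slug per line (the wooyun-2016-0177647 form used in test()). Collecting those IDs isn't shown above; a minimal sketch of how it might be done against the paginated listing (the unused START_URL) could look like the following. The /bugs/page/N URL scheme and the link format are assumptions on my part, and the site has been offline since 2016, so treat this purely as illustration:

# coding:utf-8
# Hypothetical ID-collection sketch -- not part of the original script.
# Assumes listing pages at /bugs/page/N with entry links of the form
# href="/bugs/wooyun-YYYY-NNNNNN".
import re
from requests import get

LINK = re.compile(r'href="/bugs/(wooyun-\d{4}-\d+)"')

def collect_ids(pages, out_file='id_0526.txt'):
    seen = set()
    for n in range(1, pages + 1):
        try:
            html = get('http://wooyun.org/bugs/page/{}'.format(n),
                       headers={'User-Agent': 'Mozilla/5.0'}, timeout=5).text
        except Exception:
            continue  # skip pages that time out
        seen.update(LINK.findall(html))  # collect every bug slug on the page
    with open(out_file, 'w') as f:
        f.write('\n'.join(sorted(seen)))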