用Python写网络爬虫 第二版

时间:2023-03-09 07:29:33
用Python写网络爬虫 第二版

书籍介绍

书名:用 Python 写网络爬虫(第2版)
内容简介:本书包括网络爬虫的定义以及如何爬取网站,如何使用几种库从网页中抽取数据,如何通过缓存结果避免重复下载的问题,如何通过并行下载来加速数据抓取,如何利用不同的方式从动态网站中抽取数据,如何使用输入及导航等表达进行搜索和登录,如何访问被验证码图像保护的数据,如何使用 Scrapy 爬虫框架进行快速的并行抓取,以及使用 Portia 的 Web 界面构建网路爬虫。

背景调研

检查robots.txt

大多数的网站都会定义robots.txt文件,这样可以让爬虫了解爬取该网站时存在哪些限制。这些限制虽然是仅仅作为建议给出,但是良好的网络公民都应当遵守这些限制。

更多信息参见:https://www.robotstxt.org

示例:

访问http://example.python-scraping.com/robots.txt获取如下内容:

# section 1
User-agent: BadCrawler
Disallow: / # section 2
User-agent: *
Disallow: /trap
Crawl-delay: 5 # section 3
Sitemap: http://example.python-scraping.com/sitemap.xml

在section1中,robots.txt文件禁止用户代理未BadCcrawler的爬虫爬取该网站,不过这种写法可能无法起到应有的作用,因为恶意爬虫根本不会遵从robots.txt的要求。

section2规定,无论使用哪种用户代理,都应该在两次下载请求之间给出5秒的抓取延迟,我们需要遵从建议以免服务器过载。这里还有一个/trap链接,用于封禁那些爬取了不允许访问的链接的恶意爬虫。如果你访问了这个链接,服务器就会封禁你的IP一分钟!一个真实的网站可能会对你的IP封禁更长时间,甚至是永久封禁。

section3定义了一个Sitemap文件(即网站地图)。

检查网站地图

网站提供的Sitemap文件(即网站地图)可以帮助爬虫定位网站最新的内容,而无需爬取每一个网页,如果想要了解更多信息,可以从https://www.sitemaps.org/protocol.html获取网站地图的标准定义。许多网站发布平台都有自动生成网站地图的能力。下面是robots.txt文件中定位到的Sitemap文件的内容:

<?xml version="1.0" encoding="UTF-8"?>
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
<url><loc>http://example.python-scraping.com/places/default/view/Afghanistan-1</loc></url>
<url><loc>http://example.python-scraping.com/places/default/view/Aland-Islands-2</loc></url>
<url><loc>http://example.python-scraping.com/places/default/view/Albania-3</loc></url>
...
</urlset>

网站地图提供了所有网页的链接

编写第一个网络爬虫

下载网页

import urllib.request
from urllib.error import URLError, HTTPError, ContentTooShortError def download(url):
print('Downloading:', url)
try:
html = urllib.request.urlopen(url).read()
except (URLError, HTTPError, ContentTooShortError) as e:
print('Download error:', e.reason)
html = None
return html

下载重试

下面代码保证download函数在发送5xx错误时重新下载,可以尝试下载 http://httpstat.us/500 ,该网址会始终返回500错误码。

import urllib.request
from urllib.error import URLError, HTTPError, ContentTooShortError def download(url, num_retries=2):
print('Downloading:', url)
try:
html = urllib.request.urlopen(url).read()
except (URLError, HTTPError, ContentTooShortError) as e:
print('Download error:', e.reason)
html = None
if num_retries > 0:
if hasattr(e, 'code') and 500 <= e.code < 600:
# recursively retry 5xx HTTP errors
return download(url, num_retries - 1)
return html

设置用户代理user-agent

默认情况下,urllib使用Python-urllib/3.x作为用户代理下载网页内容,其中3.x是环境当前所用的Python的版本号。也许是因为曾经历过质量不佳的Python网络爬虫造成的服务器过载,一些网站还会封禁这个默认代理。

为了使下载网站更加可靠,我们需要控制用户代理的设定。下面的代码对download这个函数进行了参数化,设定了一个默认的用户代理‘wswp’(即Web Scraping With Python的首字母缩写)

import urllib.request
from urllib.error import URLError, HTTPError, ContentTooShortError def download(url, num_retries=2, user_agent='wswp'):
print('Downloading:', url)
request = urllib.request.Request(url)
request.add_header('User-agent', user_agent)
try:
html = urllib.request.urlopen(request).read()
except (URLError, HTTPError, ContentTooShortError) as e:
print('Download error:', e.reason)
html = None
if num_retries > 0:
if hasattr(e, 'code') and 500 <= e.code < 600:
# recursively retry 5xx HTTP errors
return download(url, num_retries=num_retries - 1)
return html

网站地图爬虫

import urllib.request
import re from urllib.error import URLError, HTTPError, ContentTooShortError def download(url, num_retries=2, user_agent='wswp', charset='utf-8'):
print('Downloading:', url)
request = urllib.request.Request(url)
request.add_header('User-agent', user_agent)
try:
resp = urllib.request.urlopen(request)
cs = resp.headers.get_content_charset()
if not cs:
cs = charset
html = resp.read().decode(cs)
except (URLError, HTTPError, ContentTooShortError) as e:
print('Download error:', e.reason)
html = None
if num_retries > 0:
if hasattr(e, 'code') and 500 <= e.code < 600:
# recursively retry 5xx HTTP errors
return download(url, num_retries=num_retries - 1)
return html def crawl_sitemap(url):
# download the sitemap file
sitemap = download(url)
# extract the sitemap links
links = re.findall('<loc>(.*?)</loc>', sitemap)
# download each link
for link in links:
html = download(link)
# scrape html here

ID遍历爬虫

下面代码对ID进行遍历,直到出现下载错误时停止。

import urllib.request
from urllib.error import URLError, HTTPError, ContentTooShortError
import itertools def download(url, num_retries=2):
print('Downloading:', url)
try:
html = urllib.request.urlopen(url).read()
except (URLError, HTTPError, ContentTooShortError) as e:
print('Download error', e.reason)
html = None
if num_retries > 0:
if hasattr(e, 'code') and 500 <= e.code <= 500:
return download(url, num_retries - 1)
return html def crawl_site(url):
for page in itertools.count(1):
pg_url = '{0}{1}'.format(url, page)
html = download(pg_url)
if html is None:
break

上面实现方式有一个缺陷就是,某个记录可能被删除,数据库ID之间并不是连续的,此时只要访问某个间隔点,爬虫就会立即退出。

下面代码对此进行改进,该版本连续发生多次下载错误后才会退出程序

import itertools
import urllib.request
from urllib.error import URLError, HTTPError, ContentTooShortError def download(url, num_retries=2, user_agent='wswp', charset='utf-8'):
print('Downloading:', url)
request = urllib.request.Request(url)
request.add_header('User-agent', user_agent)
try:
resp = urllib.request.urlopen(request)
cs = resp.headers.get_content_charset()
if not cs:
cs = charset
html = resp.read().decode(cs)
except (URLError, HTTPError, ContentTooShortError) as e:
print('Download error:', e.reason)
html = None
if num_retries > 0:
if hasattr(e, 'code') and 500 <= e.code < 600:
# recursively retry 5xx HTTP errors
return download(url, num_retries - 1)
return html def crawl_site(url, max_errors=5):
num_errors = 0
for page in itertools.count(1):
pg_url = '{}{}'.format(url, page)
html = download(pg_url)
if html is None:
num_errors += 1
if num_errors == max_errors:
# reached max number of errors, so exit
break
else:
num_errors = 0
# success - can scrape the result

链接爬虫

下面代码完成下载链接、将相对链接转为绝对链接、去重功能

import re
import urllib.request
from urllib.parse import urljoin
from urllib.error import URLError, HTTPError, ContentTooShortError def download(url, num_retries=2, user_agent='wswp', charset='utf-8'):
print('Downloading:', url)
request = urllib.request.Request(url)
request.add_header('User-agent', user_agent)
try:
resp = urllib.request.urlopen(request)
cs = resp.headers.get_content_charset()
if not cs:
cs = charset
html = resp.read().decode(cs)
except (URLError, HTTPError, ContentTooShortError) as e:
print('Download error:', e.reason)
html = None
if num_retries > 0:
if hasattr(e, 'code') and 500 <= e.code < 600:
# recursively retry 5xx HTTP errors
return download(url, num_retries - 1)
return html def link_crawler(start_url, link_regex):
" Crawl from the given start URL following links matched by link_regex "
crawl_queue = [start_url]
# keep track which URL's have seen before
seen = set(crawl_queue)
while crawl_queue:
url = crawl_queue.pop()
html = download(url)
if not html:
continue
# filter for links matching our regular expression
for link in get_links(html):
if re.match(link_regex, link):
abs_link = urljoin(start_url, link)
if abs_link not in seen:
seen.add(abs_link)
crawl_queue.append(abs_link) def get_links(html):
" Return a list of links from html "
# a regular expression to extract all links from the webpage
webpage_regex = re.compile("""<a[^>]+href=["'](.*?)["']""", re.IGNORECASE)
# list of all links from the webpage
return webpage_regex.findall(html)

解析robots.txt

首先,我们需要解析robots.txt 文件,以避免下载禁止爬取的URL,使用Python的urllib库中的robotparser模块,就可以轻松完成这项工作,如下面的代码所示:

from urllib import robotparser

rp = robotparser.RobotFileParser()
rp.set_url('http://example.python-scraping.com/robots.txt')
rp.read()
url = 'http://example.python-scraping.com/robots.txt'
user_agent = 'BadCrawler'
print(rp.can_fetch(user_agent, url)) # False
user_agent = 'GoodCrawler'
print(rp.can_fetch(user_agent, url)) # True

为将robotparser集成到链接爬虫中,我们首先需要创建有个新函数用于返回robotparser对象。

from urllib import robotparser

def get_robots_parser(robots_url):
rp = robotparser.RobotFileParser()
rp.set_url(robots_url)
rp.read()
return rp

我们需要可靠的设置robots_url,此时我们可以通过向函数传递额外的关键字参数的方法实现这一目标,我们还可以设置一个默认值,防止用户没有传递该变量,此外还需要定义user_agent

def link_crawler(start_url, link_regex, robots_url=None, user_agent='wswp'):
...
if not robots_url:
robots_url = '{}/robots.txt'.format(start_url)
rp = get_robots_parser(robots_url) # 最后我们在crawl循环中添加解释器检查
...
while crawl_queue:
url = crawl_queue.pop()
if rp.can_fetch(user_agent, url):
html = download(url, use=user_agent)
...
else:
print('Blocked by robots.txt:', url)

支持代理

下面是使用urllib只存储代理的代码

proxy = 'http://myproxy.net:1234'
proxy_support = urllib.request.ProxyHandler({'http':proxy})
opener = urllib.request.build_opener(proxy_support)
urllib.request.install_opener(opener)

下面是集成了该功能的新版本的download函数

import urllib.request
from urllib.error import URLError, HTTPError, ContentTooShortError def download(url, user_agent='wswp', num_retries=2, charset='utf-8', proxy=None):
print('Downloading:', url)
request = urllib.request.Request(url)
request.add_header('User-agent', user_agent) try:
if proxy:
proxy = 'http://myproxy.net:1234'
proxy_support = urllib.request.ProxyHandler({'http': proxy})
opener = urllib.request.build_opener(proxy_support)
urllib.request.install_opener(opener)
resp = urllib.request.urlopen(request)
cs = resp.headers.get_content_charset()
if not cs:
cs = charset
html = resp.read().decode(cs)
except (URLError, HTTPError, ContentTooShortError) as e:
print('Download error', e.reason)
html = None
if num_retries > 0:
if hasattr(e, 'code') and 500 <= e.code <= 500:
return download(url, user_agent=user_agent, num_retries=num_retries, charset=charset, proxy=charset)
return html

目前,默认情况下(python3.5),urllib模块不支持https代理。

下载限速

如果我们爬取网站的速度过快,就会面临被封禁或是造成服务器过载的风险。为了降低这些风险,我们可以在两次下载之间添加一组延时,从而对爬虫限速。下面是实现了该功能的类的代码。

from urllib.parse import urlparse
import time class Throttle:
""" Add a delay between downloads to the same domain
"""
def __init__(self, delay):
# amount of delay between downloads for each domain
self.delay = delay
# timestamp of when a domain was last accessed
self.domains = {} def wait(self, url):
domain = urlparse(url).netloc
last_accessed = self.domains.get(domain) if self.delay > 0 and last_accessed is not None:
sleep_secs = self.delay - (time.time() - last_accessed)
if sleep_secs > 0:
# domain has been accessed recently
# so need to sleep
time.sleep(sleep_secs)
# update the last accessed time
self.domains[domain] = time.time()

Throttle类记录了每个域名上次访问的时间,如果当前时间距离上次访问时间小于指定延时,则执行睡眠操作。我们可以在每次下载之前调用throttle对爬虫进行限速。

throttle = Throttle(delay)
throttle.wait(url)
html = download(url, user_agent=user_agent, num_retries=num_retries, charset=charset, proxy=charset)

避免爬虫陷阱

目前,我们的爬虫会跟踪所有之前没有访问过的链接。但是,一些网站会动态生成页面内容,这样就会出现无限多的页面。比如,网站有一个在线日历功能,提供了可以访问下个月和下一年的链接,那么下个月的页面中同样会包含访问再下个月的链接,这样就会一直持续请求到部件设定的最大时间(可能会是很久之后的时间)。该站点可能还会在简单的分页导航中提供相同的功能,本质上是 分页请求不断访问空的搜索结果页,直至达到最大页数。这种情况被称为爬虫陷阱。

想要避免陷入爬虫陷阱,一个简单的方法是记录到达当前网页经过了多少个链接,也就是深度。当达到最大深度时,爬虫就不再向队列中添加该网页中的链接了,想要实现最大深度的功能,我们需要修改seen变量,该变量原先只记录了访问过的网页链接,现在修改为一个字典,添加已发现链接的深度记录。

def link_crawler(..., max_depth=4):
seen = {}
...
if rp.can_fetch(user_agent, url):
depth = seen.get(url, 0)
if depth == max_depth:
print('Skipping %s due to depth' % url)
continnue
...
for link in get_links(html):
if re.match(link_regex, link):
abs_link = urljoin(start_url, link)
if abs_link not in seen:
seen[abs_link] = depth + 1
crawl_queue.append(abs_link)

有了该功能之后,我们就有信心爬虫最终一定能够完成了。如果想要禁用该功能,只需要将max_depth设为一个负数即可,此时,当前深度永远不会与之相等。

完整版代码

import re
import time
import urllib.request
from urllib import robotparser
from urllib.parse import urljoin,urlparse
from urllib.error import URLError, HTTPError, ContentTooShortError def download(url, num_retries=2, user_agent='wswp', charset='utf-8', proxy=None):
""" Download a given URL and return the page content
args:
url (str): URL
kwargs:
user_agent (str): user agent (default: wswp)
charset (str): charset if website does not include one in headers
proxy (str): proxy url, ex 'http://IP' (default: None)
num_retries (int): number of retries if a 5xx error is seen (default: 2)
"""
print('Downloading:', url)
request = urllib.request.Request(url)
request.add_header('User-agent', user_agent)
try:
if proxy:
proxy_support = urllib.request.ProxyHandler({'http': proxy})
opener = urllib.request.build_opener(proxy_support)
urllib.request.install_opener(opener)
resp = urllib.request.urlopen(request)
cs = resp.headers.get_content_charset()
if not cs:
cs = charset
html = resp.read().decode(cs)
except (URLError, HTTPError, ContentTooShortError) as e:
print('Download error:', e.reason)
html = None
if num_retries > 0:
if hasattr(e, 'code') and 500 <= e.code < 600:
# recursively retry 5xx HTTP errors
return download(url, num_retries=num_retries - 1)
return html def get_robots_parser(robots_url):
" Return the robots parser object using the robots_url "
rp = robotparser.RobotFileParser()
rp.set_url(robots_url)
rp.read()
return rp def get_links(html):
" Return a list of links (using simple regex matching) from the html content "
# a regular expression to extract all links from the webpage
webpage_regex = re.compile("""<a[^>]+href=["'](.*?)["']""", re.IGNORECASE)
# list of all links from the webpage
return webpage_regex.findall(html) class Throttle:
""" Add a delay between downloads to the same domain
"""
def __init__(self, delay):
# amount of delay between downloads for each domain
self.delay = delay
# timestamp of when a domain was last accessed
self.domains = {} def wait(self, url):
domain = urlparse(url).netloc
last_accessed = self.domains.get(domain) if self.delay > 0 and last_accessed is not None:
sleep_secs = self.delay - (time.time() - last_accessed)
if sleep_secs > 0:
# domain has been accessed recently
# so need to sleep
time.sleep(sleep_secs)
# update the last accessed time
self.domains[domain] = time.time() def link_crawler(start_url, link_regex, robots_url=None, user_agent='wswp',
proxy=None, delay=3, max_depth=4):
""" Crawl from the given start URL following links matched by link_regex. In the current
implementation, we do not actually scrapy any information. args:
start_url (str): web site to start crawl
link_regex (str): regex to match for links
kwargs:
robots_url (str): url of the site's robots.txt (default: start_url + /robots.txt)
user_agent (str): user agent (default: wswp)
proxy (str): proxy url, ex 'http://IP' (default: None)
delay (int): seconds to throttle between requests to one domain (default: 3)
max_depth (int): maximum crawl depth (to avoid traps) (default: 4)
"""
crawl_queue = [start_url]
# keep track which URL's have seen before
seen = {}
if not robots_url:
robots_url = '{}/robots.txt'.format(start_url)
rp = get_robots_parser(robots_url)
throttle = Throttle(delay)
while crawl_queue:
url = crawl_queue.pop()
# check url passes robots.txt restrictions
if rp.can_fetch(user_agent, url):
depth = seen.get(url, 0)
if depth == max_depth:
print('Skipping %s due to depth' % url)
continue
throttle.wait(url)
html = download(url, user_agent=user_agent, proxy=proxy)
if not html:
continue
# TODO: add actual data scraping here
# filter for links matching our regular expression
for link in get_links(html):
if re.match(link_regex, link):
abs_link = urljoin(start_url, link)
if abs_link not in seen:
seen[abs_link] = depth + 1
crawl_queue.append(abs_link)
else:
print('Blocked by robots.txt:', url)

requests版本:

import re
import time
import requests
from urllib import robotparser
from urllib.parse import urljoin,urlparse class Throttle:
""" Add a delay between downloads to the same domain
"""
def __init__(self, delay):
# amount of delay between downloads for each domain
self.delay = delay
# timestamp of when a domain was last accessed
self.domains = {} def wait(self, url):
domain = urlparse(url).netloc
last_accessed = self.domains.get(domain) if self.delay > 0 and last_accessed is not None:
sleep_secs = self.delay - (time.time() - last_accessed)
if sleep_secs > 0:
# domain has been accessed recently
# so need to sleep
time.sleep(sleep_secs)
# update the last accessed time
self.domains[domain] = time.time() def download(url, num_retries=2, user_agent='wswp', proxies=None):
""" Download a given URL and return the page content
args:
url (str): URL
kwargs:
user_agent (str): user agent (default: wswp)
proxies (dict): proxy dict w/ keys 'http' and 'https', values
are strs (i.e. 'http(s)://IP') (default: None)
num_retries (int): # of retries if a 5xx error is seen (default: 2)
"""
print('Downloading:', url)
headers = {'User-Agent': user_agent}
try:
resp = requests.get(url, headers=headers, proxies=proxies)
html = resp.text
if resp.status_code >= 400:
print('Download error:', resp.text)
html = None
if num_retries and 500 <= resp.status_code < 600:
# recursively retry 5xx HTTP errors
return download(url, num_retries=num_retries - 1)
except requests.exceptions.RequestException as e:
print('Download error:', e)
html = None
return html def get_robots_parser(robots_url):
" Return the robots parser object using the robots_url "
rp = robotparser.RobotFileParser()
rp.set_url(robots_url)
rp.read()
return rp def get_links(html):
""" Return a list of links (using simple regex matching)
from the html content """
# a regular expression to extract all links from the webpage
webpage_regex = re.compile("""<a[^>]+href=["'](.*?)["']""", re.IGNORECASE)
# list of all links from the webpage
return webpage_regex.findall(html) def link_crawler(start_url, link_regex, robots_url=None, user_agent='wswp',
proxies=None, delay=3, max_depth=4):
""" Crawl from the given start URL following links matched by link_regex.
In the current implementation, we do not actually scrape any information. args:
start_url (str): web site to start crawl
link_regex (str): regex to match for links
kwargs:
robots_url (str): url of the site's robots.txt
(default: start_url + /robots.txt)
user_agent (str): user agent (default: wswp)
proxies (dict): proxy dict w/ keys 'http' and 'https', values
are strs (i.e. 'http(s)://IP') (default: None)
delay (int): seconds to throttle between requests
to one domain (default: 3)
max_depth (int): maximum crawl depth (to avoid traps) (default: 4)
"""
crawl_queue = [start_url]
# keep track which URL's have seen before
seen = {}
if not robots_url:
robots_url = '{}/robots.txt'.format(start_url)
rp = get_robots_parser(robots_url)
throttle = Throttle(delay)
while crawl_queue:
url = crawl_queue.pop()
# check url passes robots.txt restrictions
if rp.can_fetch(user_agent, url):
depth = seen.get(url, 0)
if depth == max_depth:
print('Skipping %s due to depth' % url)
continue
throttle.wait(url)
html = download(url, user_agent=user_agent, proxies=proxies)
if not html:
continue
# TODO: add actual data scraping here
# filter for links matching our regular expression
for link in get_links(html):
if re.match(link_regex, link):
abs_link = urljoin(start_url, link)
if abs_link not in seen:
seen[abs_link] = depth + 1
crawl_queue.append(abs_link)
else:
print('Blocked by robots.txt:', url)

数据抓取

上面已经学习了如何构建一个爬虫来下载网页,现在,我们要让这个爬虫从每个网页中抽取一些数据,然后实现某些事情,这种做法也称为抓取(scraping)。

正则表达式

官方文档:https://docs.python.org/3/howto/regex.html

Beautiful Soup

中文文档:https://beautifulsoup.readthedocs.io/zh_CN/latest/

安装命令:

pip install beautifulsoup4

安装html5lib解析器

pip install html5lib

使用了html5lib的BeautifulSoup能够正确解析缺失的属性引号以及闭合标签,使其成为完整的HTML文档

Lxml

LXML是基于libxml2这一XML解析库构建的Python库,它使用C语言编写,解析速度比BeautifulSoup更快。