scrapy爬取知乎问答

时间:2023-03-09 13:09:37
scrapy爬取知乎问答

登录

参考 https://github.com/zkqiang/Zhihu-Login

# -*- coding: utf-8 -*-
import base64
import datetime
import hashlib
import hmac
import json
import re
import time
from urllib.parse import urljoin

import matplotlib.pyplot as plt
import scrapy
from scrapy.loader import ItemLoader
from PIL import Image


class ZhihuSpider(scrapy.Spider):
    """Spider that signs in to Zhihu through its OAuth API before crawling.

    Request flow, as wired by the callbacks below:

    1. ``start_requests`` queries the captcha endpoint.
    2. ``_is_need_captcha`` either fetches the captcha image (``_get_captcha``)
       or posts the credentials straight away.
    3. ``_get_captcha`` saves the image, asks the user to solve it and submits
       the answer; ``captcha_login`` then posts the credentials.
    4. ``check_login`` requests ``login_url`` and hands the page to ``parse``.
    """

    name = 'zhihu'
    allowed_domains = ['www.zhihu.com']
    start_urls = ['http://www.zhihu.com/']

    login_url = 'https://www.zhihu.com/signup'
    login_api = 'https://www.zhihu.com/api/v3/oauth/sign_in'
    # Base form fields for the sign-in POST. Treated as read-only: the
    # per-request fields (captcha, timestamp, signature) are added to a copy.
    login_data = {
        'client_id': 'c3cef7c66a1843f8b3a9e6a1e3160e20',
        'grant_type': 'password',
        'source': 'com.zhihu.web',
        'username': "+86xxxxxx",
        'password': "xxxxxxxx",
        # Passing 'cn' selects the upside-down Chinese character captcha.
        'lang': 'en',
        'ref_source': 'homepage'
    }
    headers = {
        'Connection': 'keep-alive',
        'Host': 'www.zhihu.com',
        'Referer': 'https://www.zhihu.com/',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/69.0.3497.100 Safari/537.36'
    }

    def start_requests(self):
        """Ask the captcha endpoint whether this sign-in needs a captcha."""
        # Any value other than 'cn' uses the English captcha endpoint.
        lang = 'cn' if self.login_data["lang"] == 'cn' else 'en'
        api = 'https://www.zhihu.com/api/v3/oauth/captcha?lang=' + lang
        yield scrapy.Request(url=api, headers=self.headers, callback=self._is_need_captcha)

    def _is_need_captcha(self, response):
        """Fetch the captcha if the endpoint's reply contains ``true``, else log in directly."""
        # NOTE(review): a plain substring test on the body, exactly as before;
        # the reply is presumably JSON with a boolean flag - confirm before
        # switching to a structured check.
        if 'true' in response.text:
            # The same URL is requested again with PUT; its reply is consumed
            # by _get_captcha, which expects an 'img_base64' field.
            yield scrapy.Request(url=response.url,
                                 headers=self.headers,
                                 method="PUT",
                                 callback=self._get_captcha)
        else:
            yield self._build_login_request("")

    def _get_captcha(self, response):
        """Save and display the captcha image, read the user's answer and submit it."""
        json_data = json.loads(response.text)
        # json.loads has already turned "\n" escapes into real newlines, so
        # strip those as well as the literal backslash-n pair.
        img_base64 = json_data['img_base64'].replace(r'\n', '').replace('\n', '')
        with open('./captcha.jpg', 'wb') as f:
            f.write(base64.b64decode(img_base64))
        img = Image.open('./captcha.jpg')
        if self.login_data["lang"] == 'cn':
            plt.imshow(img)
            print('点击所有倒立的汉字,按回车提交')
            points = plt.ginput(7)
            # NOTE(review): click coordinates are halved against a fixed
            # 200x44 size - presumably the image is shown at 2x; confirm.
            capt = json.dumps({'img_size': [200, 44],
                               'input_points': [[i[0] / 2, i[1] / 2] for i in points]})
        else:
            img.show()
            capt = input('请输入图片里的验证码:')
        # The answer must be POSTed to the captcha endpoint before signing in.
        yield scrapy.FormRequest(url=response.url,
                                 formdata={'input_text': capt},
                                 headers=self.headers,
                                 callback=self.captcha_login,
                                 meta={"captcha": capt})

    def captcha_login(self, response):
        """Post the credentials together with the captcha answer carried in ``meta``."""
        yield self._build_login_request(response.meta['captcha'])

    def _build_login_request(self, captcha):
        """Return the sign-in POST for ``captcha`` with a fresh timestamp and signature."""
        timestamp = str(int(time.time() * 1000))
        # Build a copy instead of calling update(): login_data is a class
        # attribute, so mutating it would leak state into every instance.
        formdata = dict(
            self.login_data,
            captcha=captcha,
            timestamp=timestamp,
            signature=self._get_signature(timestamp),
        )
        return scrapy.FormRequest(
            url=self.login_api,
            formdata=formdata,
            headers=self.headers,
            callback=self.check_login
        )

    def check_login(self, response):
        """Request ``login_url`` after the sign-in call and pass the page to ``parse``.

        The sign-in response itself is not inspected here.
        """
        yield scrapy.Request(
            url=self.login_url,
            headers=self.headers,
            callback=self.parse
        )

    def _get_signature(self, timestamp):
        """Compute the request signature with HMAC-SHA1.

        The signed message is ``grant_type + client_id + source + timestamp``.

        :param timestamp: millisecond timestamp as a string
        :return: hexadecimal digest string
        """
        ha = hmac.new(b'd1b964811afb40118a12068ff74a12f4', digestmod=hashlib.sha1)
        message = (self.login_data['grant_type']
                   + self.login_data['client_id']
                   + self.login_data['source']
                   + timestamp)
        ha.update(message.encode('utf-8'))
        return ha.hexdigest()

    def parse(self, response):
        """Print the body of the page fetched after sign-in."""
        print(response.text)

数据库设计

-- One row per crawled Zhihu question, keyed by the question id.
-- The key column is named `zhihu_id` to match the INSERT issued by the item
-- class (it was misspelled `zhuhu_id`, which made that INSERT fail).
-- utf8mb4 is used so 4-byte characters (e.g. emoji) in crawled text are accepted.
DROP TABLE IF EXISTS `zhihu_question`;
CREATE TABLE `zhihu_question` (
  `zhihu_id` bigint(20) NOT NULL,
  `topics` varchar(255) DEFAULT NULL,
  `url` varchar(300) NOT NULL,
  `title` varchar(255) NOT NULL,
  `content` longtext NOT NULL,
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `answer_num` int(11) NOT NULL DEFAULT '0',
  `comments_num` int(11) NOT NULL DEFAULT '0',
  `watch_user_num` int(11) NOT NULL DEFAULT '0',
  `click_num` int(11) NOT NULL DEFAULT '0',
  `crawl_time` datetime NOT NULL,
  `crawl_update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`zhihu_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- One row per crawled Zhihu answer, keyed by the answer id; `question_id`
-- holds the id of the question the answer belongs to.
-- utf8mb4 is used so 4-byte characters (e.g. emoji) in answer text are accepted.
DROP TABLE IF EXISTS `zhihu_answer`;
CREATE TABLE `zhihu_answer` (
  `zhihu_id` bigint(20) NOT NULL,
  `url` varchar(255) NOT NULL,
  `question_id` bigint(20) NOT NULL,
  `author_id` varchar(100) DEFAULT NULL,
  `content` longtext NOT NULL,
  `praise_num` int(11) NOT NULL DEFAULT '0',
  `comments_num` int(11) NOT NULL DEFAULT '0',
  `create_time` datetime NOT NULL,
  `update_time` datetime NOT NULL,
  `crawl_time` datetime NOT NULL,
  `crawl_update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`zhihu_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

页面解析

    def parse(self, response):
        """Follow every https link on the page.

        Links of the form ``.../question/<digits>`` are trimmed to the
        question URL and sent to ``parse_question``; every other link is
        fetched and fed back into this method.
        """
        for href in response.css("a::attr(href)").extract():
            url = urljoin(response.url, href)
            if not url.startswith("https"):
                continue
            match_obj = re.match(r"(.*zhihu.com/question/(\d+))(/|$).*", url)
            if match_obj:
                # Question page: download it and hand it to the extractor.
                request_url = match_obj.group(1)
                yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
            else:
                # Anything else: keep following links from that page.
                yield scrapy.Request(url, headers=self.headers, callback=self.parse)

    def parse_question(self, response):
        """Build a ``ZhihuQuestionItem`` from a question page and request its answers."""
        match_obj = re.match(r"(.*zhihu.com/question/(\d+))(/|$).*", response.url)
        if not match_obj:
            # Without a question id neither the item nor the answers request
            # can be built (question_id used to be unbound on this path).
            self.logger.warning("Not a question URL, skipped: %s", response.url)
            return
        question_id = int(match_obj.group(2))

        item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
        item_loader.add_css("title", "h1.QuestionHeader-title::text")
        item_loader.add_css("content", ".QuestionHeader-detail")
        item_loader.add_value("url", response.url)
        item_loader.add_value("zhihu_id", question_id)
        item_loader.add_css("answer_num", ".List-headerText span::text")
        item_loader.add_css("comments_num", ".QuestionHeader-actions button::text")
        item_loader.add_css("watch_user_num", ".NumberBoard-value::text")
        item_loader.add_css("topics", ".QuestionHeader-topics .Popover div::text")
        question_item = item_loader.load_item()

        # NOTE(review): start_answer_url is expected on the spider class (not
        # shown here); the arguments are presumably id, limit, offset - confirm.
        yield scrapy.Request(self.start_answer_url.format(question_id, 20, 0),
                             headers=self.headers, callback=self.parse_answer)
        yield question_item

    def parse_answer(self, response):
        """Yield one ``ZhihuAnswerItem`` per answer in the JSON page, then follow paging."""
        ans_json = json.loads(response.text)
        paging = ans_json["paging"]

        # Copy the individual answer fields into items.
        for answer in ans_json["data"]:
            answer_item = ZhihuAnswerItem()
            answer_item["zhihu_id"] = answer["id"]
            answer_item["url"] = answer["url"]
            answer_item["question_id"] = answer["question"]["id"]
            answer_item["author_id"] = answer["author"].get("id")
            answer_item["content"] = answer.get("content")
            answer_item["parise_num"] = answer["voteup_count"]
            answer_item["comments_num"] = answer["comment_count"]
            answer_item["create_time"] = answer["created_time"]
            answer_item["update_time"] = answer["updated_time"]
            answer_item["crawl_time"] = datetime.datetime.now()
            yield answer_item

        # "next" is only read when another page exists, so a last page that
        # omits it no longer raises KeyError.
        if not paging["is_end"]:
            yield scrapy.Request(paging["next"], headers=self.headers, callback=self.parse_answer)

items

def _count_to_int(text):
    """Convert a scraped counter to int, tolerating thousands separators."""
    # int() rejects strings such as "1,234", so drop the commas first.
    return int(str(text).replace(",", ""))


class ZhihuQuestionItem(scrapy.Item):
    """Item for a Zhihu question.

    Field values are read as lists in ``get_insert_sql`` (they are indexed
    and joined there), matching what an ItemLoader collects.
    """

    zhihu_id = scrapy.Field()
    topics = scrapy.Field()
    url = scrapy.Field()
    title = scrapy.Field()
    content = scrapy.Field()
    answer_num = scrapy.Field()
    comments_num = scrapy.Field()
    watch_user_num = scrapy.Field()
    click_num = scrapy.Field()
    crawl_time = scrapy.Field()

    def get_insert_sql(self):
        """Return ``(sql, params)`` for upserting this question into ``zhihu_question``.

        NOTE(review): ``extract_num`` and ``SQL_DATETIME_FORMAT`` are defined
        elsewhere in the project; ``extract_num`` presumably pulls the number
        out of a text fragment - confirm.
        """
        insert_sql = """
            insert into zhihu_question(zhihu_id, topics, url, title, content, answer_num, comments_num,
              watch_user_num, click_num, crawl_time
              )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num), comments_num=VALUES(comments_num),
              watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num)
        """
        zhihu_id = self["zhihu_id"][0]
        topics = ",".join(self["topics"])
        url = self["url"][0]
        title = "".join(self["title"])
        content = "".join(self["content"])
        answer_num = extract_num("".join(self["answer_num"]))
        comments_num = extract_num("".join(self["comments_num"]))

        # The selector can yield two numbers: the first is stored as the
        # watcher count, the second (when present) as the click count.
        counters = self["watch_user_num"]
        watch_user_num = _count_to_int(counters[0])
        click_num = _count_to_int(counters[1]) if len(counters) == 2 else 0

        crawl_time = datetime.datetime.now().strftime(SQL_DATETIME_FORMAT)

        params = (zhihu_id, topics, url, title, content, answer_num, comments_num,
                  watch_user_num, click_num, crawl_time)
        return insert_sql, params


class ZhihuAnswerItem(scrapy.Item):
    """Item for a single answer to a Zhihu question."""

    zhihu_id = scrapy.Field()
    url = scrapy.Field()
    question_id = scrapy.Field()
    author_id = scrapy.Field()
    content = scrapy.Field()
    # The field keeps its historical spelling because the spider assigns to
    # this key; only the SQL column name below is spelled praise_num.
    parise_num = scrapy.Field()
    comments_num = scrapy.Field()
    create_time = scrapy.Field()
    update_time = scrapy.Field()
    crawl_time = scrapy.Field()

    def get_insert_sql(self):
        """Return ``(sql, params)`` for upserting this answer into ``zhihu_answer``.

        ``create_time`` and ``update_time`` are passed to
        ``datetime.fromtimestamp`` and ``crawl_time`` must be a ``datetime``;
        all three are formatted with ``SQL_DATETIME_FORMAT`` (defined
        elsewhere in the project).
        """
        # The table's column is `praise_num`; the previous `parise_num`
        # did not match the schema.
        insert_sql = """
            insert into zhihu_answer(zhihu_id, url, question_id, author_id, content, praise_num, comments_num,
              create_time, update_time, crawl_time
              ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
              ON DUPLICATE KEY UPDATE content=VALUES(content), comments_num=VALUES(comments_num), praise_num=VALUES(praise_num),
              update_time=VALUES(update_time)
        """

        create_time = datetime.datetime.fromtimestamp(self["create_time"]).strftime(SQL_DATETIME_FORMAT)
        update_time = datetime.datetime.fromtimestamp(self["update_time"]).strftime(SQL_DATETIME_FORMAT)
        params = (
            self["zhihu_id"], self["url"], self["question_id"],
            self["author_id"], self["content"], self["parise_num"],
            self["comments_num"], create_time, update_time,
            self["crawl_time"].strftime(SQL_DATETIME_FORMAT),
        )
        return insert_sql, params

pipelines

    def do_insert(self, cursor, item):
        """Run the insert statement that ``item`` builds for itself.

        Each item type supplies its own SQL and parameters through
        ``get_insert_sql()``, so this method needs no per-item branching.

        :param cursor: object exposing ``execute(sql, params)``
        :param item: object exposing ``get_insert_sql()`` returning ``(sql, params)``
        """
        insert_sql, params = item.get_insert_sql()
        cursor.execute(insert_sql, params)