生产者与消费者+Queue(线程安全)

时间:2024-01-20 21:28:27
from queue import Queue
from lxml import etree
import requests
from urllib import request
from threading import Thread
import re, os class Producter(Thread): def __init__(self, page_queue, img_queue, *args, **kwargs):
super(Producter,self).__init__(*args, **kwargs)
self.page_queue = page_queue
self.img_queue = img_queue
self.head = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36' def run(self):
while True:
url = self.page_queue.get()
self.parse(url)
def parse(self, url):
res = requests.get(url, params=self.head)
text = res.text
html = etree.HTML(text)
imgs = html.xpath('//div[@class="col-xs-6 col-sm-3"]//img[@class!="gif"]')
print(imgs)
for img in imgs:
img_path = img.get('data-original')
alt = img.get('alt')
alt = re.sub(r'[\??.。!!*]', '', alt) # 将特殊符号替换
sub = os.path.splitext(img_path) # 获取文件后缀
sub = re.sub(r'[(\!dta)]', '', sub[1])
filename = './imgs/'+alt+sub
print(img_path, filename)
self.img_queue.put((img_path, filename)) class Consumer(Thread): def __init__(self, page_queue, img_queue, *args, **kwargs):
super(Consumer, self).__init__(*args, **kwargs)
self.page_queue = page_queue
self.img_queue = img_queue def run(self):
while True:
url = self.img_queue.get()
self.parse(url[0], url[1])
print('消费者')
def parse(self, url, path):
# 下载文件到指定位置
request.urlretrieve(url, path) def main():
page_queue = Queue(10)
img_queue = Queue(10000)
for i in range(1, 11):
uri = 'https://www.doutula.com/article/list/?page='+str(i)
page_queue.put(uri)
for i in range(5):
t1 = Producter(page_queue, img_queue)
t1.start()
for i in range(5):
t2 = Consumer(page_queue, img_queue)
t2.start() if __name__ == '__main__':
main()

注意:

  如果使用threading.Lock(),或者threading.Condition(),都是线程不安全的,它们都是锁,共同方法(lock.acquire(),lock.release()),只不过Condition()有多了几个方法,wait()、notify()、notify_all(),如果等待的情况下,使用wait()将不占用CPU,当用资源消耗时,notify唤醒等待的线程。Lock()一直占用CPU资源。感觉还是Queue好用是吧。