python asyncio爬虫示例
# -*- coding:utf-8 -*-
import asyncio
import aiohttp
import ssl
import certifi
from lxml import etree
import re
import platform
from html import unescape
import os
import aiofiles
# aiohttp needs the selector-based event loop on Windows, where the
# proactor loop is the default; install the selector policy up front.
if 'Windows' in platform.platform():
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
class Zuowenku():
def __init__(self,folder):
self.folder = folder
self.queue = None
self.encoding = 'utf-8'
self.seens = set()
# 最大并发数
self.max_workers = 50
# 记录当前正在工作的任务数量
self.workes = 0
self.invalid_char = re.compile(r'["\\/|<>?*]')
self.headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0',
}
self.host = 'https://www.zuowenku.net/'
self.detail_re = re.compile(r'https://www.zuowenku.net/(\d+).shtml',flags=re.I)
# 检查保存的文件夹是否存在
def check_folder(self):
curdir = os.path.dirname(os.path.abspath(__file__))
folder_path = os.path.join(curdir,self.folder)
if not os.path.exists(folder_path):
os.mkdir(folder_path)
return folder_path
async def fetch(self,session:aiohttp.ClientSession,url):
try:
async with session.get(url) as reps:
status = reps.status
html = await reps.text(self.encoding)
return html,status
except aiohttp.ClientError:
return None,0
def extract_urls(self,souce):
doc = etree.HTML(souce)
# 提取网页里面的所有链接
all_links = set(doc.xpath('//a/@href'))
# 遍历所有链接,过滤不符合条件的链接
for link in all_links:
# 对于不是host开头的链接进行过滤
if not link.startswith(self.host):
continue
# 如果放进队列就过滤掉
if link in self.seens:
continue
# 把链接放入到队列
self.queue.put_nowait(link)
# 在这里添加可以避免队列里面存放太多的重复链接
self.seens.add(link)
@staticmethod
def parse_detail(souce):
doc = etree.HTML(souce)
title = ''.join(doc.xpath('//h1/text()'))
artile_items = doc.xpath('//div[@id="zwdetail"]/div[@style="clear:both"]')
articles = []
for item in artile_items:
articles.append(unescape(etree.tounicode(item)))
articles = ''.join(articles)
return title,articles
async def save_local(self,title,content):
# 对标题进行特殊字符过滤
title = self.invalid_char.sub('',title) + '.txt'
async with aiofiles.open(title,'w',encoding=self.encoding) as fd:
await fd.write(content)
await fd.close()
async def process(self,session,url):
print(f'正在处理:{url}')
# 所有页面都会经历这个流程
html, status = await self.fetch(session, url)
if status != 200:
if status ==0:
# 对于网络异常的状态码重放回队列
self.queue.put_nowait(url)
print(f'状态码错误:{status},{url}')
# 完成一次流程,任务数减一
self.workes -= 1
return
self.extract_urls(html)
# 以下两步只有详情页才会经历
if self.detail_re.match(url):
title, content = self.parse_detail(html)
if not title or not content:
print(f'获取不到标题或内容{url}')
else:
await self.save_local(title, content)
# 完成一次流程,任务数减一
self.workes -= 1
async def main(self):
#检查保存路径
folder_path = self.check_folder()
# 切换工作目录到要保存的目录下
os.chdir(folder_path)
# 初始化信息
self.queue = asyncio.Queue()
# 把 host 放到队列里面去
self.queue.put_nowait(self.host)
ssl_context = ssl.create_default_context()
ssl_context.load_verify_locations(certifi.where())
connector = aiohttp.TCPConnector(ssl=ssl_context)
timeout = aiohttp.ClientTimeout(total=20)
async with aiohttp.ClientSession(connector=connector,timeout=timeout,headers=self.headers) as session:
while True:
url = await self.queue.get()
if self.workes >= self.max_workers:
# 当创建的任务数量,大于目标任务数量,就需要等待一下
# 让cpu 有时间处理采集和下载
await asyncio.sleep(0.5)
asyncio.create_task(self.process(session,url))
# 每创建一个任务 任务数 + 1
self.workes += 1
# await session.close()
def start(self):
asyncio.run(self.main())
if __name__ == '__main__':
    # Crawl the site, saving articles into the "articles" folder.
    crawler = Zuowenku('articles')
    crawler.start()