当前位置:首页 > 日记本 > 正文内容

python asyncio爬虫示例

zhangchap2年前 (2022-04-04)日记本218
# -*- coding:utf-8 -*-
import asyncio
import aiohttp
import ssl
import certifi
from lxml import etree
import re
import platform
from html import unescape
import os
import aiofiles

if 'Windows' in platform.platform():
    polic = asyncio.WindowsSelectorEventLoopPolicy()
    asyncio.set_event_loop_policy(polic)

class Zuowenku():
    def __init__(self,folder):
        self.folder = folder
        self.queue = None
        self.encoding = 'utf-8'
        self.seens = set()
        # 最大并发数
        self.max_workers = 50
        # 记录当前正在工作的任务数量
        self.workes = 0
        self.invalid_char = re.compile(r'["\\/|<>?*]')
        self.headers = {
            'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0',
        }
        self.host = 'https://www.zuowenku.net/'
        self.detail_re = re.compile(r'https://www.zuowenku.net/(\d+).shtml',flags=re.I)

    # 检查保存的文件夹是否存在
    def check_folder(self):
        curdir = os.path.dirname(os.path.abspath(__file__))
        folder_path = os.path.join(curdir,self.folder)

        if not os.path.exists(folder_path):
            os.mkdir(folder_path)
        return folder_path



    async def fetch(self,session:aiohttp.ClientSession,url):
        try:
            async with session.get(url) as reps:
                status = reps.status
                html = await reps.text(self.encoding)
                return html,status
        except aiohttp.ClientError:
            return None,0

    def extract_urls(self,souce):
        doc = etree.HTML(souce)
        # 提取网页里面的所有链接
        all_links = set(doc.xpath('//a/@href'))
        # 遍历所有链接,过滤不符合条件的链接
        for link in all_links:
            # 对于不是host开头的链接进行过滤
            if not link.startswith(self.host):
                continue
            # 如果放进队列就过滤掉
            if link in self.seens:
                continue
            # 把链接放入到队列
            self.queue.put_nowait(link)

            # 在这里添加可以避免队列里面存放太多的重复链接
            self.seens.add(link)

    @staticmethod
    def parse_detail(souce):
        doc = etree.HTML(souce)
        title = ''.join(doc.xpath('//h1/text()'))
        artile_items = doc.xpath('//div[@id="zwdetail"]/div[@style="clear:both"]')
        articles = []
        for item in artile_items:
            articles.append(unescape(etree.tounicode(item)))
        articles = ''.join(articles)
        return title,articles

    async def save_local(self,title,content):
        # 对标题进行特殊字符过滤
        title = self.invalid_char.sub('',title) + '.txt'
        async with aiofiles.open(title,'w',encoding=self.encoding) as fd:
            await fd.write(content)
            await fd.close()

    async def process(self,session,url):
        print(f'正在处理:{url}')
        # 所有页面都会经历这个流程
        html, status = await self.fetch(session, url)
        if status != 200:
            if status ==0:
                # 对于网络异常的状态码重放回队列
                self.queue.put_nowait(url)
            print(f'状态码错误:{status},{url}')
            # 完成一次流程,任务数减一
            self.workes -= 1
            return
        self.extract_urls(html)
        # 以下两步只有详情页才会经历
        if self.detail_re.match(url):
            title, content = self.parse_detail(html)
            if not title or not content:
                print(f'获取不到标题或内容{url}')

            else:
                await self.save_local(title, content)

        # 完成一次流程,任务数减一
        self.workes -= 1

    async def main(self):
        #检查保存路径
        folder_path = self.check_folder()
        # 切换工作目录到要保存的目录下
        os.chdir(folder_path)
        # 初始化信息
        self.queue = asyncio.Queue()
        # 把 host 放到队列里面去
        self.queue.put_nowait(self.host)
        ssl_context = ssl.create_default_context()
        ssl_context.load_verify_locations(certifi.where())
        connector = aiohttp.TCPConnector(ssl=ssl_context)
        timeout = aiohttp.ClientTimeout(total=20)
        async with aiohttp.ClientSession(connector=connector,timeout=timeout,headers=self.headers) as session:

            while True:
                url = await self.queue.get()
                if self.workes >= self.max_workers:
                    # 当创建的任务数量,大于目标任务数量,就需要等待一下
                    # 让cpu 有时间处理采集和下载
                    await asyncio.sleep(0.5)

                asyncio.create_task(self.process(session,url))
                # 每创建一个任务 任务数 + 1
                self.workes += 1


        # await session.close()

    def start(self):
        asyncio.run(self.main())




if __name__ == '__main__':
    zwk = Zuowenku('articles')
    zwk.start()


分享给朋友:

相关文章

Nginx+PHP,PHP如何优化配置?

具体修改FPM配置文件参数: 若你的php日志出现: WARNING: [pool www] seems busy (you may need to increase pm.sta...

python 随机生成时间戳写入txt文件/运行sql语句

import time from random import randint with open('time.txt', ...

python 发布文章 随机分类(choice)

from random import choice catid = choice([5,6]) #choice 函数从列表中随机提取...

python fake_useragent 模块用法

我们每次发送requests请求时通过random从中随机获取一个随机UserAgent,两行代码即可完成UserAgent的不停更换 from fake_useragent i...

减重五原则

1.主粮减半,少吃米面粥粉等;2.完全戒糖,包括可乐、奶茶等3.轻断食,每周一次,一日餐4.不限制任何动物脂肪和肉类,少吃植物油;5.保证随时喝到水,脂肪的消耗需要大量的水。...

如何为精简的 CSS 文件删除未使用的 CSS

如何为精简的 CSS 文件删除未使用的 CSS

精简的网站比臃肿的网站运行得更快,这已经不是什么秘密了。不要让不必要的 CSS 拖累您的 Web 项目;使用下面描述的工具和技术来帮助您删除未使用的 CSS 并提高您网站的整体性能。什么是未使用的 C...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。