Crawling All ZIP Attachments from an Easy Language (易语言) Resource Site

The Jingyi (精易) resource site requires a login and deducts points once you go over the download quota, so I looked for a free alternative.
Target: the resource site at e5a5x.COM.
Runtime: Python


No per-category organization: the script simply downloads every attachment on the site, asynchronously. Shared for anyone who needs it.

import asyncio
import aiohttp
import os
import re
import logging
from urllib.parse import urljoin
from bs4 import BeautifulSoup

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('downloader.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Global configuration
BASE_URL = "https://www.e5a5x.com"
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
DOWNLOAD_DIR = "loads"
os.makedirs(DOWNLOAD_DIR, exist_ok=True)

# Category configuration (full pagination set for each section)
CATEGORIES = {
    "web3": {
        "base_url": f"{BASE_URL}/html/web3/",
        "sub_categories": ["B1", "B2", "B3", "B4"],
        "list_pattern": "list_45_{page}.html",
        "max_pages": 182  # 共182页
    },
    "web2": {
        "base_url": f"{BASE_URL}/html/web2/",
        "sub_categories": ["B5", "B6", "B7", "B8"],
        "list_pattern": "list_46_{page}.html",
        "max_pages": 176  # 共176页
    },
    "web4": {
        "base_url": f"{BASE_URL}/html/web4/",
        "sub_categories": ["A1", "A2", "A3", "A4"],
        "list_pattern": "list_44_{page}.html",
        "max_pages": 225  # 共225页
    },
    "lib": {
        "base_url": f"{BASE_URL}/html/lib/",
        "sub_categories": [],
        "list_pattern": "list_42_{page}.html",
        "max_pages": 20  # 共20页
    },
    "soft": {
        "base_url": f"{BASE_URL}/html/soft/",
        "sub_categories": [],
        "list_pattern": None,
        "max_pages": 0
    }
}
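
# For reference, an illustrative example (my own, not part of the original script)
# of how a paginated list URL is built from this configuration:
#   urljoin(CATEGORIES["web3"]["base_url"],
#           CATEGORIES["web3"]["list_pattern"].format(page=3))
#   -> "https://www.e5a5x.com/html/web3/list_45_3.html"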

async def fetch(session, url, retries=3):
    for i in range(retries):
        try:
            async with session.get(url, headers=HEADERS) as response:
                response.raise_for_status()
                return await response.text()
        except Exception as e:
            if i == retries - 1:
                logger.error(f"请求失败 {url}: {str(e)}")
                return None
            await asyncio.sleep(2 ** i)

async def download_file(session, url, title):
    try:
        # Build a filesystem-safe filename
        safe_title = re.sub(r'[\\/*?:"<>|]', '_', title)[:100]
        file_ext = os.path.splitext(url)[1] or '.zip'
        filename = f"{safe_title}{file_ext}"
        save_path = os.path.join(DOWNLOAD_DIR, filename)
        
        # Skip the download if the file already exists
        if os.path.exists(save_path):
            logger.info(f"File already exists, skipping: {filename}")
            return
        
        logger.info(f"开始下载: {filename}")
        async with session.get(url, headers=HEADERS) as response:
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))
            downloaded = 0
            
            with open(save_path, 'wb') as f:
                async for chunk in response.content.iter_chunked(1024 * 1024):  # 1MB chunks
                    f.write(chunk)
                    downloaded += len(chunk)
                    if total_size > 0:
                        logger.info(f"进度: {downloaded}/{total_size} bytes ({downloaded/total_size:.1%})")
            
        logger.info(f"下载完成: {filename} ({os.path.getsize(save_path)/1024:.2f} KB)")
    except Exception as e:
        logger.error(f"下载失败 {url}: {str(e)}")

def parse_download_url(html):
    soup = BeautifulSoup(html, 'html.parser')
    
    # Method 1: check the href attribute of every <a> tag
    for a in soup.find_all('a', href=True):
        href = a['href']
        if href.lower().endswith(('.zip', '.rar', '.7z', '.exe', '.apk')):
            return urljoin(BASE_URL, href)
    
    # Method 2: look for download links embedded in JavaScript
    patterns = [
        r"(?:window\.location\.href|window\.open|var\s+url)\s*=\s*['\"](.+?\.(?:zip|rar|7z|exe))['\"]",
        r"downloadG1\(\)\s*{[^}]+['\"](.+?\.(?:zip|rar|7z|exe))['\"]",
        r"Click_down\(\)\s*{[^}]+['\"](.+?\.(?:zip|rar|7z|exe))['\"]"
    ]
    for pattern in patterns:
        matches = re.finditer(pattern, html, re.DOTALL)
        for match in matches:
            url = match.group(1).strip()
            return urljoin(BASE_URL, url)
    
    return None
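
    # Illustrative example (my own, not taken from the target site) of the kind of
    # inline JavaScript the regexes above are intended to match:
    #   function downloadG1() { window.location.href = "/uploads/soft/example.zip"; }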

async def process_detail_page(session, url):
    html = await fetch(session, url)
    if not html:
        return
    
    soup = BeautifulSoup(html, 'html.parser')
    title = soup.title.text.strip() if soup.title else "untitled"
    download_url = parse_download_url(html)
    
    if download_url:
        await download_file(session, download_url, title)
    else:
        logger.warning(f"未找到下载地址: {url}")

async def crawl_list_page(session, list_url):
    try:
        logger.info(f"正在爬取列表页: {list_url}")
        html = await fetch(session, list_url)
        if not html:
            return
        
        soup = BeautifulSoup(html, 'html.parser')
        detail_urls = []
        
        # Collect all detail-page links
        for a in soup.select('a[href^="/html/"]'):
            href = a['href']
            if not href.startswith(('/html/article_', '/html/feedback_')):
                full_url = urljoin(BASE_URL, href)
                detail_urls.append(full_url)

        # De-duplicate while preserving order (a list page may link the same article more than once)
        detail_urls = list(dict.fromkeys(detail_urls))
        logger.info(f"Found {len(detail_urls)} detail-page links")
        
        # Limit concurrency to 5 detail pages at a time
        semaphore = asyncio.Semaphore(5)
        async def limited_task(url):
            async with semaphore:
                await process_detail_page(session, url)
                await asyncio.sleep(1)  # delay between requests
        
        await asyncio.gather(*[limited_task(url) for url in detail_urls])
    except Exception as e:
        logger.error(f"处理列表页出错 {list_url}: {str(e)}")

async def crawl_category(session, category_name, category_config):
    try:
        logger.info(f"开始爬取分类: {category_name} (共{category_config['max_pages']}页)")
        
        # 处理主分类页面
        await crawl_list_page(session, category_config["base_url"])
        
        # 处理子分类
        for sub in category_config["sub_categories"]:
            sub_url = urljoin(category_config["base_url"], sub + "/")
            await crawl_list_page(session, sub_url)
            
        # 处理分页(如果有分页模式)
        if category_config["list_pattern"] and category_config["max_pages"] > 0:
            for page in range(1, category_config["max_pages"] + 1):
                list_url = urljoin(
                    category_config["base_url"],
                    category_config["list_pattern"].format(page=page)
                )
                try:
                    await crawl_list_page(session, list_url)
                except Exception as e:
                    logger.error(f"爬取分页失败 {list_url}: {str(e)}")
                    continue
                
                # Log progress every 10 pages
                if page % 10 == 0:
                    logger.info(f"{category_name} progress: page {page}/{category_config['max_pages']}")
        
        logger.info(f"完成爬取分类: {category_name}")
    except Exception as e:
        logger.error(f"爬取分类 {category_name} 出错: {str(e)}")

async def main():
    connector = aiohttp.TCPConnector(limit=10)
    timeout = aiohttp.ClientTimeout(total=3600)  # 1-hour overall timeout
    
    async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
        tasks = []
        for category_name, config in CATEGORIES.items():
            task = asyncio.create_task(crawl_category(session, category_name, config))
            tasks.append(task)
        
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    logger.info("启动爬虫...")
    logger.info(f"下载文件将保存到: {os.path.abspath(DOWNLOAD_DIR)}")
    
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("用户中断,停止爬取")
    except Exception as e:
        logger.error(f"爬虫运行出错: {str(e)}")
    finally:
        logger.info("爬取任务结束")

Install the Python dependencies: pip install aiohttp beautifulsoup4
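
A quick sanity check that both packages are importable after installation (my own snippet, not from the original post):

import aiohttp
import bs4

# Print the installed versions of the two dependencies
print("aiohttp", aiohttp.__version__, "| beautifulsoup4", bs4.__version__)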
