利用 PySpark 和 Snowflake 自动化大规模网站质量评分
大规模网站特征工程:PySpark、Python与Snowflake 为了在新的业务合作计划中找到最佳合作伙伴,你需要从包含数千个来自多个国家商人的数据库中,评估每个网站的专业性、内容深度、导航性和可见产品列表等方面,生成一个网站质量评分(0-10分)。通过将这个分数集成到机器学习管道中,可以显著提高模型选择高质量商人的准确性。 技术实现 项目文件结构 项目源代码可以从GitHub仓库 https://github.com/lucasbraga461/feat-eng-websites/ 克隆下来。主要文件包括: src:源代码目录 helpers/snowflake_data_fetch.py:从Snowflake中获取数据的帮助函数 p1_fetch_html_from_websites.py:从网站获取HTML内容的脚本 process_data:处理数据的SQL脚本 s1_gather_initial_table.sql:创建初始表格的数据聚合脚本 s2_create_table_with_website_feature.sql:基于网站特征创建新表的脚本 notebooks:Jupyter Notebook ps_website_quality_score.ipynb:计算网站质量评分的示例代码 data:数据目录 websites_initial_table.csv:初始表格的CSV文件 README.md:项目说明文件 requirements.txt:项目依赖项 venv:虚拟环境目录 .gitignore:忽略文件列表 .env:环境变量配置文件 准备数据集 理想情况下,数据应存储在Snowflake中。假设数据来自多个国家的数据表,可以使用 s1_gather_initial_table.sql 脚本将数据聚合到一个表中。例如: sql CREATE OR REPLACE TABLE DATABASE.SCHEMA.WEBSITES_INITIAL_TABLE AS ( SELECT DISTINCT COUNTRY, WEBSITE_URL FROM DATABASE.SCHEMA.COUNTRY_ARG_DATASET WHERE WEBSITE_URL IS NOT NULL ) UNION ALL ( SELECT DISTINCT COUNTRY, WEBSITE_URL FROM DATABASE.SCHEMA.COUNTRY_BRA_DATASET WHERE WEBSITE_URL IS NOT NULL ) UNION ALL ( SELECT DISTINCT COUNTRY, WEBSITE_URL FROM DATABASE.SCHEMA.COUNTRY_JAM_DATASET WHERE WEBSITE_URL IS NOT NULL ); 获取HTML内容 准备好数据后,你可以使用 p1_fetch_html_from_websites.py 脚本从Snowflake或CSV文件中获取HTML内容。对于Snowflake数据,运行命令: bash cd ~/Document/GitHub/feat-eng-websites python3 src/p1_fetch_html_from_websites.py -c BRA --use_snowflake 这将打开浏览器窗口,要求你认证Snowflake账户。认证成功后,脚本会从指定表中拉取数据并获取网站内容。如果使用CSV文件,则去掉 --use_snowflake 标志: bash cd ~/Document/GitHub/feat-eng-websites python3 src/p1_fetch_html_from_websites.py -c BRA 该脚本的优势在于使用异步I/O(asyncio + aiohttp)并发请求多个网站,绕过基本的机器人检测,并通过批量处理和重试机制来避免内存溢出和网络问题。 处理HTML内容 由于单个页面的HTML内容可能非常大,因此在大规模场景下,使用平面文件存储和处理是不切实际的。此时,我们通过Snowflake的Snowpark引擎将数据传递给Spark进行可扩展的特征提取。示例代码位于 notebooks/ps_website_quality_score.ipynb,可以直接在Snowflake的Jupyter Notebook环境中运行。 首先,创建一个持久存储区域 @STAGE_WEBSITES,用于上传UDF包及其依赖项(如BeautifulSoup和lxml)。然后,定义一个UDF extract_features_udf,从HTML内容中提取关键信息,如关键词、链接、图片和脚本数量,以及是否有价格列表。例如: ```python @udf(name="extract_features_udf", is_permanent=True, replace=True, stage_location="@STAGE_WEBSITES", packages=["beautifulsoup4", "lxml"]) def extract_features(html_content: str, CONTACT_KEYWORDS: str, ABOUT_KEYWORDS: str, PRICE_PATTERNS: list) -> dict: if not html_content: return {"word_count": 0, "title_length": 0, "has_contact_page": 0, "has_about_page": 0, "num_links": 0, "num_images": 0, "num_scripts": 0, "has_price_listings": 0} try: soup = BeautifulSoup(html_content, 'lxml') text = soup.get_text(" ", strip=True) word_count = len(text.split()) title = soup.title.string.strip() if soup.title and soup.title.string else "" has_contact = bool(re.search(CONTACT_KEYWORDS, text, re.I)) has_about = bool(re.search(ABOUT_KEYWORDS, text, re.I)) num_links = len(soup.find_all("a")) num_images = len(soup.find_all("img")) num_scripts = len(soup.find_all("script")) has_price = any(re.search(pattern, text, re.I) for pattern in PRICE_PATTERNS) return {"word_count": word_count, "title_length": len(title), "has_contact_page": int(has_contact), "has_about_page": int(has_about), "num_links": num_links, "num_images": num_images, "num_scripts": num_scripts, "has_price_listings": int(has_price)} except Exception: return {"word_count": 0, "title_length": 0, "has_contact_page": 0, "has_about_page": 0, "num_links": 0, "num_images": 0, "num_scripts": 0, "has_price_listings": 0} ``` 接下来,应用UDF提取每行HTML内容的特征,并将其转换为DataFrame中的独立列。最后,根据预定义的业务规则(如词数阈值、联系页和关于我们页的存在、价格列表等),计算每个网站的质量评分,并将结果显示的表保存回Snowflake中。 法律和伦理考虑 在大规模爬取网站内容时,务必遵守各国家和地区的法律法规,尊重网站的robots.txt文件,避免对目标网站造成不必要的负担。此外,确保数据的安全性和隐私保护,不在未经允许的情况下泄露敏感信息。 行业评价与公司背景 这套网站特征工程方案被广泛认为是高效且可靠的。通过使用Snowflake和PySpark,项目不仅提高了数据处理速度和准确性,还增强了模型的选择能力。这使得企业在筛选合作伙伴时更加精准,从而推动更好的商业决策。该方法尤其适用于需要在全球范围内处理大量网站的大型企业。项目作者Lucas Braga是一位经验丰富的数据工程师,致力于开发可扩展的数据处理解决方案。