A Deep Dive into How Python Distributed Crawlers Work
In a distributed crawler, a crawling job is split into multiple subtasks, which are dispatched to multiple nodes for execution, and the results are merged at the end. Scrapy, the most widely used Python crawling framework, does not distribute work across machines on its own (extensions such as scrapy-redis add that capability), so for specific requirements we may need to build the distribution layer ourselves.
How a Distributed Crawler Works
The implementation of a distributed crawler relies mainly on a task queue and communication between nodes.
Node 1 takes a crawl task from the queue, fetches the data, and puts the result back on the queue. Node 2 takes that data from the queue, cleans and processes it, and puts its result back, until all subtasks are finished. Finally, node 3 takes all of the data from the queue, merges the results, and returns them to the caller.
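Before introducing any framework, here is a minimal single-machine sketch of that queue-based pipeline using only the standard library; the node roles are collapsed into one worker thread, and all names and URLs are purely illustrative.
import queue
import threading

task_queue = queue.Queue()    # crawl tasks waiting to be picked up
result_queue = queue.Queue()  # fetched/processed records waiting to be merged

def worker_node():
    # Plays the role of node 1 (and, for brevity, node 2): take a URL,
    # "fetch" it, and push the processed record onto the result queue.
    while True:
        url = task_queue.get()
        if url is None:          # sentinel: no more work
            break
        result_queue.put({'url': url, 'html': f'<fetched {url}>'})
        task_queue.task_done()

def merge_results(expected):
    # Plays the role of node 3: collect every record and merge them.
    return [result_queue.get() for _ in range(expected)]

urls = [f'https://www.example.com/{i}' for i in range(5)]
for u in urls:
    task_queue.put(u)

worker = threading.Thread(target=worker_node)
worker.start()
task_queue.join()        # wait until every queued task is marked done
task_queue.put(None)     # tell the worker to stop
worker.join()

print(merge_results(len(urls)))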
Steps to Implement a Distributed Crawler
- Install the distributed task framework Celery
pip install celery
- Define the task
from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def crawl(url):
    # Fetch the page at `url` and build `data` from it here.
    ...
    return data
With Celery we can write a distributed crawler with very little code. The task function must be registered with the @app.task decorator so that it can be executed as a task in the distributed environment.
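As a slightly fuller sketch of such a task, the body might simply fetch the page and return its HTML; the use of the requests library and the 10-second timeout are illustrative assumptions, not part of the original example.
import requests
from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def crawl(url):
    # Download the page; parsing could happen here or in a
    # downstream task that consumes the returned HTML.
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.text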
- Start a worker node
celery -A tasks worker --loglevel=info
This command starts a worker node that executes the task function we defined.
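Once a worker is running, you can check from Python that it has picked up the crawl task; this small sketch uses Celery's inspection API and assumes the tasks module shown above is importable.
from tasks import app

# Query every running worker for its registered and currently
# executing tasks (both calls return dicts keyed by worker name).
inspector = app.control.inspect()
print(inspector.registered())
print(inspector.active())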
- Send tasks to the queue
from tasks import crawl

urls = [
    'https://www.example.com/1',
    'https://www.example.com/2',
    'https://www.example.com/3',
    ...
]

for url in urls:
    crawl.delay(url)
crawl.delay(url) places a task message on the queue, and the url argument is passed through to the crawl function. Once queued, each task is picked up and executed by a worker node. While tasks are running, you can watch their progress with Celery's monitoring tools, for example the celery events console or the Flower web UI.
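delay() also returns an AsyncResult handle per task, so the caller can collect the returned data once the workers finish. The sketch below assumes the Celery app has been configured with a result backend (for example Redis), which the snippets above do not show.
from tasks import crawl

urls = ['https://www.example.com/1', 'https://www.example.com/2']

# Keep the AsyncResult handles so the return values can be gathered.
handles = [crawl.delay(url) for url in urls]

# .get() blocks until a task finishes; it only works when a result
# backend (e.g. redis://localhost:6379/0) is configured on the app.
pages = [handle.get(timeout=60) for handle in handles]
print(len(pages), 'pages fetched')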
Example 1: Crawling the Douban Movie Top 250
1. Crawler code
from scrapy import Spider, Request
from scrapy.crawler import CrawlerProcess


class DoubanTopSpider(Spider):
    name = 'douban_top'
    allowed_domains = ['movie.douban.com']
    start_urls = ['https://movie.douban.com/top250']

    def parse(self, response):
        yield from self.parse_movie(response)
        # The "next page" link is relative and disappears on the last page.
        next_page_url = response.css('.next a::attr(href)').get()
        if next_page_url:
            yield Request(response.urljoin(next_page_url), callback=self.parse)

    def parse_movie(self, response):
        for movie in response.css('.grid_view .item'):
            yield {
                'title': movie.css('.title::text').get(),
                'info': movie.css('.bd p::text').get(),
                'rating': movie.css('.rating_num::text').get(),
                'cover': movie.css('img::attr(src)').get(),
            }


process = CrawlerProcess({
    'FEED_FORMAT': 'csv',
    'FEED_URI': 'douban_top250.csv'
})
process.crawl(DoubanTopSpider)
process.start()
This spider crawls the Douban Movie Top 250, extracting each film's title, summary line, rating, and poster URL, and saves the results as CSV in the local file douban_top250.csv.
2. Distributed crawler code
from scrapy import Spider
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from celery import Celery


class DoubanTopSpider(Spider):
    name = 'douban_top'
    allowed_domains = ['movie.douban.com']
    start_urls = ['https://movie.douban.com/top250']

    def parse(self, response):
        # Each Celery task crawls exactly one page, so the spider does not
        # follow the "next page" link; pagination is handled by dispatching
        # one page URL per task below.
        yield from self.parse_movie(response)

    def parse_movie(self, response):
        for movie in response.css('.grid_view .item'):
            yield {
                'title': movie.css('.title::text').get(),
                'info': movie.css('.bd p::text').get(),
                'rating': movie.css('.rating_num::text').get(),
                'cover': movie.css('img::attr(src)').get(),
            }


app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def crawl(url):
    settings = get_project_settings()
    process = CrawlerProcess(settings)
    # Pass the page URL into the spider so this task crawls only that page.
    process.crawl(DoubanTopSpider, start_urls=[url])
    process.start()

if __name__ == '__main__':
    urls = [
        'https://movie.douban.com/top250?start=0&filter=',
        'https://movie.douban.com/top250?start=25&filter=',
        'https://movie.douban.com/top250?start=50&filter=',
        ...
    ]
    for url in urls:
        crawl.delay(url)
In the distributed version we define a task named crawl and have it run the DoubanTopSpider spider. Inside the task function we load the crawler configuration with get_project_settings, create a CrawlerProcess, and start DoubanTopSpider with the page URL passed in as its start URL.
Running this code dispatches each page of the Douban Top 250 to a different worker node. Two practical caveats: the output target (for example douban_top250.csv) comes from the project settings, and the results only end up merged in one file if every worker writes to the same storage; and because the Twisted reactor cannot be restarted inside a long-lived process, each worker process should handle at most one crawl (for example by starting the worker with --max-tasks-per-child=1).
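A common alternative is to keep the Celery task itself free of Twisted entirely and launch every crawl in a fresh child process. The sketch below assumes a hypothetical standalone script douban_page.py that creates a CrawlerProcess, crawls the single URL passed on its command line, and exits; the script name and arguments are illustrative, not from the original article.
import subprocess
import sys

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def crawl(url):
    # Run one Scrapy crawl per child process so every task gets a
    # fresh Twisted reactor, avoiding ReactorNotRestartable errors.
    result = subprocess.run(
        [sys.executable, 'douban_page.py', url],  # hypothetical helper script
        capture_output=True,
        text=True,
        check=False,
    )
    return result.returncode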
Example 2: Crawling Sina Weibo by Keyword
1. Crawler code
from scrapy import Spider, Request
from scrapy.crawler import CrawlerProcess
import json


class WeiboTopicSpider(Spider):
    name = 'weibo_topic'
    allowed_domains = ['m.weibo.cn']
    base_url = 'https://m.weibo.cn/api/container/getIndex?type=wb&queryVal={}&since_id={}'
    headers = {
        'Referer': 'https://m.weibo.cn',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36'
    }

    def start_requests(self):
        url = self.base_url.format(self.keyword, '')
        yield Request(url, headers=self.headers)

    def parse(self, response):
        data = json.loads(response.text)
        cards = data.get('data', {}).get('cards', [])
        for card in cards:
            mblog = card.get('mblog', {})
            yield {
                'created_at': mblog.get('created_at'),
                'text': mblog.get('text'),
                'attitudes_count': mblog.get('attitudes_count'),
                'comments_count': mblog.get('comments_count'),
                'reposts_count': mblog.get('reposts_count'),
            }
        next_id = data.get('data', {}).get('cardlistInfo', {}).get('since_id')
        if next_id:
            next_url = self.base_url.format(self.keyword, next_id)
            yield Request(next_url, headers=self.headers, callback=self.parse)


process = CrawlerProcess({
    'FEED_FORMAT': 'csv',
    'FEED_URI': 'weibo_topic.csv'
})
process.crawl(WeiboTopicSpider, keyword='Python')
process.start()
This spider crawls Weibo posts containing the keyword 'Python', extracting each post's text, publish time, like count, comment count, and repost count, and saves the results as CSV in the local file weibo_topic.csv.
2. Distributed crawler code
from scrapy import Spider, Request
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from celery import Celery
import json


class WeiboTopicSpider(Spider):
    name = 'weibo_topic'
    allowed_domains = ['m.weibo.cn']
    base_url = 'https://m.weibo.cn/api/container/getIndex?type=wb&queryVal={}&since_id={}'
    headers = {
        'Referer': 'https://m.weibo.cn',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36'
    }

    def start_requests(self):
        url = self.base_url.format(self.keyword, '')
        yield Request(url, headers=self.headers)

    def parse(self, response):
        data = json.loads(response.text)
        cards = data.get('data', {}).get('cards', [])
        for card in cards:
            mblog = card.get('mblog', {})
            yield {
                'created_at': mblog.get('created_at'),
                'text': mblog.get('text'),
                'attitudes_count': mblog.get('attitudes_count'),
                'comments_count': mblog.get('comments_count'),
                'reposts_count': mblog.get('reposts_count'),
            }
        next_id = data.get('data', {}).get('cardlistInfo', {}).get('since_id')
        if next_id:
            next_url = self.base_url.format(self.keyword, next_id)
            yield Request(next_url, headers=self.headers, callback=self.parse)


app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def crawl(keyword):
    settings = get_project_settings()
    process = CrawlerProcess(settings)
    process.crawl(WeiboTopicSpider, keyword=keyword)
    process.start()

if __name__ == '__main__':
    keywords = ['Python', '数据挖掘', '人工智能', ...]
    for keyword in keywords:
        crawl.delay(keyword)
In the distributed version we again define a task named crawl and have it run the WeiboTopicSpider spider. Inside the task function we load the crawler configuration with get_project_settings, create a CrawlerProcess, and start WeiboTopicSpider with the keyword passed through to the spider.
Running this code dispatches the crawl for each keyword to a different worker node. As in Example 1, the results only end up merged in a single weibo_topic.csv if the project settings point all workers at shared storage, and each worker process should run at most one crawl to avoid restarting the Twisted reactor.
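If you would rather have each worker write its keyword's results to its own file, one option is to pass per-crawl feed settings instead of relying on the project defaults. The sketch below assumes Scrapy 2.1+ (for the FEEDS setting) and reuses the WeiboTopicSpider class defined above; the file-naming pattern is an illustrative choice, not part of the original article.
from scrapy.crawler import CrawlerProcess
from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def crawl(keyword):
    # %(keyword)s in the feed URI is filled from the spider attribute of
    # the same name, so each keyword gets its own CSV file on the worker.
    process = CrawlerProcess({
        'FEEDS': {
            'weibo_%(keyword)s.csv': {'format': 'csv', 'encoding': 'utf8'},
        },
    })
    process.crawl(WeiboTopicSpider, keyword=keyword)  # class defined above
    process.start()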