Python异步爬虫试验[Celery,gevent,requests]

516 查看

以往爬虫都是用自己写的一个爬虫框架,一群Workers去Master那领取任务后开始爬。进程数量等于处理器核心数,通过增开线程数提高爬取速度。
最近看了Celery,接口真是优美,挺想试验下异步模型来写个爬虫。

模拟目标

为了方便测试,用Tornado搭了一个简易的服务器,用来模拟被爬的网站。
功能很简单,每个请求阻塞6秒才回复

import tornado.web
import tornado.ioloop
import time
from concurrent.futures import ThreadPoolExecutor
from tornado.concurrent import run_on_executor
import tornado.gen

class MainHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(40)

    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self):
        print(time.asctime())
        yield self.sleep(6)
        self.write('from server:' + time.asctime())
        self.finish()

    @run_on_executor
    def sleep(self, sec):
        time.sleep(sec)


if __name__ == '__main__':
    app = tornado.web.Application(handlers=[
        ('^/.*', MainHandler)
    ])
    app.listen(10240)
    tornado.ioloop.IOLoop.instance().start()

消费者

task里就一个spider函数,功能是利用gevent去请求给定的目标

import gevent.monkey
gevent.monkey.patch_socket()

from celery import Celery
import socket
import requests
import gevent

app = Celery('tasks',
             broker='redis://127.0.0.1:6379/3',
             backend='redis://127.0.0.1:6379/3')
@app.task
def spider(url):
    resp = gevent.spawn(requests.get, url)
    tmp = 0
    while True:
        print('wait...', tmp)
        if resp.ready():
            return 'from:' + socket.getfqdn() + '\nres:' + str(resp.value.text)
        gevent.sleep(1)
        tmp += 1

用gevent模式启动Celery

celery worker -A tasks --loglevel info -c 100 -P gevent

生产者

利用刚刚编写的spider函数去爬取目标
测试中,下面代码开了6个进程,结果均在7秒内返回,证明成功了。

from tasks import spider
import time
import random

res = spider.delay('http://127.0.0.1:10240/{}'.format(random.randint(1, 999)))
i = 0
while True:
    if res.ready():
        print('res:', res.get())
        break
    else:
        print('wait...', i)
    time.sleep(1)
    i += 1

Celery的部分日志输出:
可以看出在一个Celery进程内,多个spider函数轮替执行的

[2016-08-20 21:27:11,281: INFO/MainProcess] Starting new HTTP connection (1): 127.0.0.1
[2016-08-20 21:27:11,313: INFO/MainProcess] Received task: tasks.spider[7b8b6f63-2bef-491e-a3a8-fdbcff824b9c]
[2016-08-20 21:27:11,314: WARNING/MainProcess] wait...
[2016-08-20 21:27:11,314: WARNING/MainProcess] 0
[2016-08-20 21:27:11,316: INFO/MainProcess] Starting new HTTP connection (1): 127.0.0.1
[2016-08-20 21:27:11,354: INFO/MainProcess] Received task: tasks.spider[5aa05e65-504d-4a04-8247-3f5708bfa46f]
[2016-08-20 21:27:11,356: WARNING/MainProcess] wait...
[2016-08-20 21:27:11,356: WARNING/MainProcess] 0
[2016-08-20 21:27:11,357: INFO/MainProcess] Starting new HTTP connection (1): 127.0.0.1
[2016-08-20 21:27:11,821: WARNING/MainProcess] wait...
[2016-08-20 21:27:11,821: WARNING/MainProcess] 1
[2016-08-20 21:27:11,989: WARNING/MainProcess] wait...
[2016-08-20 21:27:11,990: WARNING/MainProcess] 1
[2016-08-20 21:27:12,059: WARNING/MainProcess] wait...
[2016-08-20 21:27:12,059: WARNING/MainProcess] 2
[2016-08-20 21:27:12,208: WARNING/MainProcess] wait...
[2016-08-20 21:27:12,209: WARNING/MainProcess] 1
[2016-08-20 21:27:12,225: WARNING/MainProcess] wait...
[2016-08-20 21:27:12,225: WARNING/MainProcess] 1
[2016-08-20 21:27:12,246: WARNING/MainProcess] wait...
[2016-08-20 21:27:12,247: WARNING/MainProcess] 2
[2016-08-20 21:27:12,282: WARNING/MainProcess] wait...
[2016-08-20 21:27:12,282: WARNING/MainProcess] 1
[2016-08-20 21:27:12,316: WARNING/MainProcess] wait...
[2016-08-20 21:27:12,316: WARNING/MainProcess] 1
[2016-08-20 21:27:12,357: WARNING/MainProcess] wait...
[2016-08-20 21:27:12,357: WARNING/MainProcess] 1
[2016-08-20 21:27:12,823: WARNING/MainProcess] wait...
[2016-08-20 21:27:12,823: WARNING/MainProcess] 2
[2016-08-20 21:27:12,991: WARNING/MainProcess] wait...
[2016-08-20 21:27:12,992: WARNING/MainProcess] 2
[2016-08-20 21:27:13,061: WARNING/MainProcess] wait...
[2016-08-20 21:27:13,061: WARNING/MainProcess] 3
[2016-08-20 21:27:13,210: WARNING/MainProcess] wait...
[2016-08-20 21:27:13,211: WARNING/MainProcess] 2
[2016-08-20 21:27:13,227: WARNING/MainProcess] wait...
[2016-08-20 21:27:13,227: WARNING/MainProcess] 2

最后

借助Celery,爬虫很容易实现横向扩展,在多台服务器上增加消费者进程即可;
借助gevent,单进程内requests做到了非阻塞,而我过去是用多线程对付阻塞的。
Celery,gevent我也是初学一天,这小玩意儿做出来后,得开始看文档了深入了解了!