爬虫速度的提升 并发和并行,同步和异步

鳄鱼君

发表文章数:591

热门标签

, , , ,

Vieu四代商业主题

高扩展、安全、稳定、响应式布局多功能模板。

¥69 现在购买
首页 » Python » 爬虫速度的提升 并发和并行,同步和异步

在实际工作中,爬虫处理的数据量是非常大的,如果仅仅使用单线程爬虫,效率是非常低的!这篇文章主要来说一下多线程爬虫、多进程爬虫和多协程爬虫。

并发和并行

并发(concurrency)和并行(parallelism)是两个相似的概念。并发:在一个时间段内发生若干事件的情况;并行:在同一时刻发生若干事件的情况。

在单核CPU上,多个任务是以并发的方式运行的,因为只有一个CPU,各个人物会分别占用CPU的一段时间一次执行。如果某个任务在时间段内没有完成,就会切换到另一个任务,然后再下一次得到CPU的使用权的时候再继续执行,直到完成。这种情况下,各个任务的时间段很短,经常切换,看起来多个任务像是在同时进行的。在多核CPU上,多个任务真正能够同时运行,而不是看起来像,这就是并行

同步和异步

在理解并发和并行的基础上,同步就是并发或并行的各个任务不是独自运行的,任务之间有一定的交替顺序,可能在运行完一个任务得到结果后,另一个任务才会开始运行,可以理解为串行。类似于4×4接力赛,要拿到交接棒之后,下一个选手才可以开始跑。

异步则是并发或并行的各个人物可以独立运行,一个任务的运行不受另一个任务的影响。类似于400赛跑,任务之间就像比赛的各个选手在不同的赛道比赛一样,跑步的速度不受其他赛道选手的影响!

假设现在打开4个不同的网站,IO(Input输入/Ouput输出)的过程相当于打开网站,cpu执行单击的动作。同步IO指你每单击个网站,需要等待该网站彻底显示才可以单击下一个网站;异步IO是指单击完一个网站,不用等待对方服务器返回结果,立即可以打开另一个网站,以此类推,最后同时等待4个网站彻底打开。

多线程、多进程、多协程都是使用异步的方式,“同时”获取多个网页,从而提升爬虫效率。

多线程爬虫

多线程爬虫的原理,这里简单说一下。Python在设计的时候是没有多核的电脑的,所以作者也就没有考虑多核的情况,所以设置了GIL(全局解释器锁)。在Python中,一个线程的执行过程包括获取GIL、执行代码直到挂起和释放GIL。

某个线程想要执行,必须要先申请GIL,可以把GIL看作“通行证”,并且在一个Python进程中,只有一个GI。拿不到GIL锁,线程就不允许进入CPU执行。每次释放GIL锁,线程之间都会进行锁竞争,线程的来回切换回消耗资源。

爬虫速度的提升 并发和并行,同步和异步

爬虫属于IO密集型,多线程能够有效地提升效率,因为单线程下IO操作会进行IO等待,开启多线程能在线程等待的时候切换到另一个线程,充分利用CPU的资源。

单线程爬虫

以单线程(单进程)为例,抓取1000个网页,这1000个网页我们可以从:中文网站排行榜进行提取,该网站一页有20个数据,我们只需要爬虫1000个,就是50页,代码参考:

import requests
from lxml import etree
headers={
    "User-Agent":"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
}
for a in range(1,51):
    url='http://www.alexa.cn/siterank/{0}'.format(a)
    res=requests.get(url,headers=headers)
    tree=etree.HTML(res.text)
    for i in tree.xpath('//ul[@class="siterank-sitelist"]/li'):
        href=i.xpath('./div[2]/div[1]/span/a/@href')[0]
        with open('./alexa.txt','a',encoding='utf-8') as f:
            f.write(href+','+'\n')

将1000个网站的URL保存在alexa.txt文件,现在我使用单线程来抓取这1000个网页,之前先编写一个测试时间的装饰器,因为我们需要多次使用:

import time
def test_time(func):
    def wrapper(*args,**kwargs):
        start_time=time.time()
        func(*args,**kwargs)
        end_time=time.time()
        print('time cost:%s,%s'% (func.__name__,str(end_time-start_time)))
    return wrapper

然后定义单线程函数:

import requests,time
link_list=[] # 读取文件的URL,存放为list
with open('./alexa.txt','r') as f:
    file_list=f.readlines() #逐行读取
    for i in file_list:
        link=i.split(',')[0]
        link_list.append(link)
headers={
    "User-Agent":"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
}

@test_time
def single_thread(link_list):
    for url in link_list:
        try:
            res=requests.get(url,headers=headers)
            print(res.status_code,url)
        except Exception as e:
            print('error',e)
single_thread(link_list)

运行代码,就可以测试出请求1000个网站所花费的时间,具体多少自己尝试,反正不快!

多线程爬虫

在Python中使用多线程有两种方式:

  1. 函数式:调用_thread模块的中的start_new_thread()方法产生新线程
  2. 继承式:调用Threading库创建线程,从threading.Thread类继承

threading模块提供了Thread类来处理线程,包括以下几个方法:

  1. run():表示线程活动的方法
  2. start():启动线程活动
  3. join([time]):等待到线程终止。阻塞调用线程直至线程的join方法被调用为止
  4. isAlive():返回线程是否是活动的
  5. getName():返回线程名
  6. setName():设置线程名

通过列子来了解多线程是如何运行的:

import threading
import time
class myThread(threading.Thread):
    def __init__(self,name,delay):
        threading.Thread.__init__(self)
        self.name=name
        self.delay=delay
    def run(self): # 方法固定
        print("starting "+self.name)
        print_time(self.name,self.delay)
        print("exiting "+self.name)
def print_time(threadNam,delay):
    count=0
    while count<3:
        time.sleep(delay)
        print(threadNam,time.ctime())
        count+=1
threads=[]
# 创建新线程
thread1=myThread("thread-1",2)
thread2=myThread("thread-2",3)

# 开启新线程
thread1.start()
thread2.start()

# 添加线程到线程列表
threads.append(thread1)
threads.append(thread2)

for i in threads:
    i.join()
print("exit main thread")

代码中,我们将任务分到两个线程中,即thread1=myThread(“thread-1”,2),然后在myThread类中对线程进行设置,使用run方法表示线程的运行方法,名字固定,当count小于3时,打印该线程的名称和时间!然后使用thread.start()开启线程,使用threads.append()将线程加入线程列表,使用join()等待所有的子线程执行完成才会执行主线程。

现在知道了如何使用多线程,呢么我们修改一下最上面的爬虫代码,使用多线程抓取1000个网页:

import requests,time,threading

link_list=[] # 读取文件的URL,存放为list
with open('./alexa.txt','r') as f:
    file_list=f.readlines() #逐行读取
    for i in file_list:
        link=i.split(',')[0]
        link_list.append(link)
headers={
    "User-Agent":"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
}

start_time=time.time()
class myThread(threading.Thread):
    def __init__(self,name,link_range):
        threading.Thread.__init__(self)
        self.name=name
        self.link_range=link_range
    def run(self):
        print("starting "+self.name)
        crawler(self.name,self.link_range)
        print("exiting "+self.name)
def crawler(threadName,link_range):
    for i in range(link_range[0],link_range[1]+1):# (0, 201)
        try:
            res=requests.get(link_list[i],timeout=20,headers=headers)
            print(threadName,res.status_code,link_list[i])
        except Exception as e:
            print(threadName,'error',e)
thread_list = []
# 将1000个网页分成5份,每一份200个
link_range_list = [(0,200), (201,400), (401,600), (601,800), (801,1000)]

# 利用for循环创建5个线程,将这些网页分别交给这5个线程运行
for i in range(1,6):
    thread = myThread("Thread-" + str(i), link_range_list[i-1]) # 循环link_range_list
    thread.start()
    thread_list.append(thread)
# 等待所有线程完成
for thread in thread_list:
    thread.join()
end_time=time.time()
print('多线程花费时间:%s'% str(end_time-start_time))
print ("Exiting Main Thread")

由于这个测试耗时,需要在类上添加装饰器,鳄鱼君Ba不太会,所以暂时抛弃在类上使用装饰器,这里就直接计算花费的时间了。

在上面的代码中,我们将1000个网页分成5份,启动5个线程,每个线程执行200个。在每一个线程中,我们调用了爬虫函数crawler。为了让所有的子线程执行完毕再执行主进程,这里使用join方法等待所有的子线程执行完毕!这里耗时大约在420-430之间!

这样你可能就会思考,假设某一个线程执行完成,就会退出线程,这样就只剩4个线程在运行,一次类推,最后会剩下一个线程在运行,就变成了单线程。

使用Queue的多线程爬虫

为了充分利用多线程,这时候就需要使用Queue队列。python的Queue模块中提供了同步的,线程安全的队列类,包括FIFO(先入先出)队列,LIFO(后入先出)队列等等,具体可参考:threading模块、线程锁、信号量、Event、Queue队列

将这1000个网址放到队列中,各个线程都从这个队列获取网址,直到完成所有的抓取为止,代码参考:

import requests,time,threading
import queue

link_list=[] # 读取文件的URL,存放为list
with open('./alexa.txt','r') as f:
    file_list=f.readlines() #逐行读取
    for i in file_list:
        link=i.split(',')[0]
        link_list.append(link)
headers={
    "User-Agent":"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
}
start_time=time.time()

class myThread(threading.Thread):
    def __init__(self,name,task_queue):
        threading.Thread.__init__(self)
        self.name=name
        self.q=task_queue
    def run(self):
        print("starting "+self.name)
        while True:
            try:
                crawler(self.name,self.q)
            except:
                break
        print("exiting "+self.name)
def crawler(threadName,q):
    url=q.get(timeout=2)
    try:
        res=requests.get(url,timeout=20,headers=headers)
        print(q.qsize(),threadName,res.status_code,url)
    except Exception as e:
        print(threadName,'error',e)

thread_list = ["thread-1","thread-2","thread-3","thread-4","thread-5"]
threads=[]
task_queue=queue.Queue(1000)

# 填充队列
for url in link_list:
    task_queue.put(url)

for i in thread_list:
    thread = myThread(i,task_queue)
    thread.start()
    threads.append(thread)

# 等待所有线程完成
for a in threads:
    a.join()
end_time=time.time()
print('queue多线程花费时间:%s'% str(end_time-start_time))
print ("Exiting Main Thread")

耗时350-360左右,这个会受网速和机器配置影响!实际工作中,这种爬虫方法使用的也是比较多的!

多进程爬虫

由于python全局解释器锁的存在,多线程爬虫并不能充分的利用多核CPU,呢么就需要使用多进程爬冲来利用多核CPU。在python中,使用多进程需要用到multiprocess库。multiprocess库有两种方法:Process+Queue或者Pool+Queue,我们分别测试一下!

查看CPU的进程,这里可以借助multiprocessing:

from multiprocessing import cpu_count
print(cpu_count())

Process+Queue多进程

这里根据自己情况,不要占满核心,鳄鱼君Ba的是12核心数,所以在这里使用10个进程:

from multiprocessing import Process,Queue
import requests,time

link_list=[] # 读取文件的URL,存放为list
with open('./alexa.txt','r') as f:
    file_list=f.readlines() #逐行读取
    for i in file_list:
        link=i.split(',')[0]
        link_list.append(link)
headers={
    "User-Agent":"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
}

class myProcess(Process):
    def __init__(self,task_queue):
        Process.__init__(self)
        self.q=task_queue

    def run(self):
        print("starting ",self.pid)
        while not self.q.empty():
            crawler(self.q)

        print("exiting ",self.pid)
def crawler(q):
    url=q.get(timeout=2)
    try:
        res=requests.get(url,timeout=20,headers=headers)
        print(q.qsize(),res.status_code,url)
    except Exception as e:
        print('error',e)
if __name__ == '__main__':
    start_time = time.time()

    Process_list = ["process-1","process-2","process-3","process-4","process-5"]
    threads=[]
    task_queue=Queue(1000)

    # 填充队列
    for url in link_list:
        task_queue.put(url)

    for i in range(0,9):
        p = myProcess(task_queue)
        p.daemon=True # 父进程结束,子进程就会被终止
        p.start()
        p.join()

    end_time=time.time()
    print('queue多进程花费时间:%s'% str(end_time-start_time))
    print ("Exiting Main Process")

具体的花费时间,自己测试

Pool+Queue多进程

Pool可以提供指定数量的进程供用户调用。当有新的请求提交到pool中时,如果池还没有满,就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定的最大值,该请求就会继续等待,直到池中有进程结束才能够创建新的进程。

阻塞和非阻塞关注的是程序在等待调用结果(消息、返回值)时的状态。阻塞要等到回调结果出来,在有结果之前,当前进程会被挂起。非阻塞为添加进程后,不一定非要等到结果出来就可以添加其他进程运行。

我们可以使用Pool的非阻塞方法和Queue获取网页数据,代码如下:

from multiprocessing import Pool,Manager
import time,requests
link_list=[] # 读取文件的URL,存放为list
with open('./alexa.txt','r') as f:
    file_list=f.readlines() #逐行读取
    for i in file_list:
        link=i.split(',')[0]
        link_list.append(link)
headers={
    "User-Agent":"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
}
start_time=time.time()
def crawler(q,index):
    process_id='process-'+str(index)
    while not q.empty():
        url=q.get(timeout=2)
        try:
            res=requests.get(url,headers=headers)
            print(process_id,q.qsize(),res.status_code,url)
        except Exception as e:
            print(process_id,'error',e)
if __name__ == '__main__':
    with Manager() as manager:# 父进程和子进程之间通过管道manager通信
        task_queue=manager.Queue(1000)
        # 填充队列
        for url in link_list:
            task_queue.put(url)
        pool=Pool(processes=3)
        for i in range(4):
            pool.apply_async(crawler,args=(task_queue,i))
            # pool.apply(crawler,args=(task_queue,i)) 串行
        print('starting process')
        pool.close()
        pool.join()
        end_time=time.time()
        print('pool+process多进程爬虫耗时:',end_time-start_time)
        print('exiting main process ')

多协程爬虫

协程是一种用户态的轻量级线程,使用协程有以下优点:

  1. 由于是单线程,没有上下文切换,资源消耗少
  2. 高扩展性和高并发性,一个CPU支持上万个协程,甚至更多
  3. 方便切换控制流,简化编程模型

协程的本质上是一个单线程,不能使用单个CPU的多核,需要和进程配合才能运行在多核CPU上。IO操作阻塞时间太长不要使用协程,可能会阻塞整个程序。

在python的协程中,可以使用gevent。使用pip命令就可以安装,安装完成就可以使用协程了:

import gevent
from gevent.queue import Queue

from gevent import monkey # 有可能有IO操作的单独坐上标记
monkey.patch_all()# 将IO转为异步执行的函数

import time,requests # gevent模块上面

link_list=[] # 读取文件的URL,存放为list
with open('./alexa.txt','r') as f:
    file_list=f.readlines() #逐行读取
    for i in file_list:
        link=i.split(',')[0]
        link_list.append(link)
headers={
    "User-Agent":"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
}

start_time=time.time()
def crawler(index):
    process_id='process-'+str(index)
    while not task_queue.empty():
        url=task_queue.get(timeout=2)
        try:
            res=requests.get(url,headers=headers,timeout=20)
            print(process_id,url,res.status_code)
        except Exception as e:
            print('error',e,url)
def boss():
    for url in link_list:
        task_queue.put_nowait(url)
if __name__ == '__main__':
    task_queue=Queue(1000) # 生成一个队列
    gevent.spawn(boss).join() #将队列加入到gevent协程中
    jobs=[]
    for i in range(10): # 10个协程
        jobs.append(gevent.spawn(crawler,i)) # 生成一个协程
    gevent.joinall(jobs) # 将
    end_time=time.time()
    print('gevent+queue多协程爬虫耗时:',end_time-start_time)

经过测试,感觉协程速度更快一些!具体效果,可以自己尝试!

未经允许不得转载:作者:鳄鱼君, 转载或复制请以 超链接形式 并注明出处 鳄鱼君
原文地址:《爬虫速度的提升 并发和并行,同步和异步》 发布于2020-07-03

分享到:
赞(0) 赏杯咖啡

评论 抢沙发

4 + 9 =


文章对你有帮助可赏作者一杯咖啡

支付宝扫一扫打赏

微信扫一扫打赏

Vieu4.6主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。
切换注册

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录
切换登录

注册