Python使用协程抓取非小号部分数据

鳄鱼君

发表文章数:531

Vieu四代商业主题

高扩展、安全、稳定、响应式布局多功能模板。

¥69 现在购买
首页 » Python教程 » Python使用协程抓取非小号部分数据

采用协程的方式抓取非小号网站的相关数据,那么首先这里以csv文件为例,我们把需要抓取的所有信息的url存入csv文件中,那么读取的代码参考:

import asyncio
import csv
async def main():
    #loop=asyncio.get_event_loop()
    csv_reader=csv.reader(open('XXX.csv',encoding='utf-8'))
    for row in csv_reader:
        print(row)

if __name__=='__main__':
    loop=asyncio.get_event_loop() #d定义事件循环
    loop.create_task(main()) #创建任务
    loop.run_forever() #一直运行

读取csv文件,这个csv文件其实就是通过非小号接口抓取每个币种详情页的url,我们要提取的数据就是在这个详情页,csv文件需要的话可以自己下载。

async def main():
    csv_reader=csv.reader(open('feixiaohao.csv',encoding='utf-8'))
    for row in csv_reader:
        try:
            if row[1].startswith('https'):#判断是否以https开头
                await Common.task_queue.put(row)#将row放入队列里面
        except:
            pass
    #print(Common.task_queue)
    await get_market_cap()
    print('总市值:{}'.format(Common.market_cap_all))
    await get_currency_rate()
    print('汇率:{}'.format(Common.currency_rate) )
    for _ in range(10):#同时创建10个down_and_parse_task任务,提高效率
        loop.create_task(down_and_parse_task(Common.task_queue))
    loop.create_task(monitor_finish())
    loop.create_task(speed_monitor()) #速度检测
    loop.create_task(time_limit()) #5分钟跑完

if __name__=='__main__':
    loop=asyncio.get_event_loop()
    loop.create_task(main())
    loop.run_forever() #一直运行

main函数中包含有down_and_parse_task(获取网页html和状态码并解析除相关数据),monitor_finish,speed_monitor(速度检测),time_limit(时间限制),get__currency_rate(获取汇率),get_market_cap(获取总市值)函数,还有一个队列,队列的代码参考:

class Common():
    task_queue=Queue()#创建实例队列,任务队列
    result_queue=Queue()#结果队列
    market_cap_all=0  #总市值初始值
    currency_rate=0 #汇率初始值

那么其他一些不重要的函数代码参考:

async def get_market_cap():
    #总市值的接口url
    url='https://dncapi.bqiapp.com/api/home/global?webp=0'
    response=requests.get(url)
    #通过接口提取总市值
    response_json=json.loads(response.text)
    marketcap=response_json['data']['marketcapvol']
    Common.market_cap_all=int(marketcap)#重新赋值给market_cap_all
async def get_currency_rate():
    url_rate='https://dncapi.bqiapp.com/api/coin/web-rate'
    response=requests.get(url_rate)
    #通过接口提取人民币的汇率
    currency_rate=json.loads(response.text)['data'][11]['cny']
    Common.currency_rate=currency_rate#重新赋值给currency_rate
async def time_limit():#设置300秒上限
    await asyncio.sleep(300)
    raise SystemExit()

async def speed_monitor():#速度检测
    while Common.task_queue.qsize()!=0: #判断队列大小
        old_queue_len=Common.task_queue.qsize()
        await asyncio.sleep(5)
        new_queue_size=Common.task_queue.qsize()
        print('《---------------》')
        print('speed = ',(old_queue_len-new_queue_size)/5) #5秒中之内,算一下速度

async def monitor_finish():
    while len(asyncio.Task.all_tasks())>3:#判断当前任务数量是否大于3
        await asyncio.sleep(1)
    await asyncio.sleep(5)
    raise SystemExit()

down_and_parse_task函数参考代码:

async def down_and_parse_task(queue):
    while 1 :#死循环
        try:
            name,url=queue.get_nowait()
        except:
            return
        for retry_cnt in range(3):#重试3次
            try:
                html,status=await download(url)
                if status ==200: #如果状态码不是200,重试
                    html,status=await download(url)
                html_parse_result=await parse_html(name,url,html)
                print(html_parse_result)
                await Common.result_queue.put(html_parse_result)
                break  #成功则退出异常
            except:
                await asyncio.sleep(0.2)
                continue

在这个函数中包含download,parse_html函数,具体的download函数代码参考:

async def download(url):
    connector=ProxyConnector()#创建实例
    async with aiohttp.ClientSession(
        connector=connector,
        request_class=ProxyClientRequest
    ) as session:
        ret,status=await session_get(session,url)
        if 'window.location.href=' in ret and len(ret)<1000:
            url=ret.split("window.location.href='")[1].split("'")[0]
            ret,status=await session_get(session,url)
        return ret,status

其中download函数汇总包含有session_get函数,代码参考:

async def session_get(session,url):
    headers={'User-Agent':'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)'}
    timeout=aiohttp.ClientTimeout(total=20)  #设置超时时间
    response = await session.get(
        url,timeout=timeout, headers=headers, ssl=ssl.SSLContext()
    )
    return await response.text(),response.status

然后是parse_html函数,这个就是具体解析网页中需要的数据的代码,使用xpath进行解析,跟以前的一样,具体代码参考:

async def parse_html(name,url,response):
    coin_info={}
    coin_value={}
    coin_info['url']=url
    coin_info['name']=name
    coin_info['time']=int(time.time())
    tree=etree.HTML(response)
    #,美元价格,上下涨幅,流通市值,流通量,流通率,24小时成交额,换手率,发行时间,最大供应量。
    try:#美元价格
        price_usd=tree.xpath('//div[@class="priceInfo"]/div[@class="sub"]/span/span/text()')[0].strip()
        coin_value['price']='$'+price_usd
    except:
        pass
    try:#上下涨幅
        updown=tree.xpath('//div[@class="priceInfo"]/div[@class="sub smallfont"]/span[1]/text()')[0].strip()
        coin_value['updown']='$'+updown
    except:
        pass
    try:#24小时成交额需要转换为美元

        value_24h=tree.xpath('//div[@class="priceInfo"]/div[@class="chart_list"]/div[3]/div[2]/span/text()')[0].strip()
        coin_value['value_24h']=(int(value_24h)/Common.currency_rate)
    except:
        pass
    try:#换手率

        value_24h=tree.xpath('//div[@class="priceInfo"]/div[@class="chart_list"]/div[3]/div[2]/div[@class="charbox"]/div[2]/text()')[0].strip()
        coin_value['value_24h']=(int(value_24h)/Common.currency_rate)
    except:
        pass
    try:#流通量
        circulating_supply=tree.xpath('//div[@class="priceInfo"]/div[@class="chart_list"]/div[2]/div[2]/text()')[0].strip()
        coin_value['circulating_supply']=int(circulating_supply)
    except:
        pass

    try:#流通率
        circulating_supply=tree.xpath('//div[@class="priceInfo"]/div[@class="chart_list"]/div[2]/div[@class="charbox"]/div[2]/text()')[0].strip()
        coin_value['circulating_supply']=int(circulating_supply)
    except:
        pass
    try:#币的当前市值
        bi_price=tree.xpath('//*[@id="__layout"]/section/div/div/div[1]/div[1]/div[3]/div[2]/div[1]/span[2]/span[2]/text()')[0].strip()
        coin_value['bi_price']=bi_price

    except:
        pass
    try:#发行时间
        data_time=tree.xpath('//*[@id="__layout"]/section/div/div/div[1]/div[2]/div/div[3]/div[1]/div[1]/div[3]/div[1]/span[2]/text()')[0].strip()
        coin_value['data_time']=data_time
    except:
        pass
    try:#上架交易所
        pass
    except:
        pass
    coin_info['value']=coin_value
    return coin_info

至此全部代码可参考:

import asyncio
import aiohttp
import requests,random,json,time
from lxml import etree
from asyncio.queues import Queue
import csv
import ssl
from aiosocksy import Socks5Auth   #使用socks代理
from aiosocksy.connector import ProxyConnector
from aiosocksy.connector import ProxyClientRequest
class Common():
    task_queue=Queue()#创建实例队列,任务队列
    result_queue=Queue()#结果队列
    market_cap_all=0  #总市值初始值
    currency_rate=0 #汇率初始值
#免费socks5代理:http://31f.cn/socks-proxy/
#线上内网
socks5_address_prod=[
'socks5://221.232.233.159:1080',
    'socks5://117.34.70.200:1081',
    'socks5://120.197.179.170:1080'
]

async def get_market_cap():
    #总市值的接口url
    url='https://dncapi.bqiapp.com/api/home/global?webp=0'
    response=requests.get(url)
    #通过接口提取总市值
    response_json=json.loads(response.text)
    marketcap=response_json['data']['marketcapvol']
    Common.market_cap_all=int(marketcap)#重新赋值给market_cap_all
async def get_currency_rate():
    url_rate='https://dncapi.bqiapp.com/api/coin/web-rate'
    response=requests.get(url_rate)
    #通过接口提取人民币的汇率
    currency_rate=json.loads(response.text)['data'][11]['cny']
    Common.currency_rate=currency_rate#重新赋值给currency_rate
async def time_limit():#设置300秒上限
    await asyncio.sleep(300)
    raise SystemExit()


async def down_and_parse_task(queue):
    while 1 :#死循环
        try:
            name,url=queue.get_nowait()
        except:
            return
        for retry_cnt in range(3):#重试3次
            try:
                html,status=await download(url)
                if status ==200: #如果状态码不是200,重试
                    html,status=await download(url)
                html_parse_result=await parse_html(name,url,html)
                print(html_parse_result)
                await Common.result_queue.put(html_parse_result)
                break  #成功则退出异常
            except:
                await asyncio.sleep(0.2)
                continue
async def download(url):
    connector=ProxyConnector()#创建实例
    async with aiohttp.ClientSession(
        connector=connector,
        request_class=ProxyClientRequest
    ) as session:
        ret,status=await session_get(session,url)
        if 'window.location.href=' in ret and len(ret)<1000:
            url=ret.split("window.location.href='")[1].split("'")[0]
            ret,status=await session_get(session,url)
        return ret,status
async def parse_html(name,url,response):
    coin_info={}
    coin_value={}
    coin_info['url']=url
    coin_info['name']=name
    coin_info['time']=int(time.time())
    tree=etree.HTML(response)
    #,美元价格,上下涨幅,流通市值,流通量,流通率,24小时成交额,换手率,发行时间,最大供应量。
    try:#美元价格
        price_usd=tree.xpath('//div[@class="priceInfo"]/div[@class="sub"]/span/span/text()')[0].strip()
        coin_value['price']='$'+price_usd
    except:
        pass
    try:#上下涨幅
        updown=tree.xpath('//div[@class="priceInfo"]/div[@class="sub smallfont"]/span[1]/text()')[0].strip()
        coin_value['updown']='$'+updown
    except:
        pass
    try:#24小时成交额需要转换为美元

        value_24h=tree.xpath('//div[@class="priceInfo"]/div[@class="chart_list"]/div[3]/div[2]/span/text()')[0].strip()
        coin_value['value_24h']=(int(value_24h)/Common.currency_rate)
    except:
        pass
    try:#换手率

        value_24h=tree.xpath('//div[@class="priceInfo"]/div[@class="chart_list"]/div[3]/div[2]/div[@class="charbox"]/div[2]/text()')[0].strip()
        coin_value['value_24h']=(int(value_24h)/Common.currency_rate)
    except:
        pass
    try:#流通量
        circulating_supply=tree.xpath('//div[@class="priceInfo"]/div[@class="chart_list"]/div[2]/div[2]/text()')[0].strip()
        coin_value['circulating_supply']=int(circulating_supply)
    except:
        pass

    try:#流通率
        circulating_supply=tree.xpath('//div[@class="priceInfo"]/div[@class="chart_list"]/div[2]/div[@class="charbox"]/div[2]/text()')[0].strip()
        coin_value['circulating_supply']=int(circulating_supply)
    except:
        pass
    try:#币的当前市值
        bi_price=tree.xpath('//*[@id="__layout"]/section/div/div/div[1]/div[1]/div[3]/div[2]/div[1]/span[2]/span[2]/text()')[0].strip()
        coin_value['bi_price']=bi_price

    except:
        pass
    try:#发行时间
        data_time=tree.xpath('//*[@id="__layout"]/section/div/div/div[1]/div[2]/div/div[3]/div[1]/div[1]/div[3]/div[1]/span[2]/text()')[0].strip()
        coin_value['data_time']=data_time
    except:
        pass
    try:#上架交易所
        pass
    except:
        pass
    coin_info['value']=coin_value
    return coin_info

async def session_get(session,url):
    headers={'User-Agent':'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)'}
    timeout=aiohttp.ClientTimeout(total=20)  #设置超时时间
    response = await session.get(
        url,timeout=timeout, headers=headers, ssl=ssl.SSLContext()
    )
    return await response.text(),response.status

async def speed_monitor():#速度检测
    while Common.task_queue.qsize()!=0: #判断队列大小
        old_queue_len=Common.task_queue.qsize()
        await asyncio.sleep(5)
        new_queue_size=Common.task_queue.qsize()
        print('《---------------》')
        print('speed = ',(old_queue_len-new_queue_size)/5) #5秒中之内,算一下速度

async def monitor_finish():
    while len(asyncio.Task.all_tasks())>3:#判断当前任务数量是否大于3
        await asyncio.sleep(1)
    await asyncio.sleep(5)
    raise SystemExit()
async def push_results():
    pass

async def main():
    csv_reader=csv.reader(open('feixiaohao.csv',encoding='utf-8'))
    for row in csv_reader:
        try:
            if row[1].startswith('https'):#判断是否以https开头
                await Common.task_queue.put(row)#将row放入队列里面
        except:
            pass
    #print(Common.task_queue)
    await get_market_cap()
    print('总市值:{}'.format(Common.market_cap_all))
    await get_currency_rate()
    print('汇率:{}'.format(Common.currency_rate) )
    for _ in range(10):#同时创建10个down_and_parse_task任务,提高效率
        loop.create_task(down_and_parse_task(Common.task_queue))
    loop.create_task(monitor_finish())
    loop.create_task(speed_monitor()) #速度检测
    loop.create_task(time_limit()) #5分钟跑完

if __name__=='__main__':
    loop=asyncio.get_event_loop()
    loop.create_task(main())
    loop.run_forever() #一直运行

在爬取的时候没有使用代理,这个貌似只能添加socks5的代理,一般来说,普通的网页没有相关的限制,就是在抓取csv文件的时候,有个数据的json接口,操作次数过多会封掉ip,但还是把csv文件抓取下来,供练习使用。抓取的的数据基本都在详情页,可随意提取数据,具体参考网址:https://www.feixiaohao.com/currencies/bitcoin/

未经允许不得转载:作者:鳄鱼君, 转载或复制请以 超链接形式 并注明出处 鳄鱼君
原文地址:《Python使用协程抓取非小号部分数据》 发布于2020-02-28

分享到:
赞(0) 赏杯咖啡

评论 抢沙发

6 + 2 =


文章对你有帮助可赏作者一杯咖啡

支付宝扫一扫打赏

微信扫一扫打赏

Vieu4.6主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。
切换注册

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录
切换登录

注册