自己动手开发爬虫框架 数据清洗机制 数据存储机制

鳄鱼君Ba

发表文章数:518

热门标签

, ,

Vieu四代商业主题

高扩展、安全、稳定、响应式布局多功能模板。

¥69 现在购买
首页 » Python教程 » 自己动手开发爬虫框架 数据清洗机制 数据存储机制

不管使用爬虫库还是爬虫框架,都可以将爬虫程序分为:数据爬取、数据清洗、数据入库。那么我们自己开发爬虫框架,也需要按照这个逻辑来实现。爬虫框架由4个文件组成:初始化文件__init__.py和功能文件pattern.py、spider.py、storage.py,文件说明如下:

  1. 初始化文件__init__.py用于设置框架的版本信息和导入框架的功能文件。
  2. 数据清洗文件pattern.py用于定义数据清洗类,清洗方式与Scrapy框架相似。
  3. 数据爬取文件spider.py用于定义数据爬取类,爬取方式支持异步并发、URL去重和分布式。
  4. 数据存储文件storage.py用于定义数据存储类,目前支持关系型数据库、非关系型数据库、CSV文件存储数据和文件下载功能。

我们将框架命名为myReptile,需要提前创建好各个文件模块

初始化文件__init__.py设置框架的版本信息及导入框架的功能文件,代码如下:

# project:myReptile
# author:eyujun
__version__='1.0.0'

# 导入功能模块
from .storage import *
from .spider import *
from .pattern import *

初始化文件是整个框架的入口,它导入了整个框架的功能。在使用框架的时候,只需在初始化文件调用相关的功能模块即可。

myReptile框架的设计原理:

  1. 数据爬取方式由URL地址的数据格式决定,如果URL地址的数据格式为列表,myReptile就会执行异步并发,并将所有请求的响应内容以列表格式返回;如果传入的URL地址是字符串格式(即单一的URL地址),myReptile就直接返回相应的响应内容;并且还支持URL去重和分布式爬虫功能。
  2. 数据清洗采用Scrapy框架的清洗模式,使用方式跟Scrapy框架差不多,目前仅支持CssSelector和Xpath定位方式。
  3. 数据入库支持关系型数据库、非关系型数据库和CSV文件存储,关系型数据库由SQLAlchemy框架实现;非关系型数据库目前仅支持MongoDB数据库。myReptile简化入库方式,只需将爬取的数据以字典格式传入即可实现入库操作。

异步爬取

myReptile框架的数据爬取由Aiohttp模块实现,因此它具备了异步并发功能。我们将Aiohttp模块的数据爬取功能进行封装和延伸,简化了其使用方式,使用者只需调用相关的函数并传入参数即可发送HTTP请求。打开spider.py文件,在文件里定义爬虫类Request,代码如下:

import aiohttp,asyncio,redis

# 设置默认参数
TIMEOUT=30
REQUEST_HEADERS={

}
# 实例化对象,用于发送HTTP请求
loop=asyncio.get_event_loop()

# 定义装饰器,实现URL去重或分布式处理
def distributes(func):
    def wrapper(self,url,**kwargs):
        redis_host=kwargs.get('redis_host','')
        if redis_host:
            port=kwargs.get('port',6379)
            redis_client=redis.StrictRedis(host=redis_host,port=port)
            redis_data_dict='keys'
            if not redis_client.hexists(redis_data_dict,url): # 判断url知否存在keys键中
                redis_client.hset(redis_data_dict,url,0)  # 不存在就会插入
                return func(self,url,**kwargs)
            else:
                return {}
        else:
            return func(self,url,**kwargs)
    return wrapper
# 定义爬虫类
class Request(object):
    # 定义异步函数
    async def httpGet(self,url,**kwargs):
        cookies=kwargs.get('cookies',{ })
        params=kwargs.get('params',{ })
        proxy=kwargs.get('proxy','')
        timeout=kwargs.get('timeout',TIMEOUT)
        headers=kwargs.get('headers',REQUEST_HEADERS)
        # 代理IP
        if proxy:
            async  with aiohttp.ClientSession(cookies=cookies) as session:
                async with session.get(url,
                                       params=params,
                                       proxy=proxy,
                                       timeout=timeout,
                                       headers=headers) as response:
                    # 将响应内容以字典格式fanhui
                    result= dict(
                        content=await response.read(),
                        text=await response.text(),
                        status=response.status,
                        headers=response.headers,
                        url=response.url
                    )
                    return result
        # 不使用代理ip
        else:
            async  with aiohttp.ClientSession(cookies=cookies) as session:
                async with session.get(url,
                                       params=params,
                                       timeout=timeout,
                                       headers=headers) as response:
                    # 将响应内容以字典格式fanhui
                    result= dict(
                        content=await response.read(),
                        text=await response.text(),
                        status=response.status,
                        headers=response.headers,
                        url=response.url
                    )
                    return result

    # 定义异步函数
    async def httpPost(self, url, **kwargs):
        cookies = kwargs.get('cookies', {})
        data = kwargs.get('data', {})
        proxy = kwargs.get('proxy', '')
        timeout = kwargs.get('timeout', TIMEOUT)
        headers = kwargs.get('headers', REQUEST_HEADERS)
        if proxy:
            async  with aiohttp.ClientSession(cookies=cookies) as session:
                async with session.post(url,
                                       data=data,
                                       proxy=proxy,
                                       timeout=timeout,
                                       headers=headers) as response:
                    # 将响应内容以字典格式fanhui
                    result = dict(
                        content=await response.read(),
                        text=await response.text(),
                        status=response.status,
                        headers=response.headers,
                        url=response.url
                    )
                    return result
        else:
            async  with aiohttp.ClientSession(cookies=cookies) as session:
                async with session.post(url,
                                       data=data,
                                       timeout=timeout,
                                       headers=headers) as response:
                    # 将响应内容以字典格式fanhui
                    result = dict(
                        content=await response.read(),
                        text=await response.text(),
                        status=response.status,
                        headers=response.headers,
                        url=response.url
                    )
                    return result

    # 定义GeT函数
    @distributes
    def get(self,url,**kwargs):
        print(url,kwargs)
        tasks=[]
        if isinstance(url,list):

            for i in url:
                task=asyncio.ensure_future(self.httpGet(i,**kwargs))
                tasks.append(task)
            result=loop.run_until_complete(asyncio.gather(*tasks))
        else:
            result=loop.run_until_complete(self.httpGet(url,**kwargs))
        return result
    # 定义Post函数
    @distributes
    def post(self, url, **kwargs):
        tasks = []
        if isinstance(url, list):
            for i in url:
                task = asyncio.ensure_future(self.httpPost(i, **kwargs))
                tasks.append(task)
            result = loop.run_until_complete(asyncio.gather(*tasks))
        else:
            result = loop.run_until_complete(self.httpPost(url, **kwargs))
        return result
# 实例化Request对象
request=Request()

代码解释:

  1. 函数httpGet是定义Aiohttp的异步GET请求函数,函数参数以字符串格式表示,代表请求地址URL,可选参数kwargs代表自定义的请求设置,例如请求头、代理IP、Cookies信息、超时和请求参数等等!
  2. 函数httpGet会对参数proxy进行判断,如果参数proxy非空,Aiothhp在发送GET请求的时候,就会在请求里面添加参数proxy,如果proxy为空,还在请求中添加proxy,Aiohttp就会提示异常,因此需要对proxy进行判断,最后,函数会将响应内容以字典格式返回。
  3. 函数httpPost跟httpGet两者只是处理的请求不一样,其他都是一样的
  4. 函数get是定义爬虫类Request的GET请求方式,函数参数url的数据格式可以为字符串或者列表,可选参数kwargs代表自定义的请求设置,例如请求头、代理IP、Cookies信息、超时和请求参数等,参数kwargs也是函数httpGet的参数kwargs
  5. 函数get经过装饰器distributes过滤,装饰器从函数get获取Redis数据库连接参数,如果没有数据库连接参数,就会向下执行函数get;如果存在数据库连接参数,则连接Redis数据库并判断参数url是否记录在Redis数据库,如果已经存在,不在执行get函数,反之执行get函数
  6. 函数get对参数url进行判断,如果url属于列表类型,就需要对列表进行遍历,每次遍历调用函数httpGet,传入当前的URL地址并添加到任务列表,然后将任务列表交给对象loop处理,对所有任务发送异步并发的HTTP请求,最后将所有请求的响应内容以列表格式返回。如果url属于字符串,则由对象loop调用函数httpGet,发送HTTP请求并返回相应内容。
  7. 函数post是定义爬虫类Request的POST请求方式,函数参数url和kwargs与函数get的参数一样,函数功能个get详细,区别在于调用的Aiohttp异步函数各有不同

爬虫Request类的代码有很多重复的,Aiohttp在使用的过程需要使用with模块,这避免不了!接下来我们需要测试一下代码是否正常。

rom spider import request
# Get请求
url='https://www.e1yu.com'
params={
    'myReptile':'spiderGet'
}
cookies={
    'myReptile':'spiderGet'
}
# URL去重分布式,设置redis数据库连接参数
redis_host='127.0.0.1'

res=request.get(url,params=params,cookies=cookies,redis_host=redis_host)
print(res)

# Post请求
url=['http://httpbin.org/post']
data={
    'myReptile':'spiderPost',
}
cookies={
    'myReptile':'spiderCookies'
}
res=request.post(url,data=data,cookies=cookies,redis_host=redis_host)
print(res)

myReptile框架回个局参数url的数据格式执行相应的请求处理,运行上面的代码都会输出对应请求的响应内容。运行代码记得开启redis-server服务,之后我们可以在redis-cli中查看记录的URL,这里不过多介绍!

数据清洗

myReptile框架的数据清洗主要使用BeautifulSoup和lxml(xpath)实现,接下来我们在pattern.py文件定义数据清洗类DataPattern,代码参考:


代码分析:

  1. 数据清洗类DataPattern定义了两个函数cssSelector()和xpath()
  2. 参数response代表HTTP的响应内容
  3. 参数selector代表目标数据的定位方法,定位方法采用CssSelector或Xpath语法
  4. 可选参数kwargs是自定义设置,参数parser可自定义HTML解析器,默认使用Python标准库解析器html.parser

接下来需要测试数据清洗类DataPattern是否运行正常,创建一个测试文件(位置随意),代码如下:

from pattern import dataPattern
from spider import request

url='https://www.e1yu.com'
headers={
    "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:77.0) Gecko/20100101 Firefox/77.0"
}
res=request.get(url,headers=headers)

# cssSelector提取
title=dataPattern.cssSelector(res['text'],'h2')
print(title)

# xpath提取
title=dataPattern.xpath(res['text'],'//h2')
print(title)

缺点:抓取多个数据时,信息无法匹配,例如现在要抓取标题和内容,就无法让标题对应内容。感兴趣的可以在延伸一下!

数据存储机制

myReptile框架的数据存储是采用SQLAlchemy框架pymongocsv模块实现的,分别提供了三种不同的数据存储方式,在使用过程中只需设置数据存储方式及调用相关方法即可实现数据存储处理。打开storage.py文件,在文件里定义数据存储类DataStorage,代码参考:

from sqlalchemy import *
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from pymongo import MongoClient
import csv
import os
Base=declarative_base()

# 数据存储类DataStorage
class DataStorage(object):
    def __init__(self,CONNECTION,**kwargs):
        self.databaseType=kwargs.get('databaseType','csv')
        # 根据参数databaseType选择存储方式,默认csv格式
        if self.databaseType=='SQL':
            # 根据字段创建映射类和数据表
            self.field()
            tablename=kwargs.get('tablename',self.__class__.__name__)
            self.table=self.table(tablename)
            self.DBSession=self.connect(CONNECTION)
        elif self.databaseType=='NoSQL':
            self.DBSession=self.connect(CONNECTION)
        else:
            self.path=CONNECTION

    # 定义数据表字段
    def field(self):
        # self.name=Column(String(50))
        pass
    # 连接数据库,生成DBSession对象
    def connect(self,CONNECTION):
        # 连接关系型数据库
        if self.databaseType=='SQL':
            engine=create_engine(CONNECTION)
            DBSession=sessionmaker(bind=engine)
            Base.metadata.create_all(engine)
        # 连接非关系型数据库
        else:
            info=CONNECTION.split('/')
            # 连接mongodb数据库
            connection=MongoClient(
                info[0],
                int(info[1])
            )
            db=connection[info[2]]
            DBSession=db[info[3]]
        return DBSession
    # 定义映射类
    def tabel(self,tablename):
        class TempTable(Base):
            __tablename__=tablename
            id=Column(Integer,primary_key=True)
        # 对类属性进行判断,符合sqlalchemy的字段则定义到数据映射类
        for k,v in self.__dict__.items():
            if isinstance(v,Column):
                setattr(TempTable,k,v)
        return TempTable

    # 插入数据
    def insert(self,value):
        # 关系型数据库的数据插入
        if self.databaseType=='SQL':
            self.DBSession.execute(self.table.__table__.insert(),value)
            self.DBSession.commit()
        elif self.databaseType=='NoSQL':
            # 判断参数value的数据类型,选择单条数据还是多条数据插入
            if isinstance(value,list):
                self.DBSession.insert_maye(value)
            else:
                self.DBSession.insert(value)
    #更新数据
    def update(self,value,condition={}):
        # 关系型数据库的数据更新
        if self.databaseType=='SQL':
            # 单个条件更新
            if condition:
                r=self.table.__dict__[list(condition.keys())[0]].in_(list(condition.values()))
                self.DBSession.execute(self.table.__table__.update().where(c).values(),value)
            # 全表更新
            else:
                self.DBSession.execute(self.table.__table__.update().values(),value)
            self.DBSession.commit()
        elif self.databaseType=='NoSQL':
            self.DBSession.update(condition,{'$set':value})
    # 文件下载
    def getfile(self,content,filepath):
        with open(filepath,'wb') as f:
            f.write(content)

    # 写入csv文件
    def writeCSV(self,value,title=[]):
        # 参数title为空列表,则将字典的keys进行排序并作为csv的标题
        if not title:
            title=sorted(value[0].keys())
        # 判断文件是否存在
        pathExists=os.path.exists(self.path)
        with open(self.path,'a',newline='') as csv_file:
            csv_writer=csv.writer(csv_file)
            # 文件不存在
            if not pathExists:
                csv_writer.writerow(title)
            # 将数据写入csv文件
            for v in value:
                valueList=[]
                for t in title:
                    valueList.append(v[t])
                    csv_writer.writerow(valueList)

数据存储类DataStorage定义8个方法,分别是初始化方法__init__()、field()、connect()、table()、insert()、update()、getfile()和writeCSV(),代码内容比较多,我们来看一下每个方法实现的功能:
初始化方法__init__()根据参数databaseType来执行相应的数据存储方式,每种数据存储方式说明如下:

  1. ● 如果参数databaseType设为SQL,则说明数据存储方式为关系型数据库。初始化方法会从可选参数kwargs里获取参数tablename,如果参数tablename不存在,则由子类的名字作为数据表的表名;然后调用类方法field(),从类方法field()里获取自定义的字段属性,用于定义数据表映射类;再调用类方法table()来创建数据表映射类,并以类属性table表示;最后调用类方法connect()进行数据库连接,将数据库连接对象返回并以类属性DBSession表示。
  2. ● 如果参数databaseType设为NoSQL,则说明数据存储方式为非关系型数据库。初始化方法只调用类方法connect()并把参数CONNECTION传入,实现数据库连接,将数据库连接对象返回并以类属性DBSession表示。
  3. ● 如果参数databaseType设为CSV或没有设置参数databaseType,则说明数据存储方式为CSV文件存储。初始化方法将参数CONNECTION赋值给类属性path,类属性path代表CSV文件路径信息。
  4. 类方法field():让开发者自定义数据表字段,主要用于关系型数据库的存储方式。在使用过程中,通过子类继承数据存储类DataStorage,在子类里重写类方法field()即可实现自定义表字段。
  5. 类方法connect():根据参数databaseType来选择相应的数据库连接方式。如果使用关系型数据库,则使用SQLAlchemy框架实现数据库连接,反之则使用pymongo模块连接MongoDB。
  6. 类方法table():定义数据表映射类TempTable,映射类会默认创建主键ID,然后遍历数据存储类DataStorage的类属性,并对每个类属性的数据类型进行判断,如果类属性是Column对象(即SQLAlchemy的表字段对象),则使用Python内置方法setattr()将类属性写入数据表映射类。
  7. 类方法insert():实现数据入库功能,支持关系型和非关系型数据库的数据入库操作。插入的数据必须是字典格式,并且字典的key必须为表字段。参数value可以是列表或字典形式,若是以字典表示,则插入单条数据,若是以列表表示,则插入多条数据。
  8. 类方法update():实现数据更新功能,支持关系型和非关系型数据库的数据更新操作。参数value必须是字典格式,并且字典的key必须为表字段;参数condition是更新条件,它的默认值为None,如果参数值为None,则对全表数据进行更新处理,反之对符合条件的数据进行更新处理。
  9. 类方法getfile():实现文件下载功能,参数content代表文件内容;参数filepath代表文件所保存的绝对路径。
  10. 类方法writeCSV():实现CSV文件存储数据功能,参数title代表文件表头内容,如果参数值为空,则以参数value首个元素的keys作为表头内容,参数title以列表表示,列表元素决定了数据写入顺序;参数value是待存储的数据内容,也是以列表表示,每个列表元素是以字典表示。

未经允许不得转载:作者:鳄鱼君Ba, 转载或复制请以 超链接形式 并注明出处 鳄鱼君Ba
原文地址:《自己动手开发爬虫框架 数据清洗机制 数据存储机制》 发布于2020-07-01

分享到:
赞(0) 赏杯咖啡

评论 1

6 + 3 =
  1. #1

    贵站的网址就是我从爬来的!

文章对你有帮助可赏作者一杯咖啡

支付宝扫一扫打赏

微信扫一扫打赏

Vieu4.6主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。
切换注册

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录
切换登录

注册