自己动手开发爬虫框架 数据清洗机制 数据存储机制

鳄鱼君

发表文章数:643

热门标签

, ,

Vieu四代商业主题

高扩展、安全、稳定、响应式布局多功能模板。

¥69 现在购买
首页 » Python » 自己动手开发爬虫框架 数据清洗机制 数据存储机制

不管使用爬虫库还是爬虫框架,都可以将爬虫程序分为:数据爬取、数据清洗、数据入库。那么我们自己开发爬虫框架,也需要按照这个逻辑来实现。爬虫框架由4个文件组成:初始化文件__init__.py和功能文件pattern.py、spider.py、storage.py,文件说明如下:

  1. 初始化文件__init__.py用于设置框架的版本信息和导入框架的功能文件。
  2. 数据清洗文件pattern.py用于定义数据清洗类,清洗方式与Scrapy框架相似。
  3. 数据爬取文件spider.py用于定义数据爬取类,爬取方式支持异步并发、URL去重和分布式。
  4. 数据存储文件storage.py用于定义数据存储类,目前支持关系型数据库、非关系型数据库、CSV文件存储数据和文件下载功能。

我们将框架命名为myReptile,需要提前创建好各个文件模块

初始化文件__init__.py设置框架的版本信息及导入框架的功能文件,代码如下:

# project:myReptile
# author:eyujun
__version__='1.0.0'

# 导入功能模块
from .storage import *
from .spider import *
from .pattern import *

初始化文件是整个框架的入口,它导入了整个框架的功能。在使用框架的时候,只需在初始化文件调用相关的功能模块即可。

myReptile框架的设计原理:

  1. 数据爬取方式由URL地址的数据格式决定,如果URL地址的数据格式为列表,myReptile就会执行异步并发,并将所有请求的响应内容以列表格式返回;如果传入的URL地址是字符串格式(即单一的URL地址),myReptile就直接返回相应的响应内容;并且还支持URL去重和分布式爬虫功能。
  2. 数据清洗采用Scrapy框架的清洗模式,使用方式跟Scrapy框架差不多,目前仅支持CssSelector和Xpath定位方式。
  3. 数据入库支持关系型数据库、非关系型数据库和CSV文件存储,关系型数据库由SQLAlchemy框架实现;非关系型数据库目前仅支持MongoDB数据库。myReptile简化入库方式,只需将爬取的数据以字典格式传入即可实现入库操作。

异步爬取

myReptile框架的数据爬取由Aiohttp模块实现,因此它具备了异步并发功能。我们将Aiohttp模块的数据爬取功能进行封装和延伸,简化了其使用方式,使用者只需调用相关的函数并传入参数即可发送HTTP请求。打开spider.py文件,在文件里定义爬虫类Request,代码如下:

import aiohttp,asyncio,redis

# 设置默认参数
TIMEOUT=30
REQUEST_HEADERS={

}
# 实例化对象,用于发送HTTP请求
loop=asyncio.get_event_loop()

# 定义装饰器,实现URL去重或分布式处理
def distributes(func):
    def wrapper(self,url,**kwargs):
        redis_host=kwargs.get('redis_host','')
        if redis_host:
            port=kwargs.get('port',6379)
            redis_client=redis.StrictRedis(host=redis_host,port=port)
            redis_data_dict='keys'
            if not redis_client.hexists(redis_data_dict,url): # 判断url知否存在keys键中
                redis_client.hset(redis_data_dict,url,0)  # 不存在就会插入
                return func(self,url,**kwargs)
            else:
                return {}
        else:
            return func(self,url,**kwargs)
    return wrapper
# 定义爬虫类
class Request(object):
    # 定义异步函数
    async def httpGet(self,url,**kwargs):
        cookies=kwargs.get('cookies',{ })
        params=kwargs.get('params',{ })
        proxy=kwargs.get('proxy','')
        timeout=kwargs.get('timeout',TIMEOUT)
        headers=kwargs.get('headers',REQUEST_HEADERS)
        # 代理IP
        if proxy:
            async  with aiohttp.ClientSession(cookies=cookies) as session:
                async with session.get(url,
                                       params=params,
                                       proxy=proxy,
                                       timeout=timeout,
                                       headers=headers) as response:
                    # 将响应内容以字典格式fanhui
                    result= dict(
                        content=await response.read(),
                        text=await response.text(),
                        status=response.status,
                        headers=response.headers,
                        url=response.url
                    )
                    return result
        # 不使用代理ip
        else:
            async  with aiohttp.ClientSession(cookies=cookies) as session:
                async with session.get(url,
                                       params=params,
                                       timeout=timeout,
                                       headers=headers) as response:
                    # 将响应内容以字典格式fanhui
                    result= dict(
                        content=await response.read(),
                        text=await response.text(),
                        status=response.status,
                        headers=response.headers,
                        url=response.url
                    )
                    return result

    # 定义异步函数
    async def httpPost(self, url, **kwargs):
        cookies = kwargs.get('cookies', {})
        data = kwargs.get('data', {})
        proxy = kwargs.get('proxy', '')
        timeout = kwargs.get('timeout', TIMEOUT)
        headers = kwargs.get('headers', REQUEST_HEADERS)
        if proxy:
            async  with aiohttp.ClientSession(cookies=cookies) as session:
                async with session.post(url,
                                       data=data,
                                       proxy=proxy,
                                       timeout=timeout,
                                       headers=headers) as response:
                    # 将响应内容以字典格式fanhui
                    result = dict(
                        content=await response.read(),
                        text=await response.text(),
                        status=response.status,
                        headers=response.headers,
                        url=response.url
                    )
                    return result
        else:
            async  with aiohttp.ClientSession(cookies=cookies) as session:
                async with session.post(url,
                                       data=data,
                                       timeout=timeout,
                                       headers=headers) as response:
                    # 将响应内容以字典格式fanhui
                    result = dict(
                        content=await response.read(),
                        text=await response.text(),
                        status=response.status,
                        headers=response.headers,
                        url=response.url
                    )
                    return result

    # 定义GeT函数
    @distributes
    def get(self,url,**kwargs):
        print(url,kwargs)
        tasks=[]
        if isinstance(url,list):

            for i in url:
                task=asyncio.ensure_future(self.httpGet(i,**kwargs))
                tasks.append(task)
            result=loop.run_until_complete(asyncio.gather(*tasks))
        else:
            result=loop.run_until_complete(self.httpGet(url,**kwargs))
        return result
    # 定义Post函数
    @distributes
    def post(self, url, **kwargs):
        tasks = []
        if isinstance(url, list):
            for i in url:
                task = asyncio.ensure_future(self.httpPost(i, **kwargs))
                tasks.append(task)
            result = loop.run_until_complete(asyncio.gather(*tasks))
        else:
            result = loop.run_until_complete(self.httpPost(url, **kwargs))
        return result
# 实例化Request对象
request=Request()

爬虫Request类的代码有很多重复的,Aiohttp在使用的过程需要使用with模块,这避免不了!接下来我们需要测试一下代码是否正常。

rom spider import request
# Get请求
url='https://www.e1yu.com'
params={
    'myReptile':'spiderGet'
}
cookies={
    'myReptile':'spiderGet'
}
# URL去重分布式,设置redis数据库连接参数
redis_host='127.0.0.1'

res=request.get(url,params=params,cookies=cookies,redis_host=redis_host)
print(res)

# Post请求
url=['http://httpbin.org/post']
data={
    'myReptile':'spiderPost',
}
cookies={
    'myReptile':'spiderCookies'
}
res=request.post(url,data=data,cookies=cookies,redis_host=redis_host)
print(res)

myReptile框架回个局参数url的数据格式执行相应的请求处理,运行上面的代码都会输出对应请求的响应内容。运行代码记得开启redis-server服务,之后我们可以在redis-cli中查看记录的URL,这里不过多介绍!

数据清洗

myReptile框架的数据清洗主要使用BeautifulSoup和lxml(xpath)实现,接下来我们在pattern.py文件定义数据清洗类DataPattern,代码参考:

from sqlalchemy import *
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from pymongo import MongoClient
import csv
import os
Base=declarative_base()

# 数据存储类DataStorage
class DataStorage(object):
    def __init__(self,CONNECTION,**kwargs):
        self.databaseType=kwargs.get('databaseType','csv')
        # 根据参数databaseType选择存储方式,默认csv格式
        if self.databaseType=='SQL':
            # 根据字段创建映射类和数据表
            self.field()
            tablename = kwargs.get('tablename',self.__class__.__name__)
            self.table = self.table(tablename)
            self.DBSession = self.connect(CONNECTION)
        elif self.databaseType == 'NoSQL':
            self.DBSession = self.connect(CONNECTION)
        else:
            self.path=CONNECTION

    def field(self):
        """
        定义数据表字段
        :return:
        """
        # self.name=Column(String(50))
        pass

    def table(self, tablename):
        """
        定义映射类
        :param tablename:
        :return:
        """
        class TempTable(Base):
            # 表名
            __tablename__ = tablename
            # 字段
            id = Column(Integer, primary_key=True)

        # 对类属性进行判断,符合sqlalchemy的字段则定义到数据映射类
        for k, v in self.__dict__.items():
            if isinstance(v, Column):
                setattr(TempTable, k, v)
        return TempTable

    def connect(self,CONNECTION):
        """
        连接数据库
        :param CONNECTION:
        :return: DBSession对象
        """
        # 连接关系型数据库
        if self.databaseType == 'SQL':
            engine = create_engine(CONNECTION)
            DBSession = sessionmaker(bind=engine) # 创建会话对象
            Base.metadata.create_all(engine)  #创建数据表
        # 连接非关系型数据库
        else:
            info = CONNECTION.split('/') # 分割提取数据库连接信息
            # 连接mongodb数据库
            connection = MongoClient(
                info[0], # host
                int(info[1]) # port
            )
            db = connection[info[2]] # 数据库
            DBSession = db[info[3]] # 数据库表
        return DBSession

    # 插入数据
    def insert(self, value):
        # 关系型数据库的数据插入
        if self.databaseType == 'SQL':
            self.DBSession.execute(self.table.__table__.insert(), value)
            self.DBSession.commit()
        elif self.databaseType == 'NoSQL':
            # 判断参数value的数据类型,选择单条数据还是多条数据插入
            if isinstance(value, list):
                self.DBSession.insert_many(value)
            else:
                self.DBSession.insert(value)

    # 更新数据
    def update(self, value, condition={}):
        # 关系型数据库的数据更新
        if self.databaseType == 'SQL':
            # 单个条件更新
            if condition:
                r=self.table.__dict__[list(condition.keys())[0]].in_(list(condition.values()))
                self.DBSession.execute(self.table.__table__.update().where(r).values(), value)
            # 全表更新
            else:
                self.DBSession.execute(self.table.__table__.update().values(), value)
            self.DBSession.commit()
        elif self.databaseType=='NoSQL':
            self.DBSession.update(condition, {'$set': value})

    # 文件下载
    def getfile(self, content, filepath):
        with open(filepath, 'wb') as f:
            f.write(content)

    # 写入csv文件
    def writeCSV(self, value, title=[]):
        # 参数title为空列表,则将字典的keys进行排序并作为csv的标题
        if not title:
            title = sorted(value[0].keys())
            print(title)
        # 判断文件是否存在
        path_exists = os.path.exists(self.path)
        with open(self.path, 'a', newline='') as csv_file:
            # 将文件加入到csv对象
            csv_writer = csv.writer(csv_file)
            # 文件不存在
            if not path_exists:
                csv_writer.writerow(title)
            # 将数据写入csv文件
            for v in value:
                value_list = []
                for t in title:
                    value_list.append(v[t])
                csv_writer.writerow(value_list)

代码分析:

  1. 数据清洗类DataPattern定义了两个函数cssSelector()和xpath()
  2. 参数response代表HTTP的响应内容
  3. 参数selector代表目标数据的定位方法,定位方法采用CssSelector或Xpath语法
  4. 可选参数kwargs是自定义设置,参数parser可自定义HTML解析器,默认使用Python标准库解析器html.parser

接下来需要测试数据清洗类DataPattern是否运行正常,创建一个测试文件(位置随意),代码如下:

from pattern import dataPattern
from spider import request

url='https://www.e1yu.com'
headers={
    "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:77.0) Gecko/20100101 Firefox/77.0"
}
res=request.get(url,headers=headers)

# cssSelector提取
title=dataPattern.cssSelector(res['text'],'h2')
print(title)

# xpath提取
title=dataPattern.xpath(res['text'],'//h2')
print(title)

数据存储机制

myReptile框架的数据存储是采用SQLAlchemy框架pymongocsv模块实现的,分别提供了三种不同的数据存储方式,在使用过程中只需设置数据存储方式及调用相关方法即可实现数据存储处理。打开storage.py文件,在文件里定义数据存储类DataStorage,代码参考:

from sqlalchemy import *
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from pymongo import MongoClient
import csv
import os
Base=declarative_base()

# 数据存储类DataStorage
class DataStorage(object):
    def __init__(self,CONNECTION,**kwargs):
        self.databaseType=kwargs.get('databaseType','csv')
        # 根据参数databaseType选择存储方式,默认csv格式
        if self.databaseType=='SQL':
            # 根据字段创建映射类和数据表
            self.field()
            tablename=kwargs.get('tablename',self.__class__.__name__)
            self.table=self.table(tablename)
            self.DBSession=self.connect(CONNECTION)
        elif self.databaseType=='NoSQL':
            self.DBSession=self.connect(CONNECTION)
        else:
            self.path=CONNECTION


    def field(self):
        """
        定义数据表字段
        :return:
        """
        # self.name=Column(String(50))
        pass

    def table(self, tablename):
        """
        定义映射类
        :param tablename:
        :return:
        """
        class TempTable(Base):
            # 表名
            __tablename__ = tablename
            # 字段
            id = Column(Integer, primary_key=True)

        # 对类属性进行判断,符合sqlalchemy的字段则定义到数据映射类
        for k, v in self.__dict__.items():
            if isinstance(v, Column):
                setattr(TempTable, k, v)
        return TempTable

    def connect(self,CONNECTION):
        """
        连接数据库
        :param CONNECTION:
        :return: DBSession对象
        """
        # 连接关系型数据库
        if self.databaseType=='SQL':
            engine=create_engine(CONNECTION)
            DBSession=sessionmaker(bind=engine) # 创建会话对象
            Base.metadata.create_all(engine)  #创建数据表
        # 连接非关系型数据库
        else:
            info=CONNECTION.split('/') # 分割提取数据库连接信息
            # 连接mongodb数据库
            connection=MongoClient(
                info[0], # host
                int(info[1]) # port
            )
            db=connection[info[2]] # 数据库
            DBSession=db[info[3]] # 数据库表
        return DBSession


    # 插入数据
    def insert(self,value):
        # 关系型数据库的数据插入
        if self.databaseType=='SQL':
            self.DBSession.execute(self.table.__table__.insert(),value)
            self.DBSession.commit()
        elif self.databaseType=='NoSQL':
            # 判断参数value的数据类型,选择单条数据还是多条数据插入
            if isinstance(value,list):
                self.DBSession.insert_many(value)
            else:
                self.DBSession.insert(value)
    #更新数据
    def update(self,value,condition={}):
        # 关系型数据库的数据更新
        if self.databaseType=='SQL':
            # 单个条件更新
            if condition:
                r=self.table.__dict__[list(condition.keys())[0]].in_(list(condition.values()))
                self.DBSession.execute(self.table.__table__.update().where(c).values(),value)
            # 全表更新
            else:
                self.DBSession.execute(self.table.__table__.update().values(),value)
            self.DBSession.commit()
        elif self.databaseType=='NoSQL':
            self.DBSession.update(condition,{'$set':value})
    # 文件下载
    def getfile(self,content,filepath):
        with open(filepath,'wb') as f:
            f.write(content)

    # 写入csv文件
    def writeCSV(self,value,title=[]):
        # 参数title为空列表,则将字典的keys进行排序并作为csv的标题
        if not title:
            title=sorted(value[0].keys())
            print(title)
        # 判断文件是否存在
        pathExists=os.path.exists(self.path)
        with open(self.path,'a',newline='') as csv_file:
            csv_writer=csv.writer(csv_file) # 将文件加入到csv对象
            # 文件不存在
            if not pathExists:
                csv_writer.writerow(title)
            # 将数据写入csv文件
            for v in value:
                valueList=[]
                for t in title:
                    valueList.append(v[t])
                csv_writer.writerow(valueList)

数据存储类DataStorage定义8个方法,分别是初始化方法__init__()、field()、connect()、table()、insert()、update()、getfile()和writeCSV(),代码内容比较多。为了验证代码的个部分功能是否正常,需要分别创建测试代码进行测试,这里需要分别创建对应的文件夹。先测试最简单的csv文件,验证csv存储:

from storage import DataStorage

if __name__ == '__main__':
    CONNECTION='data.csv' # 存储路径
    # 实例化存储类
    database=DataStorage(CONNECTION)
    data=[
        {'name':'eyujun1','age':18,'city':'henan'},
        {'name':'eyujun2','age':18,'city':'henan'},
        {'name':'eyujun3','age':18,'city':'henan'},
    ]
    # 调用writeCSV方法写入数据
    #database.writeCSV(data,title=['name','age','city'])
    database.writeCSV(data)

接着验证nosql数据存储的代码

from storage import DataStorage

if __name__ == '__main__':
    CONNECTION='localhost/27017/test/data_db' # 存储参数 host port 数据库 数据库表
    # 实例化存储类
    database = DataStorage(CONNECTION,databaseType='NoSQL')
    data_many = [
        {'name': 'eyujun1', 'age': 18, 'city': 'henan'},
        {'name': 'eyujun2', 'age': 18, 'city': 'henan'},
        {'name': 'eyujun3', 'age': 18, 'city': 'henan'},
    ]
    data_one = {'name': 'eyujun4', 'age': 18, 'city': 'henan'}
    database.insert(data_many)
    database.insert(data_one)
    # 更新数据
    value={'name':'鳄鱼君'} #更新后的内容
    condition={'name': 'eyujun1', 'age': 18, 'city': 'henan'}   #更新的条件
    database.update(value,condition)

最后测试最难的SQL数据存储,需要先创建数据库spider

from storage import DataStorage
from sqlalchemy import Column,String

# 定义数据表student
class Student(DataStorage):
    def field(self):
        # 定义数据表字段
        self.name=Column(String(50),comment='姓名')
        self.age=Column(String(50),comment='年龄')
        self.city=Column(String(50),comment='城市')
class School(DataStorage):
    def field(self):
        self.name = Column(String(50), comment='姓名')
        self.school = Column(String(50), comment='学校')


if __name__ == '__main__':
    CONNECTION = 'mysql+pymysql://root:123@localhost/spider?charset=utf8mb4'  # 存储路径
    student=Student(CONNECTION,databaseType='SQL') # 学生表
    school=School(CONNECTION,databaseType='SQL') #学校表

    # 插入数据
    data_many = [
        {'name': 'eyujun1', 'age': 18, 'city': 'henan'},
        {'name': 'eyujun2', 'age': 18, 'city': 'henan'},
        {'name': 'eyujun3', 'age': 18, 'city': 'henan'},
    ]
    data_one = {'name': 'eyujun1', 'school':'清华大学'}
    student.insert(data_many)
    school.insert(data_one)

    # 数据更新
    condition={'id':1}
    value={'name': 'eyujun1', 'age': 18, 'city': 'henan'}
    student.update(value,condition)

未经允许不得转载:作者:鳄鱼君, 转载或复制请以 超链接形式 并注明出处 鳄鱼君
原文地址:《自己动手开发爬虫框架 数据清洗机制 数据存储机制》 发布于2020-07-01

分享到:
赞(0) 赏杯咖啡

评论 1

7 + 1 =
  1. #1

    贵站的网址就是我从爬来的!

    杜老师说5个月前 (07-01)回复

文章对你有帮助可赏作者一杯咖啡

支付宝扫一扫打赏

微信扫一扫打赏

Vieu4.6主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。
切换注册

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录
切换登录

注册