Python爬虫几个步骤教你写入mysql数据库

Python爬虫几个步骤教你写入mysql数据库

Python爬虫实现爬取网站中的数据并存入MySQL数据库中,在爬取的时候总要涉及到数据持久化存储,当然有很多中存储的方式,简单点的有excel、txt、json、csv等等。存入mysql我觉的有好多操作空间,如果是开发python后端也可以熟悉一下sql语句,存入数据库的方法也是试了些许网上一些方法,现在把完整功能供大家参考。

直接搜索 phpStudy安装即可,按照下图配置数据库。用户名密码自行设置,然后返回首页启动即可。

pip install pymysql

打开刚安装的phpstudy安装一个mysql客户端连接,数据库是本地的host可以填 127.0.0.1 或 localhost用户名密码是上面设置的

MySQL创建对应的表

CREATE TABLE `text_archives`  (
  `id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '链接',
  `title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '标题',
  `image` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '图片',
  `keywords` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '关键描述',
  `description` varchar(600) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '内容描述',
  `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL COMMENT '内容',
  `weigh` int(10) NOT NULL DEFAULT 0 COMMENT '权重',
  `createtime` bigint(16) NOT NULL DEFAULT 0 COMMENT '创建时间',
  `updatetime` bigint(16) NOT NULL DEFAULT 0 COMMENT '更新时间',
  `deletetime` bigint(16) NULL DEFAULT NULL COMMENT '删除时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2692 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci COMMENT = '内容表' ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

构造 SQL 语句的字符串 sql ,然后通过 cursor.excute(sql) 执行,下面简单的封装,直接复制即可用。

import pymysql


class Mysql(object):
    def __init__(self):
        self._connect = pymysql.connect(
            host='127.0.0.1',
            user='test',
            password='######',
            database='test',
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
        self._cursor = self._connect.cursor()

    def inset_db(self, table_name, insert_data):
        try:
            data = self.get_mysql_data(data=insert_data)
            fields = data[0]
            values = data[1]
            sql = "INSERT INTO {table_name}({fields}) values ({values})".format(table_name=table_name, fields=fields,
                                                                                values=values)
            self._cursor.execute(sql)
            self._connect.commit()
        except Exception as e:
            self._connect.rollback()  # 如果这里是执行的执行存储过程的sql命令,那么可能会存在rollback的情况,所以这里应该考虑到
            print("数据插入失败,失败原因:", e)
            print(insert_data)
        else:
            # self.db_close()
            return self._cursor.lastrowid

    def update_db(self, table_name, update_data, wheres=None):
        try:
            if wheres is not None:
                sql = "UPDATE {table_name} SET {update_data} WHERE {wheres}".format(
                    table_name=table_name,
                    update_data=update_data,
                    wheres=wheres
                )
            else:
                sql = "UPDATE {table_name} SET {update_data}".format(
                    table_name=table_name,
                    update_data=update_data)
            self._cursor.execute(sql)
            self._connect.commit()
        except Exception as e:
            print("更新失败:", e)
            return False
        else:
            # self.db_close()
            return True

    def delete_db(self, table_name, wheres):
        try:
            # 构建sql语句
            sql = "DELETE FROM {table_name} WHERE {wheres}".format(table_name=table_name, wheres=wheres)
            self._cursor.execute(sql)
            self._connect.commit()
        except Exception as e:
            print('删除失败:', e)
            return False
        else:
            # self.db_close()
            return True

    def select_db(self, table_name, fields, wheres=None, get_one=False):
        try:
            if wheres is not None:
                sql = "SELECT {fields} FROM {table_name} WHERE {wheres}".format(
                    fields=fields,
                    table_name=table_name,
                    wheres=wheres
                )
            else:
                sql = "SELECT {fields} FROM {table_name}".format(fields=fields, table_name=table_name)
            self._cursor.execute(sql)
            self._connect.commit()
            if get_one:
                result = self._cursor.fetchone()
            else:
                result = self._cursor.fetchall()
        except Exception as e:
            print("查询失败", e)
            return None
        else:
            # self.db_close()
            return result

    def get_mysql_data(self, data):
        fields = ""
        insert_data = ""
        for k, v in data.items():
            fields = fields + k + ','
            insert_data = insert_data + "'" + str(v) + "'" + ','
        fields = fields.strip(',')
        insert_data = insert_data.strip(',')
        return [fields, insert_data]

    def db_close(self):
        self._cursor.close()
        self._connect.close()

这次简单点咱们用xpath就行,有一个小技巧咱们在爬取的网页打开开发都模式F12.如下图红框复制第一个或都第二个就行。

下面代码是实现爬取数据然后存入数据库类,大家可参考

from model.nav import Nav
import requests
from urllib import parse
from lxml import etree
from fake_useragent import UserAgent
from lib.reptile import Reptile
import json


class Common(object):
    def __init__(self, params):
        self.url = params['url']
        self.params = params
        self.blog = 1
  
    def get_header(self):
        ua = UserAgent()
        headers = {
            'User-Agent': ua.random
        }
        return headers

    def get_html(self, url):
        # 在超时间内,对于失败页面尝试请求三次
        if self.blog <= 3:
            try:
                res = requests.get(url=url, headers=self.get_header(), timeout=3)
                res.encoding = res.apparent_encoding
                html = res.text
                return html
            except Exception as e:
                print(e)
                self.blog += 1
                self.get_html(url)
    def json_insert_data(self, params):
      category_id = self.insert_category(cname=params['category_name'], pid=params['pid'], icon='')
      print("分类插入成功:{}".format(params['category_name']))
      if category_id:
          url = params['url']
          title = params['title']
          image = params['image']
          description = params['description']
          keywords = params['keywords']
          content = params['content']
          self.insert_archives(category_id, url, title, image, description, keywords, content)
          print("内容插入成功:{}".format(title))
          print("------------------------------------------------------------")
    def get_item(self, xpath_html):
        item_list = xpath_html.xpath(self.params['item_xpath'])
        print(item_list)
        for row in item_list:
            url_list = row.xpath(self.params['url_xpath'])
            if len(url_list) > 0:
                self.get_content(url_list[0])

    def get_content(self, url):
        print("正在抓取链接:{}".format(url))
        domain = parse.urlparse(url).netloc
        d_domain = parse.urlparse(self.url).netloc
        if domain == d_domain:
            html = self.get_html(url)
            self.reptile.blog = 1
            if html:
                p = etree.HTML(html)
                title = self.get_conmon_content(p, self.params['title_xpath'])
                print("标题为:{}".format(title))
                category_name = self.get_conmon_content(p, self.params['category_xpath'])
                print("分类为:{}".format(category_name))
                image = self.get_conmon_content(p, self.params['image_xpath'])
                print("图片为:{}".format(image))
                link = self.get_conmon_content(p, self.params['link_xpaht'])
                print("链接为:{}".format(link))
                description = self.get_conmon_content(p, self.params['description_xpath'])
                print("描述为:{}".format(description))
                keywords = self.get_conmon_content(p, self.params['keywords_xpath'])
                print("关键描述:{}".format(keywords))
                content = self.get_conmon_content(p, self.params['content_xpath'])
                print("内容为:{}".format(content))
                params = {
                    "pid": 158,
                    "title": title,
                    "category_name": category_name,
                    "image": image,
                    'url': link,
                    'description': description,
                    'keywords': keywords,
                    'content': content,
                }
                if title and category_name and link:
                    self.json_insert_data(params)#存入数据库

    def get_conmon_content(self, xpath_html, xpath):
        content_list = xpath_html.xpath(xpath)
        content = ''
        if len(content_list) > 0:
            content = content_list[0].strip()
        return content

    def run(self):
        print("url:{}".format(self.url))
        html = self.get_html(self.url)
        if html:
            p = etree.HTML(html)
            self.get_item(p)

#爬取的xpath
params = {
    "url": "https://www.widiz.com/", #爬取url
    "url_xpath": './/a[1]/@href',
    "title_xpath": '/html/body/p[1]/p[2]/p[3]/p/p[3]/p/h1/text()',
    "category_xpath": '/html/body/p[1]/p[2]/p[3]/p/p[3]/p/a[1]/text()',
    "image_xpath": '/html/body/p[1]/p[2]/p[3]/p/p[2]/p/img/@src',
    "link_xpaht": '/html/body/p[1]/p[2]/p[3]/p/p[3]/p/p/p[1]/span/a/@href',
    "description_xpath": '/html/head/meta[10]/@content',
    "keywords_xpath": '/html/head/meta[5]/@content',
    "content_xpath": '/html/body/p[1]/p[2]/p[3]/main/p[1]/p/p[1]/p/p[2]/text()'
}

Common(params).run()

最终效果:


展开阅读全文

页面更新:2024-05-22

标签:爬虫   数据库   语句   用户名   密码   关键   链接   标题   内容   数据

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top