python淘宝网页爬虫数据保存到 csv和mysql(selenium)

时间:2024-03-12 19:43:17

数据库连接设置(表和字段要提前在数据库中建好)

# Name of the MySQL table scraped rows are inserted into.
# The table and its columns must already exist in the database.
MYSQL_TABLE = 'goods'

# MySQL connection settings -- adjust to match your local database.
db_config = {
    'host': 'localhost',
    'port': 3306,
    'user': 'root',
    'password': 'ma*****6',
    'database': 'may2024',
    'charset': 'utf8mb4',  # utf8mb4 so 4-byte chars (e.g. emoji in titles) survive
}

# Module-level connection and cursor shared by the save/clear helpers below.
# NOTE(review): `pymysql` must be imported before this runs -- the import is
# not visible in this chunk; confirm it exists at the top of the full file.
conn = pymysql.connect(**db_config)
cursor = conn.cursor()

全局设置

# Output CSV path. ("execl" looks like a typo for "excel", but the name is
# referenced by several functions below, so it is kept as-is.)
execl_save = 'zp2024_info.csv'

库导入

from pyquery import PyQuery as pq

获取网页信息(前提是:已进入搜索结果页面)

# 获取每一页的商品信息;
# Scrape `num` pages of goods (precondition: the driver is already on a
# Taobao search-result page).
def get_goods(num):
    """Scrape `num` result pages.

    Side effects: truncates the MySQL table and the CSV file first, inserts
    each product into MySQL as it is parsed, then writes the accumulated
    product list to CSV once every page has been processed.
    """
    products = []
    clear_mysql_table(MYSQL_TABLE)
    clear_csv_file(execl_save)
    driver = driver_configure.GetDriver().get_driver()
    next_page_xpath = '//*[@id="sortBarWrap"]/div[1]/div[2]/div[2]/div[8]/div/button[2]'
    for page in range(num):
        doc = pq(driver.page_source)
        # Common parent selector of every product card on the result page.
        items = doc(
            'div.PageContent--contentWrap--mep7AEm > div.LeftLay--leftWrap--xBQipVc > div.LeftLay--leftContent--AMmPNfB > div.Content--content--sgSCZ12 > div > div').items()

        for item in items:
            product = _extract_product(item)
            products.append(product)
            save_to_mysql(product)

        if page < num - 1:
            print("点击下一页,至第{}页".format(page + 2))
            baseClass.BaseClass().click_element(next_page_xpath)  # click "next page"
            baseClass.BaseClass().scroll_to_end()  # scroll down so lazy content loads
    save_to_csv(products)


def _extract_product(item):
    """Parse one product card (a pyquery node) into a plain dict."""
    title = item.find('.Title--title--jCOPvpf span').text()
    location = item.find('.Price--procity--_7Vt3mX').text()
    province, city = _split_location(location)
    post_text = item.find('.SalesPoint--subIconWrapper--s6vanNY span').text()
    return {
        'title': title,
        'price': _parse_price(item),
        'deal': _parse_deal(item.find('.Price--realSales--FhTZc7U').text()),
        'location': location,
        'province': province,
        'city': city,
        'shop': item.find('.ShopInfo--TextAndPic--yH0AZfx a').text(),
        'isPostFree': 1 if "包邮" in post_text else 0,
    }


def _parse_price(item):
    """Combine the integer and fractional price parts into a float; 0.0 when missing.

    NOTE(review): assumes the fractional span's text already contains the
    decimal point (e.g. ".90") -- confirm against the live page markup.
    """
    price_int = item.find('.Price--priceInt--ZlsSi_M').text()
    price_float = item.find('.Price--priceFloat--h2RR0RK').text()
    if price_int and price_float:
        return float(f"{price_int}{price_float}")
    return 0.0


def _parse_deal(deal):
    """Normalize a sales-count string such as '1.5万+人付款' into an int.

    Unrecognized formats are returned unchanged (preserves original behavior).
    """
    if '万' in deal:
        # BUG FIX: the count before 万 can be fractional ("1.5万");
        # int("1.5") raised ValueError -- parse the prefix as float.
        return int(10000 * float(deal.split("万")[0]))
    if '+' in deal:
        return int(deal.split("+")[0])  # e.g. '100+人收货' / '100+人付款'
    if '付款' in deal:
        return int(deal.split("人")[0])  # e.g. '100人付款'
    return deal


def _split_location(location):
    """Split a '省 市' string into (province, city); a single token fills both."""
    if ' ' in location:
        parts = location.split(" ")
        return parts[0], parts[1]
    return location, location


# 在 save_to_mysql 函数中保存数据到 MySQL
# Persist one scraped product row into MySQL.
def save_to_mysql(result):
    """Insert `result` (a product dict) into MYSQL_TABLE and commit at once.

    Errors are printed rather than raised so a single bad row cannot stop
    the crawl.
    """
    field_order = ('price', 'deal', 'title', 'shop', 'location',
                   'province', 'city', 'isPostFree')
    try:
        sql = ("INSERT INTO {}(price, deal, title, shop, location, province, city, isPostFree)"
               " VALUES (%s, %s, %s, %s, %s, %s, %s, %s)").format(MYSQL_TABLE)
        cursor.execute(sql, tuple(result[field] for field in field_order))
        conn.commit()
    except Exception as e:
        print('存储到MYsql出错: ', result, e)


# 清空mysql某个表数据
# Empty a MySQL table before a fresh crawl.
def clear_mysql_table(table_name):
    """Remove every row of `table_name` via TRUNCATE and commit immediately."""
    cursor.execute(f'TRUNCATE TABLE {table_name}')
    conn.commit()


# 关闭数据库游标和连接
# Release the shared database resources.
def close_cursor_conn():
    """Close the module-level cursor first, then the connection."""
    for resource in (cursor, conn):
        resource.close()


# 将商品数据保存在csv
def save_to_csv(result):
    # df = pd.DataFrame(result, columns=['title', 'price', 'deal', 'location', 'province', 'city', 'shop', 'isPostFree'])
    df = pd.DataFrame(result)
    df.to_csv(execl_save, index=False, encoding='gbk')


# 清空 csv数据
def clear_csv_file(f):
    with open(f, 'w') as file:
        file.truncate(0)

部分封装

# 滑动页面到底部
    def scroll_to_end(self):
        """Scroll to the bottom of the page three times, pausing 2 s between
        scrolls so lazily-loaded content has time to render."""
        scrolls_done = 0
        while scrolls_done < 3:
            self.driver.execute_script('window.scrollTo(0,document.body.scrollHeight)')
            time.sleep(2)
            scrolls_done += 1
# 查找可点击元素并点击
    def click_element(self, element):
        """Wait up to 15 s for the element at XPath `element` to become
        clickable and click it, then sleep a random 5-15 s to mimic a human.

        element: XPath string locating the target element.
        """
        # BUG FIX: WebDriverWait.until raises selenium's TimeoutException,
        # which does NOT subclass the builtin TimeoutError -- the original
        # `except TimeoutError` let wait timeouts escape uncaught.
        from selenium.common.exceptions import TimeoutException
        try:
            submit = WebDriverWait(self.driver, 15).until(EC.element_to_be_clickable((By.XPATH, element)))
            submit.click()
        except (TimeoutException, TimeoutError):
            print("翻页超时报错")
        rand_num = random.randint(5, 15)  # random pause to look less bot-like
        time.sleep(rand_num)