使用pandas把mysql的数据导入MongoDB。

时间:2023-03-09 23:05:21
使用pandas把mysql的数据导入MongoDB。

使用pandas把mysql的数据导入MongoDB。

首先说下我的需求,我需要把mysql的70万条数据导入到mongodb并去重,

同时在第二列加入一个url字段,字段的值和第三列的值一样,代码如下:

# -*- coding: utf-8 -*-
# @Time : 2018/9/29 17:20
# @Author : cxa
# @File : run.py
# @Software: PyCharm
import pandas as pd
from sqlalchemy import create_engine
from pymongo import MongoClient
import json
import time class MongoBase:
def __init__(self, collection):
self.collection = collection
self.OpenDB() def read_mysql(self):
engine = create_engine(
'mysql+pymysql://usernmae:passwd@ip:port/dbname?charset=utf8') # 用sqlalchemy创建引擎
start=time.time()
max_id=self.get_max_id()
df1 = pd.read_sql(f'select primary_key,phone,plat_code,crawl_time,jrjt_del_dt from test_info where primary_key>{max_id}', engine) # 从数据库中读取表存为DataFrame
end = time.time()
print("查询完毕条数",len(df1['phone']),"用时",end-start)
df1.drop_duplicates('phone', keep='first', inplace=True)
df1.insert(1, 'url', df1['phone'])
return df1 def OpenDB(self):
self.con = MongoClient(host=host)
self.db = self.con[self.collection]
self.collection = self.db['test'] def closeDB(self):
self.con.close() def get_max_id(self):
max_id = self.collection.find().sort([('primary_key', -1)]).limit(1)[0]
if max_id:
return max_id.get("primary_key") if __name__ == '__main__':
start=time.time()
mongo = MongoBase('spider_data')
df =mongo.read_mysql()
mongo.collection.insert(json.loads(df.T.to_json()).values())
mongo.closeDB()
end=time.time()
print("运行完成所用时",end-start)