python操作mysql数据库读取一个数据库的表写入另一个数据库

时间:2023-03-09 17:47:11
python操作mysql数据库读取一个数据库的表写入另一个数据库

写这个肯定是工作需要了,不啰嗦,直接说事

我现在有两台主机,一台是公司主机,一台是客户主机,要求把公司主机上的三个表同步到客户主机上的数据库

注意这里要求的是“同步”,可选的方案有两种:MySQL 主从复制,或者用 Linux 定时任务(cron)周期性地执行脚本。由于我没有权限在主机上配置主从复制,所以只能选择写脚本,再配合定时任务来实现。

涉及的三个表的建表语句如下:

# DDL script for table `schedule_building`.
# The string holds two statements: a DROP TABLE IF EXISTS followed by the
# CREATE TABLE, so running it discards any existing copy of the table.
create_sql_schedule_building = """
DROP table IF EXISTS schedule_building ;
CREATE TABLE `schedule_building` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`uuid` varchar(32) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
`proj_id` int(11) DEFAULT NULL,
`team_id` int(11) DEFAULT NULL,
`Building` varchar(32) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
`status` tinyint(4) DEFAULT '1',
`cid` int(11) DEFAULT NULL,
`cusrname` varchar(50) CHARACTER SET utf8 DEFAULT NULL COMMENT '建立人',
`ctime` datetime DEFAULT NULL,
`uid` int(11) DEFAULT NULL,
`uusrname` varchar(50) CHARACTER SET utf8 DEFAULT NULL COMMENT '修改人',
`utime` datetime DEFAULT NULL,
`random_no` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=91 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
"""
# DDL script for table `schedule_floor` (DROP TABLE IF EXISTS, then CREATE TABLE).
create_sql_schedule_floor = """
DROP table IF EXISTS schedule_floor ;
CREATE TABLE `schedule_floor` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`m_id` int(11) DEFAULT NULL,
`sort` int(11) DEFAULT NULL,
`cname` varchar(100) CHARACTER SET utf8 DEFAULT NULL,
`ctime` datetime DEFAULT NULL,
`cid` int(11) DEFAULT NULL,
`utime` datetime DEFAULT NULL,
`uid` int(11) DEFAULT NULL,
`status` tinyint(4) DEFAULT '1',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3249 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ;
"""
# DDL script for table `schedule_room` (DROP TABLE IF EXISTS, then CREATE TABLE).
# Unlike the two tables above, this one is declared with DEFAULT CHARSET=utf8.
create_sql_schedule_room = """
DROP table IF EXISTS schedule_room ;
CREATE TABLE `schedule_room` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`m_id` int(11) DEFAULT NULL,
`ilevel` int(11) DEFAULT NULL,
`parent_id` int(11) DEFAULT NULL,
`cname` varchar(50) DEFAULT NULL,
`mark` varchar(50) DEFAULT NULL,
`status` tinyint(4) DEFAULT '1',
`sort` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=926 DEFAULT CHARSET=utf8;
"""

核心代码

# Sync script: copies three schedule tables from the source database (db1)
# into the target database (db3) by wiping each target table and re-inserting
# every source row.
from mysql_base import DataBaseParent_local, DataBaseParent_remote, DataBaseParent_test
import MySQLdb

db1 = DataBaseParent_local()
# NOTE(review): DataBaseParent_remote is not among the classes shown in the
# mysql_base listing -- confirm it exists there, otherwise the import fails.
db2 = DataBaseParent_remote()
db3 = DataBaseParent_test()


def read(tb_name):
    """Return every row of ``tb_name`` from the source database as a list.

    ``tb_name`` is interpolated directly into the SQL text, so it must be a
    trusted, hard-coded table name and never user input.
    """
    sql = "SELECT * FROM {0};".format(tb_name)
    rows, _length = db1.select(sql)
    return list(rows)


def _sync_table(tb_name, column_count):
    """Replace the contents of ``tb_name`` in db3 with the rows read from db1.

    ``column_count`` is the number of columns in the table; it determines how
    many ``%s`` placeholders the positional INSERT statement carries.
    """
    rows = read(tb_name)
    delete_sql = "delete from {0} ;".format(tb_name)
    placeholders = ",".join(["%s"] * column_count)
    insert_sql = "insert into {0} values({1});".format(tb_name, placeholders)
    # insert_many runs the DELETE first, then the bulk INSERT, then commits.
    db3.insert_many(insert_sql, delete_sql, rows)


def write_building():
    """Sync ``schedule_building`` (13 columns) into the target database."""
    _sync_table("schedule_building", 13)


def write_floor():
    """Sync ``schedule_floor`` (9 columns) into the target database."""
    _sync_table("schedule_floor", 9)


def write_room():
    """Sync ``schedule_room`` (8 columns) into the target database."""
    _sync_table("schedule_room", 8)


if __name__ == '__main__':
    write_floor()
    write_building()
    write_room()

数据库共享基类

#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""Shared database helper classes.

Instantiate ``DataBaseParent_local`` (backed by ``db1``) or
``DataBaseParent_test`` (backed by ``db3``) and call its methods.
"""
import MySQLdb

# NOTE(review): host names and credentials are hard-coded here; consider
# loading them from configuration or the environment instead.
db1 = MySQLdb.connect("www.shdfshajd.cn", "db_user", "qazeDC!@12", "xcx", charset='utf8')
db3 = MySQLdb.connect("localhost", "root", "root", "apollo", charset='utf8')


class _DataBaseParent(object):
    """Query helpers shared by the per-connection classes below.

    Subclasses set ``_conn`` to the module-level connection they wrap.
    """

    _conn = None

    def __init__(self):
        # Kept as a public attribute for backward compatibility. The methods
        # below open their own short-lived cursors, so one call closing its
        # cursor can no longer break the next call on the same instance.
        self.cursor = self._conn.cursor()

    def select(self, sqlstr):
        """Run ``sqlstr`` and return ``(rows, row_count)``.

        ``rows`` is whatever ``fetchall()`` returns (a tuple of tuples with the
        default cursor), e.g. ``(('apollo', 'male', '164.jpeg'), ...)``. The
        cursor description of the query is stored in ``self.description``.
        """
        cur = self._conn.cursor()
        try:
            cur.execute(sqlstr)
            rows = cur.fetchall()
            self.description = cur.description
        finally:
            cur.close()
        return rows, len(rows)

    def select_include_name(self, sqlstr):
        """Run ``sqlstr`` and return ``(list_of_dicts, row_count)``.

        Each row becomes a dict keyed by column name, e.g.
        ``[{'name': 'apollo', 'age': 28}, {'name': 'jack', 'age': 27}]``.
        """
        cur = self._conn.cursor()
        try:
            cur.execute(sqlstr)
            column_names = [column[0] for column in cur.description]
            rows = cur.fetchall()
        finally:
            cur.close()
        # Pair every column with its value; the previous index loop stopped
        # one short and silently dropped the last column of each row.
        result = [dict(zip(column_names, row)) for row in rows]
        return result, len(rows)

    def select_for_grid(self, sqlstr, pageNo=1, select_size=5):
        """Run ``sqlstr`` and return one page of the result.

        Args:
            sqlstr: the SELECT statement to run.
            pageNo: 1-based page number.
            select_size: number of rows per page.

        Returns:
            ``(page_rows, total_rows, total_pages, pageNo, select_size)``.
            ``page_rows`` is ``[]`` when there are no rows or the page starts
            outside the result. With 10 rows and ``select_size=5``, page 1
            yields the first five rows, ``total_rows == 10`` and
            ``total_pages == 2``.
        """
        rows, total = self.select(sqlstr)
        # Integer ceiling division: plain "/" would give a float (and a wrong
        # page count) on Python 3.
        full_pages, remainder = divmod(total, select_size)
        total_pages = full_pages + 1 if remainder else full_pages

        start = (pageNo - 1) * select_size
        end = min(pageNo * select_size, total)
        if total == 0 or start > total or start < 0:
            return [], total, total_pages, pageNo, select_size
        return rows[start:end], total, total_pages, pageNo, select_size

    def executesql(self, sqlstr):
        """Execute ``sqlstr``, commit, and return the value of ``execute()``."""
        cur = self._conn.cursor()
        try:
            result = cur.execute(sqlstr)
            # Commit on the connection this class wraps. DataBaseParent_test
            # previously committed on db1 here, leaving its own work pending.
            self._conn.commit()
        finally:
            cur.close()
        return result

    def insert(self, sql, param):
        """Execute the parameterized ``sql`` with ``param``, commit, and
        return the value of ``execute()``."""
        # A fresh cursor per call: closing the shared self.cursor here used to
        # make every later insert on the same instance fail.
        cur = self._conn.cursor()
        try:
            affected = cur.execute(sql, param)
            self._conn.commit()
        finally:
            cur.close()
        return affected

    def release(self):
        """No-op kept for interface compatibility; always returns 0."""
        return 0


class DataBaseParent_local(_DataBaseParent):
    """Helper bound to the ``db1`` connection."""

    _conn = db1


class DataBaseParent_test(_DataBaseParent):
    """Helper bound to the ``db3`` connection."""

    _conn = db3

    def insert_many(self, sql, sql1, args):
        """Run ``sql1``, then bulk-execute ``sql`` once per row in ``args``.

        Args:
            sql: parameterized statement passed to ``executemany``.
            sql1: statement executed first (the sync script passes a DELETE).
            args: sequence of parameter tuples for ``sql``.

        Returns:
            The value returned by ``executemany``.

        Both statements are committed together. If either one raises, the
        transaction is rolled back and the exception is re-raised, so a failed
        bulk insert does not leave the effect of ``sql1`` pending on the
        connection.
        """
        cur = self._conn.cursor()
        try:
            cur.execute(sql1)
            res = cur.executemany(sql, args)
            self._conn.commit()
        except Exception:
            self._conn.rollback()
            raise
        finally:
            cur.close()
        return res