【Python之路Day12】网络篇之Python操作MySQL

时间:2023-03-09 02:59:48
【Python之路Day12】网络篇之Python操作MySQL

pymysql是Python中操作MySQL的模块,使用方法和MySQLDB几乎一样。

1. 执行SQL语句

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: DBQ(Du Baoqiang) import pymysql #导入模块 #创建连接
conn = pymysql.connect(
host='172.16.30.162', #主机IP
port=3306, #端口
user='tom', #连接数据库用户
password='tom123', #连接密码
db='db1' #连接的数据库名称
) #创建游标
cursor = conn.cursor() #执行SQL,并返回受影响的行数
effect_row = cursor.execute("UPDATE tb1 SET host='1.1.1.1'") # 执行SQL,并返回受影响行数
#effect_row = cursor.execute("UPDATE tb1 SET host='1.1.1.2' WHERE id > %s",(1,)) # 执行SQL,并返回受影响行数
#effect_row = cursor.executemany("INSERT INTO tb1(host) VALUES(%s),(%s)", [("1.1.1.11",1),("1.1.1.11",1)]) print(effect_row)
# 提交,不然无法保存新建或者修改的数据
conn.commit() # 关闭游标
cursor.close() # 关闭连接
conn.close()

2 获取查询数据

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: DBQ(Du Baoqiang) import pymysql #导入模块 #创建连接
conn = pymysql.connect(
host='172.16.30.162', #主机IP
port=3306, #端口
user='tom', #连接数据库用户
password='tom123', #连接密码
db='db1' #连接的数据库名称
) # #创建游标
cursor = conn.cursor() cursor.execute("SELECT * FROM tb1") #获取第一行数据
row_1 = cursor.fetchone() #获取前N行数据
row_2 = cursor.fetchmany(3) #获取所有数据
row_3 = cursor.fetchall() conn.commit()
cursor.close()
conn.close() print(row_1) #游标的位置会变的,获取了第一行之后,游标就到第二行位置了
print(row_2) #因此打印前三行的时候,是打印的2,3,4
print(row_3) #同理,打印所有的,实际上是当前游标到最后的位置 #代码执行结果:
(1, '1.1.1.1')
((2, '1.1.1.2'), (3, '1.1.1.2'), (4, '1.1.1.11'))
((5, ''), (6, '1.1.1.11'), (7, ''), (8, '1.1.1.11'), (9, ''), (10, '1.1.1.11'), (11, ''), (12, '1.1.1.30'), (13, '1.1.1.30'))

3、fetch数据类型

#!/usr/bin/env python3
# -*- coding: utf-8 -*- import pymysql #导入模块 #创建连接
conn = pymysql.connect(
host='172.16.30.162', #主机IP
port=3306, #端口
user='tom', #连接数据库用户
password='tom123', #连接密码
db='db1' #连接的数据库名称
) # #创建游标, 并设置为字典类型
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) cursor.execute("select * from tb1") result = cursor.fetchone() print(result) conn.commit()
cursor.close()
conn.close() #执行结果:
{'host': '1.1.1.1', 'id': 1}

SQLAlchemy

SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射来操作数据库。简而言之,就是将对象转换为SQL语句,然后使用数据API执行SQL并获取执行结果。

SQLAlchemy本身无法操作数据库,其必须以pymysql等第三方插件。

Dialect用于和数据API交互,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如下代码:

二. ORM功能的使用

1. 创建表

#!/usr/bin/env python3
# -*- coding: utf-8 -*- from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer,String, ForeignKey,UniqueConstraint,Index
from sqlalchemy.orm import sessionmaker,relationships
from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://tom:tom123@172.16.30.162:3306/db1",max_overflow=5) Base = declarative_base() #创建一个单表
class Users(Base):
'''
一定要继承Base
'''
__tablename__ = 'users' #表名为users
id = Column(Integer,primary_key=True) #id列, 整数数据类型, 主键
name = Column(String(32)) #name列, 字符串类型, 长度32
extra = Column(String(20)) #extra列,字符串类型,长度20 __table_args__ = (
UniqueConstraint('id','name',name='unx_id_name'),
Index('ix_id_name','name','extra')
) #一对多
class Favor(Base):
__tablename__ = 'favor'
nid = Column(Integer,primary_key=True)
caption = Column(String(50),default='red',unique=True) class Person(Base):
'''
通过外键关联favor表的nid实现一对多
'''
__tablename__ = 'person'
nid = Column(Integer,primary_key=True)
name = Column(String(32),index=True,nullable=True)
favor_id = Column(Integer, ForeignKey('favor.nid')) #外键,关联favor表的nid #多对多
class Group(Base):
__tablename__ = 'group'
id = Column(Integer,primary_key=True)
name = Column(String(64),unique=True,nullable=True)
port = Column(Integer,default=22) class Server(Base):
__tablename__ = 'server'
id = Column(Integer,primary_key=True,autoincrement=True)
hostname = Column(String(64),unique=True,nullable=False) class ServerToGroup(Base):
'''
servertogroup这个表存放上述两个表的对应关系,可以多对多
'''
__tablename__ = 'servertogroup'
nid = Column(Integer,primary_key=True,autoincrement=True)
server_id = Column(Integer,ForeignKey('server.id'))
group_id = Column(Integer,ForeignKey('group.id')) def init_db():
Base.metadata.create_all(engine) def drop_db():
Base.metadata.drop_all(engine) #init_db() #创建所有表
#drop_db() #删除所有表

2  操作表

#!/usr/bin/env python3
# -*- coding: utf-8 -*- from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer,String, ForeignKey,UniqueConstraint,Index
from sqlalchemy.orm import sessionmaker,relationships
from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://tom:tom123@172.16.30.162:3306/db1",max_overflow=5) Base = declarative_base() #创建一个单表
class Users(Base):
'''
一定要继承Base
'''
__tablename__ = 'users' #表名为users
id = Column(Integer,primary_key=True) #id列, 整数数据类型, 主键
name = Column(String(32)) #name列, 字符串类型, 长度32
extra = Column(String(20)) #extra列,字符串类型,长度20 __table_args__ = (
UniqueConstraint('id','name',name='unx_id_name'),
Index('ix_id_name','name','extra')
) def __repr__(self):
return "%s-%s"%(self.id, self.name) #一对多
class Favor(Base):
__tablename__ = 'favor'
nid = Column(Integer,primary_key=True)
caption = Column(String(50),default='red',unique=True) def __repr__(self):
return "%s-%s" %(self.nid, self.caption) class Person(Base):
'''
通过外键关联favor表的nid实现一对多
'''
__tablename__ = 'person'
nid = Column(Integer,primary_key=True)
name = Column(String(32),index=True,nullable=True)
favor_id = Column(Integer, ForeignKey('favor.nid')) #外键,关联favor表的nid
#与生成表结构无关, 仅用于查询方便
#favor = relationships('Favor',backref = 'pers') #多对多
class Group(Base):
__tablename__ = 'group'
id = Column(Integer,primary_key=True)
name = Column(String(64),unique=True,nullable=True)
port = Column(Integer,default=22)
#group = relationships('Group',secondary=ServerToGroup,backref='host_list') class Server(Base):
__tablename__ = 'server'
id = Column(Integer,primary_key=True,autoincrement=True)
hostname = Column(String(64),unique=True,nullable=False) class ServerToGroup(Base):
'''
servertogroup这个表存放上述两个表的对应关系,可以多对多
'''
__tablename__ = 'servertogroup'
nid = Column(Integer,primary_key=True,autoincrement=True)
server_id = Column(Integer,ForeignKey('server.id'))
group_id = Column(Integer,ForeignKey('group.id'))
#group = relationships('Group',backref='s2g')
#server = relationships('Server',backref='s2g') def init_db():
Base.metadata.create_all(engine) def drop_db():
Base.metadata.drop_all(engine) Session = sessionmaker(bind=engine)
session = Session() 表结构+连接数据库

3 增

obj = Users(name='Tom',extra='mouse')
session.add(obj)
session.add_all([
Users(name='Jerry',extra='Cat'),
Users(name='Sam',extra='Human')
]) session.commit()

session.query(Users).filter(Users.id > 4).delete()   #删除id>4的用户
session.commit()

session.query(Users).filter(Users.id > 3).update({"name":"sb"})   #先将id>3的用户名字改为sb
session.query(Users).filter(Users.id > 3).update({Users.name:Users.name + ''},synchronize_session = False) #而后在sb后拼接一个123 session.query(Users).filter(Users.id > 3).update({"id": Users.id+1},synchronize_session = "evaluate") #最后把大于3的id号+1 session.commit()

res = session.query(Users).all()  #返回一个列表
print(res)
res1 = session.query(Users.name, Users.extra).all() #返回一个列表,中包含元组
print(res1)
res2 = session.query(Users).filter_by(name='Jerry').all() #返回一个列表
print(res2)
res3 = session.query(Users).filter_by(name='Jerry').first()
print(res3)
session.commit()

其他

#条件
ret = session.query(Users).filter_by(name='Jerry').all()
ret = session.query(Users).filter(Users.id > 1, Users.name == 'Tom').all() #并
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'Sam').all() #id在1-3之间,并且名字是Sam
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all() #id在1,3,4里的
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() #取反
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='Sam'))).all() #子查询 from sqlalchemy import and_, or_ ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'Tom')).all() #并
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'Tom')).all() #或
ret = session.query(Users).filter(
or_(
Users.id < 2,
and_(Users.name == 'Sam', Users.id > 3),
Users.extra != ""
)).all() #id<2 或者 用户名是Sam并大于3的 # 通配符
ret = session.query(Users).filter(Users.name.like('J%')).all() #J开头后续任意字符
ret = session.query(Users).filter(~Users.name.like('J%')).all() #取反 # 限制
ret = session.query(Users)[0:5] #显示多少个值,个人感觉有点类似于列表的切片一样 # 排序
ret = session.query(Users).order_by(Users.id.desc()).all() #降序
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() #升序 # 分组
from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all() ret = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).all()
#结果:[(2, Decimal('2'), 2), (3, Decimal('3'), 3), (5, Decimal('5'), 5), (1, Decimal('1'), 1)] ret = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() #添加条件最小id大于2 # 连表 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() #连表查询, 条件 usersid 等于 Favor.nid
ret = session.query(Person).join(Favor).all() #左连接, 如果要用右连接,可以把两个表的位置换下
#
ret = session.query(Person).join(Favor, isouter=True).all() # 组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all() print(ret)
session.commit()