ORM框架SQLAlchemy

时间:2022-06-07 03:00:25

一 、SQLAlchemy简介

SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
SQLAlchemy架构。

ORM框架SQLAlchemy

SQLAlchemy流程。
#1、使用者通过ORM对象提交命令
#2、将命令交给SQLAlchemy Core(Schema/Types  SQL Expression Language)转换成SQL
#3、使用 Engine/ConnectionPooling/Dialect 进行数据库操作
#3.1、匹配使用者事先配置好的egine
#3.2、egine从连接池中取出一个链接
#3.3、基于该链接通过Dialect调用DB API,将SQL转交给它去执行
!!!上述流程分析,可以大致分为两个阶段!!!:

#第一个阶段(流程1-2):将SQLAlchemy的对象换成可执行的sql语句
#第二个阶段(流程3):将sql语句交给数据库执行
如果我们不依赖于SQLAlchemy的转换而自己写好sql语句,那是不是意味着可以直接从第二个阶段开始执行了,事实上正是如此,我们完全可以只用SQLAlchemy执行纯sql语句,如下
from sqlalchemy import create_engine

#1 准备
# 需要事先安装好pymysql
# 需要事先创建好数据库:create database com charset utf8;

#2 创建引擎
engine=create_engine(mysql pymysql://adm:[email protected]:3306/com, echo=True)

#3 执行sql
# egine.execute(create table if not EXISTS t1(id int PRIMARY KEY auto_increment,name char(32));)

# cur=egine.execute(insert into t1 values(%s,%s);,[(1,"egon1"),(2,"egon2"),(3,"egon3")]) #按位置传值

# cur=egine.execute(insert into t1 values(%(id)s,%(name)s);,name=egon4,id=4) #按关键字传值

#4 新插入行的自增id
# print(cur.lastrowid)

#5 查询
cur=egine.execute(select * from t1)

cur.fetchone() #获取一行
cur.fetchmany(2) #获取多行
cur.fetchall() #获取所有行

DB API SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
#1、MySQL-Python
    mysql mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
   
#2、pymysql
    mysql pymysql://<username>:<password>@<host>/<dbname>[?<options>]
   
#3、MySQL-Connector
    mysql mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
   
#4、cx_Oracle
    oracle cx_oracle://user:[email protected]:port/dbname[?key=value&key=value...]

更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html 

 

二、ORM创建表结构

ORM框架SQLAlchemyORM框架SQLAlchemy
 1 #类==》表
 2 #对象==》表中的一行记录
 3 #四张表:业务线,服务,用户,角色,利用ORM创建出它们,并建立好它们直接的关系
 4 
 5 from sqlalchemy import create_engine
 6 from sqlalchemy.ext.declarative import declarative_base
 7 from sqlalchemy import Column,Integer,String,DateTime,Enum,ForeignKey,UniqueConstraint,ForeignKeyConstraint,Index
 8 from sqlalchemy.orm import sessionmaker
 9 
10 #连接数据库
11 egine = create_engine(mysql pymysql://adm:[email protected]:3306/com, echo=True)
12 
13 Base = declarative_base()
14 
15 #创建单表:业务线
16 class Bussiness(Base):
17     __tablename__ = bussiness
18     id=Column(Integer, primary_key=True, autoincrement=True)
19     bname=Column(String(32), nullable=False, index=True)
20 
21 
22 #多对一:多个服务器可以属于一个业务线,多个业务线不能包含同一个服务
23 class Service(Base):
24     __tablename__=service
25     id=Column(Integer, primary_key=True, autoincrement=True)
26     sname=Column(String(32), nullable=False, index=True)
27     ip=Column(String(15), nullable=False)
28     port=Column(Integer, nullable=False)
29 
30     bussiness_id=Column(Integer, ForeignKey(bussiness.id))  #创建一个外键
31 
32     __table_args__=(
33         UniqueConstraint(ip, port, name=uix_ip_port),
34         Index(ix_id_sname, id, sname)
35     )
36  
37  #一对一:一种角色只能管理一条业务线,一条业务线只能被一种角色管理
38 class Role(Base):
39      __tablename__=role
40      id=Column(Integer, primary_key=True, autoincrement=True)
41      rname=Column(String(32), nullable=False, index=True)
42      priv=Column(String(64), nullable=False)
43 
44      bussiness_id=Column(Integer, ForeignKey(bussiness.id),unique=True)
45 
46 #多对多:多个用户可以是同一个role,多个role可以包含同一个用户
47      
48 class Users(Base):
49     __tablename__=users
50     id=Column(Integer,primary_key=True, autoincrement=True)
51     uname=Column(String(32), nullable=False, index=True)
52 
53 class Users2Role(Base):
54     __tablename__=users2role
55     id=Column(Integer, primary_key=True, autoincrement=True)    
56     uid=Column(Integer, ForeignKey(users.id))
57     rid=Column(Integer, ForeignKey(role.id))
58 
59     __table_args__=(
60         UniqueConstraint(uid, rid, name=uix_uid_rid),
61     )
62 
63 def init_db():
64     Base.metadata.create_all(egine)
65 
66 def drop_db():
67     Base.metadata.drop_all(egine)
68 
69 
70 if __name__ == __main__:
71     init_db()        
ORM创建表结构

 

三、ORM实现MySQL数据库的增、删、改、查操作

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
‘‘‘
@File    :   orm_pro.py
@Time    :   2020/01/07 22:11:59
@Author  :   yusheng_liang 
@Version :   1.0
‘‘‘
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,Integer,String, ForeignKey
from sqlalchemy.orm import sessionmaker

engine = create_engine(mysql pymysql://adm:[email protected]:3306/com?charset=utf8,max_overflow=5)

Base = declarative_base()

# 一、创建两个表的项目结构
# 多对一:假如多个员工可以属于一个部门,而多个部门不能有同一个员工
class Dep(Base):
    __tablename__=dep
    id=Column(Integer, primary_key=True, autoincrement=True)
    dname=Column(String(32), index=True, nullable=False)

class Emp(Base):
    __tablename__=emp
    id=Column(Integer, primary_key=True, autoincrement=True)    
    ename=Column(String(32), nullable=False, index=True)
    dep_id=Column(Integer, ForeignKey(dep.id))

# 创建表结构的方法
def init_db():
    Base.metadata.create_all(engine)
# 删除表结构的方法
def drop_db():
    Base.metadata.drop_all(engine)

init_db()

Session=sessionmaker(bind=engine)
session=Session()

# 二、表结构创建完成后,可以对表进行增、删、改、查操作了


# 2.1增加数据的操作
row_obj=Dep(dname=销售部)   #按关键字传参,无需指定id,因id列是自动增长的
session.add(row_obj)
#也可以批量增加数据的操作
# session.add_all(
#     [
#         Dep(dname=人事部),
#         Dep(dname=财务部),
#         Dep(dname=技术部),
#     ]
# )

#提交数据,将数据插入到数据库中
session.commit()


# 2.2删除数据
# 删除Dep表中id大于3的数据记录
session.query(Dep).filter(Dep.id > 3).delete()
session.commit()


# 2.3修改数据
# 更新Dep表中id为1的dname列的值
session.query(Dep).filter(Dep.id == 1).update({dname:yusheng})
session.commit()


# 2.4查询数据
# 查询Dep表中的所有数据
ret = session.query(Dep).all()
# 查询Dep表中的dname列的数据,按id排序
res=session.query(Dep.dname).order_by(Dep.id).all()

result=session.query(Dep.dname).first()
print(result)  #yusheng

# 过渡查询数据,以逗号分隔,默认为and
results=session.query(Dep).filter(Dep.id >1, Dep.id < 1000)
print([(row.id, row.dname) for row in results])

四、ORM实现mysql数据库其他的操作

# 三、其他操作
# 3.1条件查询
sql=session.query(Emp).filter_by(ename=admin) #filter_by只能传参数,XOXX=XOXX
ret=sql.all() #sql语句执行查询的结果

res=session.query(Emp).filter(Emp.id > 1, Emp.ename == admin).all() #filter内传的是表达式,逗号分隔,默认为and,
res=session.query(Emp).filter(Emp.id.between(1,3), Emp.ename == admin).all()
res=session.query(Emp).filter(Emp.id.in_([1,3,55,78]), Emp.ename == admin).all()
res=session.query(Emp).filter(~Emp.id.in_([1,3,55,78]), Emp.ename == admin).all()  #~代表取反,转换成sql就是关键字not

from sqlalchemy import and_, or_
res=session.query(Emp).filter(and_(Emp.id > 1, Emp.ename == admin)).all()
res=session.query(Emp).filter(or_(Emp.id < 2, Emp.ename == admin)).all()
res=session.query(Emp).filter(or_(
    Emp.dep_id == 3,
    and_(Emp.id > 1, Emp.ename == admin),
    Emp.ename != ‘‘
)).all()

# 3.2通配符
res=session.query(Emp).filter(Emp.ename.like(%yu%)).all()
res.session.query(Emp).filter(~Emp.ename.like(%yu%)).all()

# 3.3limit
result=session.query(Emp)[0:5:2]

# 3.4排序
res=session.query(Emp).order_by(Emp.id.desc()).all()

# 3.5分组
from sqlalchemy.sql import func
res=session.query(Emp.dep_id).group_by(Emp.dep_id).all()
res=session.query(
    func.max(Emp.dep_id),
    func.min(Emp.dep_id),
    func.sum(Emp.dep_id),
    func.avg(Emp.dep_id),
    func.count(Emp.dep_id),
).group_by(Emp.dep_id).all()

res=session.query(
    Emp.dep_id,
    func.count(1),
).group_by(Emp.dep_id).having(func.count(1) > 2).all()


# 3.6连表操作
#笛卡尔积
res=session.query(Emp,Dep).all()  #select * from emp,dep;

# where条件
res=session.query(Emp, Dep).filter(Emp.dep_id == Dep.id).all()
# for row in res:
#     emp_tb=row[0]
#     dep_tb=row[1]
#     print(emp_tb.id,emp_tb.ename,dep_tb.id,dep_tb.dname)

# 内连接
ret=session.query(Emp).join(Dep)
#join默认为内连接,SQLAlchemy会自动帮我们通过foreign key字段去找关联关系
#但是上述查询的结果均为Emp表的字段,这样链表还有毛线意义,于是我们修改为
ret=session.query(Emp.id,Emp.ename,Emp.dep_id,Dep.dname).join(Dep).all()

# 左连接isouter=True
res=session.query(Emp.id,Emp.ename,Emp.dep_id,Dep.dname).join(Dep,isouter=True).all()
#右连接:同左连接,只是把两个表的位置换一下

# 3.7组合
q1=session.query(Emp.id,Emp.ename).filter(Emp.id > 0,Emp.id < 5)
q2=session.query(Emp.id,Emp.ename).filter(
    or_(
        Emp.ename.like(%大小%),
        Emp.ename.like(%上学%),
    )
)
res1=q1.union(q2) #组合 去重
res2=q1.union_all(q2) #组合,不去重

print([i.ename for i in q1.all()]) 
print([i.ename for i in q2.all()]) 
print([i.ename for i in res1.all()]) 
print([i.ename for i in res2.all()]) 


# 3.8子查询
# 有三种形式的子查询,注意:子查询的sql必须用括号包起来,尤其在形式三中需要注意这一点



#示例:查出id大于2的员工,当做子查询的表使用

#原生SQL:
# select * from (select * from emp where id > 2);

#ORM:
res=session.query(
    session.query(Emp).filter(Emp.id > 2).subquery()
).all()



#示例:#查出销售部门的员工姓名

#原生SQL:
# select ename from emp where dep_id in (select id from dep where dname=销售);

#ORM:
res=session.query(Emp.ename).filter(Emp.dep_id.in_(
    session.query(Dep.id).filter_by(dname=销售), #传的是参数
    # session.query(Dep.id).filter(Dep.dname==销售) #传的是表达式
)).all()



#示例:查询所有的员工姓名与部门名

#原生SQL:
# select ename as 员工姓名,(select dname from dep where id = emp.dep_id) as 部门名 from emp;

#ORM:
sub_sql=session.query(Dep.dname).filter(Dep.id==Emp.dep_id) #SELECT dep.dname FROM dep, emp WHERE dep.id = emp.dep_id
sub_sql.as_scalar() #as_scalar的功能就是把上面的sub_sql加上了括号

res=session.query(Emp.ename,sub_sql.as_scalar()).all()