Python MySQL ORM QuickORM hacking

时间:2022-01-21 23:36:12
# coding: utf-8

#
# Python MySQL ORM QuickORM hacking
# 说明:
# 以前仅仅是知道有ORM的存在,但是对ORM这个东西内部工作原理不是很清楚,
# 这次正好需要用到,于是解读一个相对来说很简单的Python2 ORM的例子。
#
# 参考源码:
# A simple ORM provides elegant API for Python-MySQL operation
# https://github.com/2shou/QuickORM
#
# 2016-10-15 深圳 南山平山村 曾剑锋 import MySQLdb #
# 作为和数据库中字段对应的域,同样也作为类属性存在
#
class Field(object):
pass class Expr(object):
# 合成where查询部分
def __init__(self, model, kwargs):
self.model = model
# How to deal with a non-dict parameter?
# 提取键值对的value部分
self.params = kwargs.values()
# 提取键值对的key部分,并合成替代字符串
equations = [key + ' = %s' for key in kwargs.keys()]
self.where_expr = 'where ' + ' and '.join(equations) if len(equations) > 0 else '' def update(self, **kwargs):
_keys = []
_params = []
# 筛选数据
for key, val in kwargs.iteritems():
if val is None or key not in self.model.fields:
continue
_keys.append(key)
_params.append(val) # 和__init__中的键值对的values数据联合
_params.extend(self.params)
# 合成查询语句
sql = 'update %s set %s %s;' % (
self.model.db_table, ', '.join([key + ' = %s' for key in _keys]), self.where_expr)
return Database.execute(sql, _params) def limit(self, rows, offset=None):
# 合成limit数据,这里就是合成想从那一行开始取数据,取多少数据
self.where_expr += ' limit %s%s' % (
'%s, ' % offset if offset is not None else '', rows)
return self def select(self):
# 合成查询语句,需要查询的字段,表明,条件
sql = 'select %s from %s %s;' % (', '.join(self.model.fields.keys()), self.model.db_table, self.where_expr)
# 取出所有的数据,这里使用了yield,使得select可以被for in语法再次迭代从而获取到值
for row in Database.execute(sql, self.params).fetchall():
# 获取传入的模板类型,这样就不用知道是什么类运行了select
inst = self.model()
# 获取一条信息中的值
for idx, f in enumerate(row):
setattr(inst, self.model.fields.keys()[idx], f)
yield inst # 返回查询的数据统计总数
def count(self):
sql = 'select count(*) from %s %s;' % (self.model.db_table, self.where_expr)
(row_cnt, ) = Database.execute(sql, self.params).fetchone()
return row_cnt class MetaModel(type):
db_table = None
fields = {} def __init__(cls, name, bases, attrs):
super(MetaModel, cls).__init__(name, bases, attrs)
fields = {}
# 从类所有的属性中提取出类属性,和数据库中的字段对应,这里更多的给Expr类使用。
for key, val in cls.__dict__.iteritems():
if isinstance(val, Field):
fields[key] = val
cls.fields = fields
cls.attrs = attrs class Model(object):
# 采用MetaModel来构建Model类
__metaclass__ = MetaModel # 动态生成对应的sql语句,并执行对应的语句,要注意这里是self.__dict__获取的实例属性。
def save(self):
insert = 'insert ignore into %s(%s) values (%s);' % (
self.db_table, ', '.join(self.__dict__.keys()), ', '.join(['%s'] * len(self.__dict__)))
return Database.execute(insert, self.__dict__.values()) # 使用where来查询
@classmethod
def where(cls, **kwargs):
return Expr(cls, kwargs) class Database(object):
autocommit = True
conn = None
db_config = {} # 通过db_config字典数据设置连接数据库的值
@classmethod
def connect(cls, **db_config):
cls.conn = MySQLdb.connect(host=db_config.get('host', 'localhost'), port=int(db_config.get('port', 3306)),
user=db_config.get('user', 'root'), passwd=db_config.get('password', ''),
db=db_config.get('database', 'test'), charset=db_config.get('charset', 'utf8'))
cls.conn.autocommit(cls.autocommit)
cls.db_config.update(db_config) # 这里是连接数据库,里面有一些策略,譬如:
# 1. 如果没有连接数据库,那么就连接数据库;
# 2. 如果连接了数据库,那么测试是否可ping通再返回连接;
# 3. 如果ping不通,那么重新连接,再返回。
@classmethod
def get_conn(cls):
if not cls.conn or not cls.conn.open:
cls.connect(**cls.db_config)
try:
cls.conn.ping()
except MySQLdb.OperationalError:
cls.connect(**cls.db_config)
return cls.conn # 这里是直接执行sql语句,返回的是执行后的cursor
@classmethod
def execute(cls, *args):
cursor = cls.get_conn().cursor()
cursor.execute(*args)
return cursor # 对象被垃圾回收机回收的时候调用
def __del__(self):
if self.conn and self.conn.open:
self.conn.close() # 执行原始sql语句
def execute_raw_sql(sql, params=None):
return Database.execute(sql, params) if params else Database.execute(sql)