python运维开发(十三)----SQLalchemy和paramiko续

时间:2023-03-09 08:09:28
python运维开发(十三)----SQLalchemy和paramiko续

内容目录:

ORM架构SQLalchemy

Paramiko

SQLalchemy对表的操作

使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。

1、创建表

# 单表
class Test(Base):
__tablename__ = 'test'
nid = Column(Integer, primary_key=True,autoincrement=True)
name = Column(String(32))
# 一对多
class Group(Base):
__tablename__ = 'group'
nid = Column(Integer, primary_key=True,autoincrement=True)
caption = Column(String(32)) class User(Base):
__tablename__ = 'user'
nid = Column(Integer, primary_key=True,autoincrement=True)
username = Column(String(32))
group_id = Column(Integer, ForeignKey('group.nid'))
group = relationship("Group", backref='uuu') def __repr__(self):
temp = "%s - %s: %s" %(self.nid, self.username, self.group_id)
return temp
#多对多
class Host(Base): # metaclass,Host.table对象
__tablename__ = 'host'
nid = Column(Integer, primary_key=True,autoincrement=True)
hostname = Column(String(32))
port = Column(String(32))
ip = Column(String(32)) # host_user = relationship('HostUser', secondary=HostToHostUser, backref='h')
host_user = relationship('HostUser', secondary=HostToHostUser.__table__, backref='h') class HostUser(Base):
__tablename__ = 'host_user'
nid = Column(Integer, primary_key=True,autoincrement=True)
username = Column(String(32)) def init_table():
Base.metadata.create_all(engine) def drop_table():
Base.metadata.drop_all(engine) init_table()
Session = sessionmaker(bind=engine)
session = Session()

2、操作表

增加操作

# 方法1
session.add(Group(caption='DBA'))
session.add(Group(caption='SA'))
session.commit()
# 方法2
session.add_all([
User(username='jabe1',group_id = 1),
User(username='jabe2',group_id = 2)
])
session.commit()

删除表数据操作

session.query(Users).filter(Users.id > 2).delete()
session.commit()

修改表记录操作  

session.query(Users).filter(Users.id > 2).update({"name" : "099"})
session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False)
session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate")
session.commit()

查询操作

#条件查询
ret = session.query(User).filter(User.username == 'jabe1').all()
print(ret) #单表查询
ret = session.query(User).all()
obj = ret[0]
print(ret)
print(obj)
print(obj.nid)
print(obj.username)
print(obj.group_id) #关联查询
sql = session.query(User).join(Group,isouter=True)
print(sql)
ret = session.query(User).join(Group,isouter=True).all()
print(ret)
# sql = session.query(User.username,Group.caption).join(Group,isouter=True)
# print(sql)
# ret = session.query(User.username,Group.caption).join(Group,isouter=True).all()
# print(ret) #原始方式(查询用户名称和用户组)
# ret = session.query(User.username,Group.caption).join(Group,isouter=True).all()
# print(ret)
#新方式正向查询
# ret = session.query(User).all()
# for obj in ret:
# #obj代指user表每一行数据
# #obj.group 代指group对象
# print(obj.nid,obj.username,obj.group_id,obj.group,obj.group.nid,obj.group.caption) #原始方式(查询所有属于DBA组的用户)
ret = session.query(User.username,Group.caption).join(Group,isouter=True).filter(Group.caption == 'DBA').all()
print(ret)
#新方式(反向查询)
obj = session.query(Group).filter(Group.caption == 'DBA').first()
print(obj.nid)
print(obj.caption)
rint(obj.uuu)  

附加

# 条件
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
or_(
Users.id < 2,
and_(Users.name == 'eric', Users.id > 3),
Users.extra != ""
)).all() # 通配符
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all() # 限制
ret = session.query(Users)[1:2] # 排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 分组
from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).all() ret = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() # 连表 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() ret = session.query(Person).join(Favor).all() ret = session.query(Person).join(Favor, isouter=True).all() # 组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()

其他

  

Paramiko  

堡垒机

python运维开发(十三)----SQLalchemy和paramiko续

堡垒机执行流程:

  1. 管理员为用户在服务器上创建账号(将公钥放置服务器,或者使用用户名密码)
  2. 用户登陆堡垒机,输入堡垒机用户名密码,现实当前用户管理的服务器列表
  3. 用户选择服务器,并自动登陆
  4. 执行操作并同时将用户操作记录

注:配置.brashrc实现ssh登陆后自动执行脚本,如:/usr/bin/python /tmp/s13/day13/s7.py

练习代码:

import paramiko
import sys
import os
import socket
import getpass from paramiko.py3compat import u # windows does not have termios...
try:
import termios
import tty
has_termios = True
except ImportError:
has_termios = False def interactive_shell(chan):
if has_termios:
posix_shell(chan)
else:
windows_shell(chan) def posix_shell(chan):
import select oldtty = termios.tcgetattr(sys.stdin)
try:
tty.setraw(sys.stdin.fileno())
tty.setcbreak(sys.stdin.fileno())
chan.settimeout(0.0)
log = open('handle.log', 'a+', encoding='utf-8')
flag = False
temp_list = []
while True:
r, w, e = select.select([chan, sys.stdin], [], [])
if chan in r:
try:
x = u(chan.recv(1024))
if len(x) == 0:
sys.stdout.write('\r\n*** EOF\r\n')
break
if flag:
if x.startswith('\r\n'):
pass
else:
temp_list.append(x)
flag = False
sys.stdout.write(x)
sys.stdout.flush()
except socket.timeout:
pass
if sys.stdin in r:
x = sys.stdin.read(1)
import json if len(x) == 0:
break if x == '\t':
flag = True
else:
temp_list.append(x)
if x == '\r':
log.write(''.join(temp_list))
log.flush()
temp_list.clear()
chan.send(x) finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty) def windows_shell(chan):
import threading sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n") def writeall(sock):
while True:
data = sock.recv(256)
if not data:
sys.stdout.write('\r\n*** EOF ***\r\n\r\n')
sys.stdout.flush()
break
sys.stdout.write(data)
sys.stdout.flush() writer = threading.Thread(target=writeall, args=(chan,))
writer.start() try:
while True:
d = sys.stdin.read(1)
if not d:
break
chan.send(d)
except EOFError:
# user hit ^Z or F6
pass def run():
default_username = getpass.getuser()
username = input('Username [%s]: ' % default_username)
if len(username) == 0:
username = default_username hostname = input('Hostname: ')
if len(hostname) == 0:
print('*** Hostname required.')
sys.exit(1) tran = paramiko.Transport((hostname, 22,))
tran.start_client() default_auth = "p"
auth = input('Auth by (p)assword or (r)sa key[%s] ' % default_auth)
if len(auth) == 0:
auth = default_auth if auth == 'r':
default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_rsa')
path = input('RSA key [%s]: ' % default_path)
if len(path) == 0:
path = default_path
try:
key = paramiko.RSAKey.from_private_key_file(path)
except paramiko.PasswordRequiredException:
password = getpass.getpass('RSA key password: ')
key = paramiko.RSAKey.from_private_key_file(path, password)
tran.auth_publickey(username, key)
else:
pw = getpass.getpass('Password for %s@%s: ' % (username, hostname))
tran.auth_password(username, pw) # 打开一个通道
chan = tran.open_session()
# 获取一个终端
chan.get_pty()
# 激活器
chan.invoke_shell() interactive_shell(chan) chan.close()
tran.close() if __name__ == '__main__':
run()

参考url:http://www.cnblogs.com/wupeiqi/articles/5699254.html