Python-Day9 Paramiko模块/进程/线程/RabbitMQ队列

时间:2022-11-29 07:38:53

一、Paramiko模块

1.Paramiko安装

  Python的目录下有个Scripts目录,cd到这个目录用这里面的pip命令(如果添加的环境变量可以在cmd直接输入命令):pip install paramiko。如果pip版本低会有提示,python -m pip install --upgrade pip 升级pip,再次输入pip install paramiko。

Python-Day9 Paramiko模块/进程/线程/RabbitMQ队列

2.SSHClient

  用于连接远程服务器并执行基本命令

#基于用户名密码连接:
import paramiko
# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='192.168.1.122', port=22, username='root', password='')
# 执行命令
stdin, stdout, stderr = ssh.exec_command('ls')
# 获取命令结果
res,err = stdout.read(),stderr.read()
result = res if res else err
print(result.decode())
# 关闭连接
ssh.close()

SSHClient 封装 Transport

 import paramiko

 transport = paramiko.Transport(('192.168.1.122', 22))
transport.connect(username='root', password='') ssh = paramiko.SSHClient()
ssh._transport = transport stdin, stdout, stderr = ssh.exec_command('df')
print stdout.read() transport.close()

基于公钥密钥连接:

import paramiko

private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')

# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', key=private_key) # 执行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 获取命令结果
result = stdout.read() # 关闭连接
ssh.close()
 import paramiko

 private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')

 transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', pkey=private_key) ssh = paramiko.SSHClient()
ssh._transport = transport stdin, stdout, stderr = ssh.exec_command('df') transport.close()

SSHClient 封装 Transport

3.SFTPClient

用于连接远程服务器并执行上传下载

import paramiko

transport = paramiko.Transport(('hostname',22))
transport.connect(username='wupeiqi',password='') sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 将remove_path 下载到本地 local_path
sftp.get('remove_path', 'local_path') transport.close()
 import paramiko

 private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')

 transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', pkey=private_key ) sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 将remove_path 下载到本地 local_path
sftp.get('remove_path', 'local_path') transport.close()

基于公钥密钥上传下载

二、进程与线程

1.进程与线程介绍和关系

  线程:线程是操作系统能够进行运算调度的最小单位吗,是一串指令的集和。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

  进程:进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位。

1)一个程序至少有一个进程,一个进程至少有一个线程

2)同一个进程下的线程共享内存空间,进程的内存是独立的

3)同一个进程的线程之间可以直接交流,两个进程想通信,必须通过一个中间代理来实现

4)创建新线程很简单, 创建新进程需要对其父进程进行一次克隆

5)一个线程可以控制和操作同一进程里的其他线程,但是进程只能操作子进程

无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行

2.Python threading模块

#直接调用
import threading
import time
def sayhi(num): #定义每个线程要运行的函数
print("running on number:%s" %num)
time.sleep(3) t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例
t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例 t1.start() #启动线程
t2.start() #启动另一个线程

执行完会发现不是执行t1的print后  running on number:1  sleep 3秒再执行t2,而是t1和t2的print几乎同时出结果后

running on number:1

running on number:2

等待3s结束,下面继承式写了写了开始和结束的输出看着更明显一些。

#继承式调用
class MyThread(threading.Thread):
def __init__(self,n,sleep_time):
super(MyThread,self).__init__()
self.n = n
self.sleep_time = sleep_time
def run(self):
print("runnint task ",self.n )
time.sleep(self.sleep_time)
print("task done,",self.n ) t1 = MyThread("t1",2)
t2 = MyThread("t2",4) t1.start()
t2.start() '''
runnint task t1
runnint task t2
main thread....
task done, t1
task done, t2
'''

3.Join & Daemon

threading的join,一旦调用会等到调用它的线程执行完毕再执行其他的线程,上代码看结果

import threading
import time
def sayhi(num): #定义每个线程要运行的函数
print("running on number:%s" %num)
time.sleep(3)
print("%s睡完了" %num) t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例
t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例 t1.start() #启动线程
t1.join() #-----------------------------等t1执行完----------------------------------
t2.start() #启动另一个线程 '''执行结果:
running on number:1
1睡完了
running on number:2
2睡完了
'''
import threading
import time
def sayhi(num): #定义每个线程要运行的函数
print("running on number:%s" %num)
time.sleep(3)
print("%s睡完了" %num) t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例
t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例
t2.setDaemon(True) t1.start() #启动线程
t1.join() #等t1执行完
t2.start() #启动另一个线程 '''
running on number:1
1睡完了
running on number:2 #主进程执行完杀死子进程
'''

4.线程锁

  一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。

import time
import threading def addNum():
global num #在每个线程中都获取这个全局变量
print('--get num:',num )
time.sleep(1)
lock.acquire() #修改数据前加锁
num -=1 #对此公共变量进行-1操作
lock.release() #修改后释放 num = 100 #设定一个共享变量
thread_list = []
lock = threading.Lock() #生成全局锁
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t) for t in thread_list: #等待所有线程执行完毕
t.join() print('final num:', num )

5.GIL VS Lock

Python-Day9 Paramiko模块/进程/线程/RabbitMQ队列

RLock(递归锁):说白了就是在一个大锁中还要再包含子锁

import threading
def run1():
print("grab the first part data")
lock.acquire()
global num
num +=
lock.release()
return num
def run2():
print("grab the second part data")
lock.acquire()
global num2
num2 +=
lock.release()
return num2
def run3():
lock.acquire()
res = run1()
print('--------between run1 and run2-----')
res2 = run2()
lock.release()
print(res, res2)
num, num2 = ,
lock = threading.RLock()
for i in range():
t = threading.Thread(target=run3)
t.start()
while threading.active_count() != :
print(threading.active_count())
else:
print('----all threads done---')
print(num, num2)

6.Semaphore(信号量)

  互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

import threading, time
def run(n):
semaphore.acquire()
time.sleep(1)
print("run the thread: %s\n" % n)
semaphore.release()
if __name__ == '__main__':
semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程同时运行
for i in range(22):
t = threading.Thread(target=run, args=(i,))
t.start()
while threading.active_count() != 1:
pass # print threading.active_count()
else:
print('----all threads done---')

7.Eents

  通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。

import time
import threading
event = threading.Event()
def lighter():
count = 0
event.set() #先设置绿灯
while True:
if count >5 and count < 10: #改成红灯
event.clear() #把标志位清了
print("\033[41;1mred light is on....\033[0m")
elif count >10:
event.set() #变绿灯
count = 0
else:
print("\033[42;1mgreen light is on....\033[0m")
time.sleep(1)
count +=1
def car(name):
while True:
if event.is_set(): #代表绿灯
print("[%s] running..."% name )
time.sleep(1)
else:
print("[%s] sees red light , waiting...." %name)
event.wait()
print("\033[34;1m[%s] green light is on, start going...\033[0m" %name)
light = threading.Thread(target=lighter,)
light.start()
car1 = threading.Thread(target=car,args=("Tesla",))
car1.start()

三、queue队列

class queue.Queue(maxsize=0) #先入先出

import queue


q = queue.Queue()


for i in range(5):
q.put(i)


while not q.empty():
print(q.get())

'''
0
1
2
3
4
'''
class queue.LifoQueue(maxsize=0) #last in fisrt out 
import queue

q = queue.LifoQueue()

for i in range(5):
q.put(i) while not q.empty():
print(q.get()) '''
4
3
2
1
0
'''
class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列
import queue

q = queue.PriorityQueue()

q.put((-1,"chen"))
q.put((3,"han"))
q.put((10,"alex"))
q.put((6,"wang")) print(q.get())
print(q.get())
print(q.get())
print(q.get()) '''

(-1, 'chen')
(3, 'han')
(6, 'wang')
(10, 'alex')

'''

四、生产者消费者模型

  在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

  在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

  生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

 import threading,time
import queue
q = queue.Queue(maxsize=10)
def Producer(name):
count = 1
while True:
q.put("骨头%s" % count)
print("生产了骨头",count)
count +=1
time.sleep(0.1)
def Consumer(name):
#while q.qsize()>0:
while True:
print("[%s] 取到[%s] 并且吃了它..." %(name, q.get()))
time.sleep(1)
p = threading.Thread(target=Producer,args=("Alex",))
c = threading.Thread(target=Consumer,args=("Cheng",))
c1 = threading.Thread(target=Consumer,args=("wang",))
p.start()
c.start()
c1.start()