Python的平凡之路(9)

时间:2023-03-08 18:21:51

一、Paramiko模块练习

1、 Paramiko模块介绍
Paramiko是用python语言写的一个模块,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接
2 、SSHclient 用于远程连接服务器并执行基本命令
a   用户名密码方式
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname="192.168.16.101",port=22,username="wspkh", password="abcABC123")
#执行命令
stdin,stdout,stderr = ssh.exec_command('ifconfig')
#返回结果
result = stdout.read()
print(result.decode())
#关闭连接
ssh.close()
b   公钥方式
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', key=private_key)
# 执行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 获取命令结果
result = stdout.read()
# 关闭连接
ssh.close()
3 、SFTPclient用于远程连接服务器并执行上传下载
a   用户名密码方式
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import paramiko
#transport练习
transport = paramiko.Transport(("192.168.16.101",22))
transport.connect(username = "wspkh", password = "abcABC123")
sftp = paramiko.SFTPClient.from_transport(transport)
#上传
sftp.put('/Users/khwsp/k.sh','/tmp/p.sh')
stdin,stdout,stderr = ssh.exec_command('ls /tmp')
result = stdout.read()
print(result.decode())
#下载
sftp.get('/tmp/ct.shutdown','/Users/khwsp/kt.shutdown')
#关闭连接
transport.close()
b   公钥方式 
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import paramiko
import uuid

class Haproxy(object):

def __init__(self):
        self.host = '172.16.103.191'
        self.port = 22
        self.username = 'wupeiqi'
        self.pwd = '123'
        self.__k = None

def create_file(self):
        file_name = str(uuid.uuid4())
        with open(file_name,'w') as f:
            f.write('sb')

        return file_name
    #写法大赞

def run(self):
        self.connect()
        self.upload()
        self.rename()
        self.close()

def connect(self):
        transport = paramiko.Transport((self.host,self.port))
        transport.connect(username=self.username,password=self.pwd)
        self.__transport = transport

def close(self):

self.__transport.close()

def upload(self):
        # 连接,上传
        file_name = self.create_file()

sftp = paramiko.SFTPClient.from_transport(self.__transport)
        # 将location.py 上传至服务器 /tmp/test.py
        sftp.put(file_name, '/home/wupeiqi/tttttttttttt.py')

def rename(self):

ssh = paramiko.SSHClient()
        ssh._transport = self.__transport
        # 执行命令
        stdin, stdout, stderr = ssh.exec_command('mv /home/wupeiqi/tttttttttttt.py /home/wupeiqi/ooooooooo.py')
        # 获取命令结果
        result = stdout.read()

ha = Haproxy()
ha.run()

 二、线程

1、定义、特点、语法

定义: 线程可以理解成是在进程中独立运行的子任务。比如,QQ.exe运行时就有很多的子任务在同时运行。再如,好友视频线程、下载文件线程、传输数据线程、发送表情线程等,这些不同的任务或者说功能都可以同时运行,其中每一项任务完全可以理解成是“线程”在工作,传文件、听音乐、发送图片表情等功能都有对应的线程在后台默默地运行。

特点: 多线程技术,可以在同一时间内运行更多不同种类的任务。

语法:

直接调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading
import time
def sayhi(num): #定义每个线程要运行的函数
    print("running on number:%s" %num)
    time.sleep(3)
if __name__ == '__main__':
    t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例
    t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例
    t1.start() #启动线程
    t2.start() #启动另一个线程
    print(t1.getName()) #获取线程名
    print(t2.getName())

继承式调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading
import time
class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num
    def run(self):#定义每个线程要运行的函数
        print("running on number:%s" %self.num)
        time.sleep(3)
if __name__ == '__main__':
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

2、join

#!/usr/bin/env python

#Author is wspikh

# -*- coding: encoding -*-

import threading

import time

def run(n):

print("task",n)

time.sleep(2)

print("task done",n)

#开始时间

start_time = time.time()

t_objs = []

for i in range(50):

t = threading.Thread(target=run,args=("t%s" %i,))

t.start()

t_objs.append(t)#为了不阻塞后面线程的启动,不在这里join,先放到一个空列表里

for t in t_objs: #循环线程实例列表,等待所有线程执行完毕

t.join() #其实就是wait的意思

#结束时间,计算花费了多长时间

print("------All threads has finished!",":",threading.current_thread(),":",threading.active_count())
print("cost:",time.time()-start_time)

3、线程锁之Lock\Rlock\信号量

lock.py

#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import threading
import time
def run(n):
lock.acquire() #获得一把锁
global num
time.sleep(2)#sleep不占用CPU
num += 1
time.sleep(1) #加锁后变串行
print(time.time())
lock.release() lock = threading.Lock()
num = 0
t_objs = []
for i in range(20):
t = threading.Thread(target=run,args=("t%s" %i,))
t.start()
t_objs.append(t)#为了不阻塞后面线程的启动,不在这里join,先放到一个空列表里 print("------All threads has finished!------",":",threading.current_thread(),":",threading.active_count())
time.sleep(3)
print("num:",num) 递归锁.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import threading, time def run1():
print("grab the first part data")
lock.acquire()
global num
num += 1
lock.release()
return num def run2():
print("grab the second part data")
lock.acquire()
global num2
num2 += 1
lock.release()
return num2 def run3():
lock.acquire()
res = run1()
print('--------between run1 and run2-----')
res2 = run2()
lock.release()
print(res, res2) if __name__ == '__main__': num, num2 = 0, 0
lock = threading.RLock()#递归锁
for i in range(5):
t = threading.Thread(target=run3)
t.start() while threading.active_count() != 1:
print(threading.active_count())
else:
print('----all threads done---')
print(num, num2)
信号量.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import threading, time #互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。
def run(n):
semaphore.acquire()#加锁
time.sleep(1)
print("run the thread: %s\n" % n)
semaphore.release()#释放锁 if __name__ == '__main__': num = 0 #全局变量
semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
for i in range(20):
t = threading.Thread(target=run, args=(i,))
t.start() while threading.active_count() != 1:
pass # print threading.active_count()
else:
print('----all threads done---')
 

4、将线程变为守护进程

#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import threading
import time
def run(n):
print("task",n)
time.sleep(2)
print("task done",n)
#开始时间
start_time = time.time() t_objs = []#存线程实例
for i in range(50):
t = threading.Thread(target=run,args=("t%s" %i,))
t.setDaemon(True)#设置守护线程(非守护线程退出,也就意味者退出,不关心守护线程)
t.start()
t_objs.append(t)#为了不阻塞后面线程的启动,不在这里join,先放到一个空列表里 #for t in t_objs: #循环线程实例列表,等待所有线程执行完毕
# t.join() #其实就是wait的意思 #结束时间,计算花费了多长时间
print("------All threads has finished!------",":",threading.current_thread(),":",threading.active_count())
print("cost:",time.time()-start_time)

5、Event事件 

event_t.py

#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import time
import threading #设置事件
event = threading.Event() #定义灯
def lighter():
count = 0
event.set() #先设置为绿灯
while True:
if count > 5 and count < 11: #改成红灯
event.clear() #清除标志位
print("\033[41;1mred light is on .....\033[0m")
elif count > 11:
event.set() #变绿灯
count = 0
else:
print("\033[42;1m green light is on .....\033[0m")
time.sleep(1)
count += 1
#定义车
def car(name):
while True:
if event.is_set():#判断标志位是否设置(标志位代表绿灯)
print("[%s] is running...." %name)
time.sleep(1)
else:
print("[%s] sees red light ,waiting" %name)
event.wait()
print("\033[34;1m green light is on,start gong....\033[0m")
#设置一个线程
light = threading.Thread(target=lighter,)
light.start() #设置一个线程
car1 = threading.Thread(target=car,args=("Tesla",))
car1.start()

6、Queue队列

Queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递

基本FIFO队列

class Queue.Queue(maxsize=0)

FIFO即First in First Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。

举个栗子:

import Queue

q = Queue.Queue()

for i in range(5):
q.put(i) while not q.empty():
print q.get()

输出:

0
1
2
3
4

LIFO队列

class Queue.LifoQueue(maxsize=0)

LIFO即Last in First Out,后进先出。与栈的类似,使用也很简单,maxsize用法同上

再举个栗子:

import Queue

q = Queue.LifoQueue()

for i in range(5):
q.put(i) while not q.empty():
print q.get()

输出:

4
3
2
1
0

可以看到仅仅是将Queue.Quenu类替换为Queue.LifiQueue类

优先级队列

class Queue.PriorityQueue(maxsize=0)

构造一个优先队列。maxsize用法同上。

import Queue
import threading class Job(object):
def __init__(self, priority, description):
self.priority = priority
self.description = description
print 'Job:',description
return
def __cmp__(self, other):
return cmp(self.priority, other.priority) q = Queue.PriorityQueue() q.put(Job(3, 'level 3 job'))
q.put(Job(10, 'level 10 job'))
q.put(Job(1, 'level 1 job')) def process_job(q):
while True:
next_job = q.get()
print 'for:', next_job.description
q.task_done() workers = [threading.Thread(target=process_job, args=(q,)),
threading.Thread(target=process_job, args=(q,))
] for w in workers:
w.setDaemon(True)
w.start() q.join()

结果

Job: level 3 job
Job: level 10 job
Job: level 1 job
for: level 1 job
for: level 3 job
for: job: level 10 job

一些常用方法

task_done()

意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。

如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。

join()

阻塞调用线程,直到队列中的所有任务被处理掉。

只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。

put(item[, block[, timeout]])

将item放入队列中。

  1. 如果可选的参数block为True且timeout为空对象(默认的情况,阻塞调用,无超时)。
  2. 如果timeout是个正整数,阻塞调用进程最多timeout秒,如果一直无空空间可用,抛出Full异常(带超时的阻塞调用)。
  3. 如果block为False,如果有空闲空间可用将数据放入队列,否则立即抛出Full异常

其非阻塞版本为put_nowait等同于put(item, False)

get([block[, timeout]])

从队列中移除并返回一个数据。block跟timeout参数同put方法

其非阻塞方法为`get_nowait()`相当与get(False)

empty()

如果队列为空,返回True,反之返回False

7、生产者消费者模型

#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import threading
import queue #生产者
def producer():
for i in range(10):
q.put("骨头 %s" % i )
print("开始等待所有的骨头被取走...")
q.join() #wait
print("所有的骨头被取完了...") #消费者
def consumer(n):
while q.qsize() >0:
print("%s 取到" %n , q.get())
q.task_done() #告知这个任务执行完了 q = queue.Queue()
p = threading.Thread(target=producer,)
p.start()
c1 = consumer("陈荣华") #实例化消费者  

三、进程

定义: 进程是正在运行的程序

特点: 结构特征、动态性、并发性、独立性、异步性

1 多进程
多进程.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import multiprocessing
import time
import threading
import os

def thread_run():
    s = time.time()
    print('Main process:%s' %os.getppid())
    print('Process:%s' %os.getpid())
    print("耗时: %s" %(time.time()-s))

def run(name):
    time.sleep(2)
    print('hello', name)
    t = threading.Thread(target=thread_run(),)
    t.start()

if __name__ == '__main__':
    for i in range(10):
        p = multiprocessing.Process(target=run, args=('bob %s' %i,))
        p.start()
        p.join()

2   进程间通信(不同进程间内存是不共享的,要想实现两个进程间的数据交换)  
(a)    queues
process-queues.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
#此模块中已经包括了Queue,Process方法
from multiprocessing import Queue,Process
def f(qq):
    qq.put([42,None,'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f,args=(q,))
    p.start()
    print(q.get())
    p.join()

(b)    pipes
process-pipes.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
#此模块中已经包括了Precess、Pipe方法
from multiprocessing import Process, Pipe
def f(conn):
    #发送2次
    conn.send([42, None, 'hello from child'])
    conn.send([42, None, 'hello from child2'])
    #接收
    print("from parent:",conn.recv())    conn.close()
if __name__ == '__main__':
    #管道两头分别是parent_conn,child_conn,两头可互换
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    #接受2次
    print(parent_conn.recv())  # prints "[42, None, 'hello']"
    print(parent_conn.recv())  # prints "[42, None, 'hello']"
    #发送
    parent_conn.send("张洋可好") # prints "[42, None, 'hello']"
    p.join()
(c)    manager
process-manager.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
from multiprocessing import Process, Manager
import os

def f(d, l):
    d[os.getpid()]=os.getpid()
    l.append(os.getpid())
    print(l)

if __name__ == '__main__':
    with Manager() as manager: #等同于 manager=Manager()
        d = manager.dict() #创建一个字典,可以在多进程间传递和共享
        l = manager.list(range(5))#创建一个列表,可以在多进程间传递和共享
        #创建一个空列表
        p_list = []
        for i in range(10):
            #实例化一个子进程
            p = Process(target=f, args=(d, l))
            p.start()
            #把对象放到空列表
            p_list.append(p)
        for res in p_list:
            res.join() #wait

print(d)

(d)  lock
process-lock.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
from multiprocessing import Process, Lock
def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()
if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()
3 进程池(进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。 )

【进程池中有两个方法:apply  和  apply_async】

process-pool.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
from multiprocessing import Process,Pool
import time
import os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid())
    return i + 1

def Bar(arg):
    print('--->exec done:',arg,os.getpid())

if __name__ == '__main__':
    pool = Pool(processes=5) #允许进程池同时放入5个进程
    print("主进程",os.getpid())

for i in range(10):
        #实例化进程池,callback回调,执行完Foo函数,再执行Bar函数
        pool.apply_async(func=Foo,args=(i,),callback=Bar)
    #一定要先关闭掉进程池再关闭
    pool.close()
    pool.join()
    print("End") 

四、线程和进程的区别、联系

1、启动一个线程快

2、线程共享内存空间,进程的内存是独立的。

3、同一个进程的线程之间可以互相交流

4、两个进程想通信,必须通过一个中间代理来实现。
5、创建新线程很简单,创建新进程需要对其父进程进行一个克隆。
6、一个线程可以控制和操作同一进程里的其他线程,但是进程只能操作子进程。