一、Paramiko模块练习
#Author is wspikh
# -*- coding: encoding -*-
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
result = stdout.read()
ssh.close()
import
paramiko
private_key
=
paramiko.RSAKey.from_private_key_file(
'/home/auto/.ssh/id_rsa'
)
# 创建SSH对象
ssh
=
paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
# 执行命令
stdin, stdout, stderr
=
ssh.exec_command(
'df'
)
# 获取命令结果
result
=
stdout.read()
# 关闭连接
ssh.close()
#Author is wspikh
# -*- coding: encoding -*-
transport = paramiko.Transport(("192.168.16.101",22))
transport.connect(username = "wspkh", password = "abcABC123")
sftp.put('/Users/khwsp/k.sh','/tmp/p.sh')
stdin,stdout,stderr = ssh.exec_command('ls /tmp')
result = stdout.read()
# -*- coding:utf-8 -*-
import paramiko
import uuid
class Haproxy(object):
def __init__(self):
self.host = '172.16.103.191'
self.port = 22
self.username = 'wupeiqi'
self.pwd = '123'
self.__k = None
def create_file(self):
file_name = str(uuid.uuid4())
with open(file_name,'w') as f:
f.write('sb')
def run(self):
self.connect()
self.upload()
self.rename()
self.close()
def connect(self):
transport = paramiko.Transport((self.host,self.port))
transport.connect(username=self.username,password=self.pwd)
self.__transport = transport
def close(self):
self.__transport.close()
def upload(self):
# 连接,上传
file_name = self.create_file()
sftp = paramiko.SFTPClient.from_transport(self.__transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put(file_name, '/home/wupeiqi/tttttttttttt.py')
def rename(self):
ssh = paramiko.SSHClient()
ssh._transport = self.__transport
# 执行命令
stdin, stdout, stderr = ssh.exec_command('mv /home/wupeiqi/tttttttttttt.py /home/wupeiqi/ooooooooo.py')
# 获取命令结果
result = stdout.read()
ha = Haproxy()
ha.run()
1、定义、特点、语法
定义: 线程可以理解成是在进程中独立运行的子任务。比如,QQ.exe运行时就有很多的子任务在同时运行。再如,好友视频线程、下载文件线程、传输数据线程、发送表情线程等,这些不同的任务或者说功能都可以同时运行,其中每一项任务完全可以理解成是“线程”在工作,传文件、听音乐、发送图片表情等功能都有对应的线程在后台默默地运行。
特点: 多线程技术,可以在同一时间内运行更多不同种类的任务。
语法:
直接调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import threading
import time
def sayhi(num): #定义每个线程要运行的函数
print ( "running on number:%s" % num)
time.sleep( 3 )
if __name__ = = '__main__' :
t1 = threading.Thread(target = sayhi,args = ( 1 ,)) #生成一个线程实例
t2 = threading.Thread(target = sayhi,args = ( 2 ,)) #生成另一个线程实例
t1.start() #启动线程
t2.start() #启动另一个线程
print (t1.getName()) #获取线程名
print (t2.getName())
|
继承式调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import threading
import time
class MyThread(threading.Thread):
def __init__( self ,num):
threading.Thread.__init__( self )
self .num = num
def run( self ): #定义每个线程要运行的函数
print ( "running on number:%s" % self .num)
time.sleep( 3 )
if __name__ = = '__main__' :
t1 = MyThread( 1 )
t2 = MyThread( 2 )
t1.start()
t2.start()
|
2、join
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import threading
import time
def run(n):
print("task",n)
time.sleep(2)
print("task done",n)
#开始时间
start_time = time.time()
t_objs = []
for i in range(50):
t = threading.Thread(target=run,args=("t%s" %i,))
t.start()
t_objs.append(t)#为了不阻塞后面线程的启动,不在这里join,先放到一个空列表里
for t in t_objs: #循环线程实例列表,等待所有线程执行完毕
t.join() #其实就是wait的意思
#结束时间,计算花费了多长时间
print("------All threads has finished!",":",threading.current_thread(),":",threading.active_count())
print("cost:",time.time()-start_time)
3、线程锁之Lock\Rlock\信号量
lock.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import threading
import time
def run(n):
lock.acquire() #获得一把锁
global num
time.sleep(2)#sleep不占用CPU
num += 1
time.sleep(1) #加锁后变串行
print(time.time())
lock.release() lock = threading.Lock()
num = 0
t_objs = []
for i in range(20):
t = threading.Thread(target=run,args=("t%s" %i,))
t.start()
t_objs.append(t)#为了不阻塞后面线程的启动,不在这里join,先放到一个空列表里 print("------All threads has finished!------",":",threading.current_thread(),":",threading.active_count())
time.sleep(3)
print("num:",num) 递归锁.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import threading, time def run1():
print("grab the first part data")
lock.acquire()
global num
num += 1
lock.release()
return num def run2():
print("grab the second part data")
lock.acquire()
global num2
num2 += 1
lock.release()
return num2 def run3():
lock.acquire()
res = run1()
print('--------between run1 and run2-----')
res2 = run2()
lock.release()
print(res, res2) if __name__ == '__main__': num, num2 = 0, 0
lock = threading.RLock()#递归锁
for i in range(5):
t = threading.Thread(target=run3)
t.start() while threading.active_count() != 1:
print(threading.active_count())
else:
print('----all threads done---')
print(num, num2)
信号量.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import threading, time #互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。
def run(n):
semaphore.acquire()#加锁
time.sleep(1)
print("run the thread: %s\n" % n)
semaphore.release()#释放锁 if __name__ == '__main__': num = 0 #全局变量
semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
for i in range(20):
t = threading.Thread(target=run, args=(i,))
t.start() while threading.active_count() != 1:
pass # print threading.active_count()
else:
print('----all threads done---')
4、将线程变为守护进程
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import threading
import time
def run(n):
print("task",n)
time.sleep(2)
print("task done",n)
#开始时间
start_time = time.time() t_objs = []#存线程实例
for i in range(50):
t = threading.Thread(target=run,args=("t%s" %i,))
t.setDaemon(True)#设置守护线程(非守护线程退出,也就意味者退出,不关心守护线程)
t.start()
t_objs.append(t)#为了不阻塞后面线程的启动,不在这里join,先放到一个空列表里 #for t in t_objs: #循环线程实例列表,等待所有线程执行完毕
# t.join() #其实就是wait的意思 #结束时间,计算花费了多长时间
print("------All threads has finished!------",":",threading.current_thread(),":",threading.active_count())
print("cost:",time.time()-start_time)
5、Event事件
event_t.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import time
import threading #设置事件
event = threading.Event() #定义灯
def lighter():
count = 0
event.set() #先设置为绿灯
while True:
if count > 5 and count < 11: #改成红灯
event.clear() #清除标志位
print("\033[41;1mred light is on .....\033[0m")
elif count > 11:
event.set() #变绿灯
count = 0
else:
print("\033[42;1m green light is on .....\033[0m")
time.sleep(1)
count += 1
#定义车
def car(name):
while True:
if event.is_set():#判断标志位是否设置(标志位代表绿灯)
print("[%s] is running...." %name)
time.sleep(1)
else:
print("[%s] sees red light ,waiting" %name)
event.wait()
print("\033[34;1m green light is on,start gong....\033[0m")
#设置一个线程
light = threading.Thread(target=lighter,)
light.start() #设置一个线程
car1 = threading.Thread(target=car,args=("Tesla",))
car1.start()
6、Queue队列
Queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递
基本FIFO队列
class Queue.Queue(maxsize=0)
FIFO即First in First Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
举个栗子:
import Queue
q = Queue.Queue()
for i in range(5):
q.put(i)
while not q.empty():
print q.get()
输出:
0
1
2
3
4
LIFO队列
class Queue.LifoQueue(maxsize=0)
LIFO即Last in First Out,后进先出。与栈的类似,使用也很简单,maxsize用法同上
再举个栗子:
import Queue
q = Queue.LifoQueue()
for i in range(5):
q.put(i)
while not q.empty():
print q.get()
输出:
4
3
2
1
0
可以看到仅仅是将Queue.Quenu类
替换为Queue.LifiQueue类
优先级队列
class Queue.PriorityQueue(maxsize=0)
构造一个优先队列。maxsize用法同上。
import Queue
import threading
class Job(object):
def __init__(self, priority, description):
self.priority = priority
self.description = description
print 'Job:',description
return
def __cmp__(self, other):
return cmp(self.priority, other.priority)
q = Queue.PriorityQueue()
q.put(Job(3, 'level 3 job'))
q.put(Job(10, 'level 10 job'))
q.put(Job(1, 'level 1 job'))
def process_job(q):
while True:
next_job = q.get()
print 'for:', next_job.description
q.task_done()
workers = [threading.Thread(target=process_job, args=(q,)),
threading.Thread(target=process_job, args=(q,))
]
for w in workers:
w.setDaemon(True)
w.start()
q.join()
结果
Job: level 3 job
Job: level 10 job
Job: level 1 job
for: level 1 job
for: level 3 job
for: job: level 10 job
一些常用方法
task_done()
意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。
如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。
join()
阻塞调用线程,直到队列中的所有任务被处理掉。
只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。
put(item[, block[, timeout]])
将item放入队列中。
- 如果可选的参数block为True且timeout为空对象(默认的情况,阻塞调用,无超时)。
- 如果timeout是个正整数,阻塞调用进程最多timeout秒,如果一直无空空间可用,抛出Full异常(带超时的阻塞调用)。
- 如果block为False,如果有空闲空间可用将数据放入队列,否则立即抛出Full异常
其非阻塞版本为put_nowait
等同于put(item, False)
get([block[, timeout]])
从队列中移除并返回一个数据。block跟timeout参数同put
方法
其非阻塞方法为`get_nowait()`相当与get(False)
empty()
如果队列为空,返回True,反之返回False
7、生产者消费者模型
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import threading
import queue #生产者
def producer():
for i in range(10):
q.put("骨头 %s" % i )
print("开始等待所有的骨头被取走...")
q.join() #wait
print("所有的骨头被取完了...") #消费者
def consumer(n):
while q.qsize() >0:
print("%s 取到" %n , q.get())
q.task_done() #告知这个任务执行完了 q = queue.Queue()
p = threading.Thread(target=producer,)
p.start()
c1 = consumer("陈荣华") #实例化消费者
三、进程
定义: 进程是正在运行的程序
特点: 结构特征、动态性、并发性、独立性、异步性
#Author is wspikh
# -*- coding: encoding -*-
import multiprocessing
import time
import threading
import os
def thread_run():
s = time.time()
print('Main process:%s' %os.getppid())
print('Process:%s' %os.getpid())
print("耗时: %s" %(time.time()-s))
def run(name):
time.sleep(2)
print('hello', name)
t = threading.Thread(target=thread_run(),)
t.start()
if __name__ == '__main__':
for i in range(10):
p = multiprocessing.Process(target=run, args=('bob %s' %i,))
p.start()
p.join()
#Author is wspikh
# -*- coding: encoding -*-
#此模块中已经包括了Queue,Process方法
from multiprocessing import Queue,Process
def f(qq):
qq.put([42,None,'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f,args=(q,))
p.start()
print(q.get())
p.join()
#Author is wspikh
# -*- coding: encoding -*-
#此模块中已经包括了Precess、Pipe方法
from multiprocessing import Process, Pipe
#发送2次
conn.send([42, None, 'hello from child'])
conn.send([42, None, 'hello from child2'])
#接收
#管道两头分别是parent_conn,child_conn,两头可互换
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
#接受2次
print(parent_conn.recv()) # prints "[42, None, 'hello']"
print(parent_conn.recv()) # prints "[42, None, 'hello']"
#发送
parent_conn.send("张洋可好") # prints "[42, None, 'hello']"
p.join()
# -*- coding: encoding -*-
from multiprocessing import Process, Manager
import os
def f(d, l):
d[os.getpid()]=os.getpid()
l.append(os.getpid())
print(l)
if __name__ == '__main__':
with Manager() as manager: #等同于 manager=Manager()
d = manager.dict() #创建一个字典,可以在多进程间传递和共享
l = manager.list(range(5))#创建一个列表,可以在多进程间传递和共享
#创建一个空列表
p_list = []
for i in range(10):
#实例化一个子进程
p = Process(target=f, args=(d, l))
p.start()
#把对象放到空列表
p_list.append(p)
for res in p_list:
res.join() #wait
print(d)
#Author is wspikh
# -*- coding: encoding -*-
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
【进程池中有两个方法:apply 和 apply_async】
#Author is wspikh
# -*- coding: encoding -*-
from multiprocessing import Process,Pool
import time
import os
def Foo(i):
time.sleep(2)
print("in process",os.getpid())
return i + 1
def Bar(arg):
print('--->exec done:',arg,os.getpid())
if __name__ == '__main__':
pool = Pool(processes=5) #允许进程池同时放入5个进程
print("主进程",os.getpid())
for i in range(10):
#实例化进程池,callback回调,执行完Foo函数,再执行Bar函数
pool.apply_async(func=Foo,args=(i,),callback=Bar)
#一定要先关闭掉进程池再关闭
pool.close()
pool.join()
print("End")
四、线程和进程的区别、联系
1、启动一个线程快
2、线程共享内存空间,进程的内存是独立的。
3、同一个进程的线程之间可以互相交流