Python第十五天 datetime模块 time模块 thread模块 threading模块 Queue队列模块 multiprocessing模块 paramiko模块 fabric模块

时间:2022-02-16 07:38:10

Python第十五天  datetime模块 time模块   thread模块  threading模块  Queue队列模块  multiprocessing模块  paramiko模块  fabric模块

目录

Pycharm使用技巧(转载)

Python第一天  安装  shell  文件

Python第二天  变量  运算符与表达式  input()与raw_input()区别  字符编码  python转义符  字符串格式化

Python第三天 序列  5种数据类型  数值  字符串  列表  元组  字典

Python第四天   流程控制   if else条件判断   for循环 while循环

Python第五天   文件访问    for循环访问文件    while循环访问文件   字符串的startswith函数和split函数

Python第六天   类型转换

Python第七天   函数  函数参数   函数变量   函数返回值  多类型传值    冗余参数   函数递归调用   匿名函数   内置函数   列表表达式/列表重写

Python第八天  模块   包   全局变量和内置变量__name__    Python path

Python第九天  面向对象  类定义   类的属性    类的方法    内部类   垃圾回收机制   类的继承 装饰器

Python第十天   print >> f,和fd.write()的区别    stdout的buffer  标准输入 标准输出  标准错误   重定向 输出流和输入流

Python第十一天    异常处理  glob模块和shlex模块    打开外部程序和subprocess模块  subprocess类  Pipe管道  operator模块   sorted函数   生成器  walk模块   hashlib模块

Python第十二天     收集主机信息     正则表达式  无名分组   有名分组

Python第十三天   django 1.6   导入模板   定义数据模型   访问数据库   GET和POST方法    SimpleCMDB项目   urllib模块   urllib2模块  httplib模块  django和web服务器整合  wsgi模块   gunicorn模块

Python第十四天 序列化  pickle模块  cPickle模块  JSON模块  API的两种格式

Python第十五天  datetime模块 time模块   thread模块  threading模块  Queue队列模块  multiprocessing模块  paramiko模块  fabric模块

datetime模块    time模块

子类之间的对应关系
object
timedelta
tzinfo
time
date
datetime

now = datetime.datetime.utcnow() # 获取当前utc时间
now = datetime.datetime.now() # 获取当前时间
delta = datetime.timedelta(minutes=5) # 获取前5分钟时间
fiveminago = now – delta # 时间差
now > fiveminago # 时间比较
GMT_FORMAT = '%b %d %H:%M:%S GMT'
now.strftime(GMT_FORMAT) # 格式化时间
now.strftime('%Y-%m-%d.%H:%M:%S') # 格式化时间
int(time.time()) #返回时间戳
time.ctime() #相当于datetime.datetime.now()

datetime模块
https://www.cnblogs.com/cindy-cindy/p/6720196.html

# 解释日志时间

import datetime
import time
def parseLogTime(line):
now = datetime.datetime.now()
month,day,time = line.split()[:3]
hour,minute,second = [int(i) for i in time.split(':')]
logtime = datetime.datetime(now.year,MONTH[month],int(day),hour,minute,second)
return logtime

获取今天零点和昨天零点时间

# 获取今天零点和昨天零点时间
import datetime
import time

now_time = int(time.time())   # 获取当前时间的时间戳
day_time = now_time - now_time % 86400 + time.timezone  # 今天零点时间戳
today_datetimeformat=datetime.datetime.fromtimestamp(day_time)  # 转换为datetime格式

delta = datetime.timedelta(days=1)  # 一天的时间差
yesterday_datetimeformat = today_datetimeformat - delta   # 昨天零点时间datetime格式

yesterday_uxtimeformat = int(time.mktime(yesterday_datetimeformat.timetuple()))
torday_uxtimeformat = int(time.mktime(today_datetimeformat.timetuple()))
print yesterday_uxtimeformat
print torday_uxtimeformat
定义
class datetime(date)
| datetime(year, month, day[, hour[, minute[, second[, microsecond[,tzinfo]]]]])
|
| The year, month and day arguments are required. tzinfo may be None, or an
| instance of a tzinfo subclass. The remaining arguments may be ints or longs.

time模块

python中的time模块
https://www.cnblogs.com/renpingsheng/p/6965044.html

datetime模块

datetime模块定义了下面这几个类:
datetime.date:表示日期的类。常用的属性有year, month, day;
datetime.time:表示时间的类。常用的属性有hour, minute, second, microsecond;
datetime.datetime:表示日期时间。
datetime.timedelta:表示时间间隔,即两个时间点之间的长度。
datetime.tzinfo:与时区有关的相关信息。

时间格式相互转换
时间对象,时间字符串,时间戳

获取当前时间
import datetime
now_time=datetime.datetime.now()
print(now_time)
print(type(now_time))

字符串、datetime互转
import datetime
string = '2017-01-02 11:12:12'
_d_time = datetime.datetime.strptime(string, '%Y-%m-%d %H:%M:%S')
print (_d_time)
print(type(_d_time))

dateime转字符串
import datetime
_time=datetime.datetime.strftime(now_time,'%Y-%m-%d %H:%M:%S')
print(_time)
print(type(_time))

时间戳、datetime互转
import time
_a=int(time.time())
print (_a)
_n_time=datetime.datetime.fromtimestamp(_a)
print (_n_time)
print(type(_n_time))

datetime转换为unix时间戳
dtime = datetime.datetime.now()
un_time = int(time.mktime(dtime.timetuple()))
print un_time


多线程

thread模块(python3不存在thread模块,thread模块在python3中命名为_thread)
threading模块

thread模块是低级模块
threading模块是高级模块,对thread模块的封装
生产中要用threading模块而不用thread模块

GIL:Python无法高效的实现多线程的原因 《Python Linux系统管理与自动化运维》

Python 默认的解释器,由于全局解释器锁GIL的存在,确实在任意时刻都只有一个线程在执行Python 代码,致使多线程不能充分利用机器多核的特性,GIL的好处是防止死锁、争用条件、高复杂性问题
多进程代替多线程有效避开GIL,每个进程都有自己的python解释器实例,不受GIL限制
但是,我们的程序也不是无时无刻不在计算的,我们的程序需要等待用户输入、等待文件读写以及网络收发数据,这些都是比较费时的操作。
使用Python 多线程,计算机会将这些等待操作放到后台去处理,从而释放出宝贵的计算资源,继续进行计算。
也就是说,如果读者的程序是CPU 密集型的,使用Python 的多线程确实无法提升程序的效率,如果读者的程序是IO 密集型的,则可以使用Python 的多线程提高程序的整体效率。

线程的特点:
- 线程的生命周期
- 开始
- 运行
- 结束

线程的退出:
- 进程执行完成
- 线程的退出方法
- python系统退出

我们通常把当前进程叫做主线程或者主进程。
函数是通过thread.start_new_thread来调用的,说明它是在线程中运行的。

import thread,threading
start_new_thread(func, args)
func:函数名
args:元组
allocate_lock()
exit()

#!/usr/bin/env python
#encoding:utf8
import thread
import time
def func(name,i):
    for n in xrange(i):
        print name,n
        #time.sleep(1)
thread.start_new_thread(func, ('声音',3))
thread.start_new_thread(func, ('画面',3))
time.sleep(1)

LockType对象方法
acquire()
locked()
release()  //释放锁,使用前线程必须已获得锁定,否则将抛出异常

thread锁使用
lock = thread.allocate_lock():生成全局锁对象
lock.acquire():加锁
线程使用锁对象
lock.release():线程调度释放锁
查看locked()状态

示例1

#!/usr/bin/env python
#encoding:utf8

import thread
import time

def func(name, i, l):
    for n in xrange(i):
        print name, n
        time.sleep(1)
    l.release()

lock = thread.allocate_lock()
lock.acquire()
thread.start_new_thread(func, ('声音', 3, lock))
thread.start_new_thread(func, ('画面', 3, lock))

while lock.locked():
    pass
print lock.locked()
print "Exit main process"

示例2

#!/usr/bin/env python

import thread
import time

def world():
    for i in range(5):
        if w_lock.acquire():
            print 'world', time.ctime()
            h_lock.release()

h_lock = thread.allocate_lock()
w_lock = thread.allocate_lock()
thread.start_new_thread(world, ())
w_lock.acquire()
for i in range(5):
    if h_lock.acquire():
        print 'hello',
        w_lock.release()
while h_lock.locked():
    pass

示例3

#!/usr/bin/env python

import thread
import time

def hello():
    for i in xrange(5):
        h_lock.acquire()
        print 'hello',
        w_lock.release()

def world():
    for i in xrange(5):
        w_lock.acquire()
        print 'world', time.ctime()
        h_lock.release()
    lock.release()

lock = thread.allocate_lock()  分配锁
lock.acquire() 获得锁
h_lock = thread.allocate_lock()
w_lock = thread.allocate_lock()
w_lock.acquire()
thread.start_new_thread(hello, ())
thread.start_new_thread(world, ())

while lock.locked():  是否获得锁
    pass

threading模块

threading不需要进程来控制线程,主进程会等待线程执行完毕才退出
threading.Thread:类
成员方法:

- isAlive() //查线程是否在运行中
- start() //启动线程 非阻塞
- run() //可以重写
- join() //该方法会阻塞调用,直到线程中止
- getName() //获取线程的名称
- setName() //设置线程的名称
- isDaemon() //判断线程是否守护线程
- setDaemon() //设置线程为守护线程

1、使用threading.Thread生成线程对象来开启多线程
#给线程/函数传递参数

import time
import threading

def t1(name, x):
    for i in xrange(x):
        print i, name
        time.sleep(1)

th1 = threading.Thread(target=t1, args=('声音', 3))
th2 = threading.Thread(target=t1, args=('画面', 3))
target:函数名
args:函数的参数

th1.setDaemon(True)   //随着主线程的退出而退出,一定要写在th1.start()前面

th1.start()     //运行一个线程
#th1.run()     //一直运行这个线程,直到运行完才会运行下一个线程
#th1.join()    //等待线程终止
th2.start()

2、继承threading.Thread 的方式编写多线程程序

通过继承threading.Thread 类进行多线程编程,只需要在子类中实现run 方法,并在run 方法中实现该线程的业务逻辑即可

from __future__ import print_function
import threading

class MyThread(threading.Thread ):
    def __init__(self, count, name):
        super(MyThread, self).__init__()
        self.count = count
        self.name = name

    def run(self):
        while self.count>0:
        print ("hello", self.name)
        self.count-= 1

def main():
    usernames = [ 'Bob','Jack','Pony','Jone','Mike']
    for i in range(5):
        thread = MyThread(50,usernames[i])
        thread.start()

if __name__ =='__main__':
    main()
    

线程安全:用锁来控制并发访问

Lock工厂函数
lock = threading.Lock()
lock. acquire()   //获取锁
lock.release()     //释放锁
lock.locked()     //查看锁状态

#加锁和释放锁
 import threading
    lock = threading.Lock()
    try:
        lock.acquire()
    # do something
    finally:
        lock.release()

#改为用上下文管理器
    import threading
    lock = threading.Lock()
    with lock:
       # do something
       pass

启动线程的两个方法
第一种方法:定义一个函数,threading.Thread实例化对象来启动
第二种方法:编写一个类并继承threading.Thread并重写run方法


线程安全队列
线程安全队列是线程间最常用的交换数据的形式,线程间通信模块。Queue是提供队列操作的模块

生产者消费者模型
生产者 和消费者之间加一个缓冲区
生产者消费者之间存在并发访问问题,即多个消费者可能同时从缓冲区中获取商品,
为了解决并发问题,使用python标准库的queue,queue是线程安全的队列实现FIFO
提供多线程编程的先进先出数据结构,非常适合生产者消费者之间数据传递

三种队列:
FIFO:Queue.Queue(maxsize=0):一个先进先出( FIFO )的队列,最先加入队列的元素最先取出
LIFO:Queue.LifoQueue(maxsize=0):一个后进先出( LIFO )的队列,最后加入队列的元素最先取出
Priority:Queue.PriorityQueue(maxsize=0):优先级队列,队列中的元素根据优先级排序

Queue一些方法
Queue.empty() //如果队列为空,返回True,反之False
Queue.full() //如果队列满了,返回True,反之False
Queue.join() //阻塞等待,直到所有消费者对每一个元素都调用了task_done
Queue.task_done() //和join一起工作,提示先前取出的元素已经完成处理
Queue.get([block[, timeout]]) //读队列,timeout等待时间 ,block和timeout意思跟put方法一样
Queue.get_nowait() //读队列,当队列中没有数据的时候会直接抛出Empty异常,而不会等待
Queue.put_nowait() //写队列,非阻塞地向队列添加元素
Queue.put(item, [block[, timeout]]) //写队列,timeout等待时间 ,任何数据结构数据都可以加到队列,block=true,timeout=none默认,当队列满了会一直阻塞并等待
Queue.queue.clear() 清空队列

q = Queue.Queue()
class Queue
| Create a queue object with a given maximum size.
|
| If maxsize is <= 0, the queue size is infinite.
|
| Methods defined here:
|
| __init__(self, maxsize=0)
|

#!/usr/bin/env python

import threading
import random
import Queue
import time

def producer(name, queue):
    for i in xrange(10):
        num = random.randint(1,10)
        th_name = name + '-' + threading.currentThread().getName()
        print "%s: %s ---> %s" % (time.ctime(), th_name, num)
        queue.put(num)
        time.sleep(1)

def odd(name, queue):
    th_name = name + '-' + threading.currentThread().getName()
    while True:
        try:
            val_odd = queue.get(1,5)  #阻塞,阻塞5秒超时
            if val_odd % 2 != 0:
                print "%s: %s ---> %s" % (time.ctime(), th_name, val_odd)
                time.sleep(1)
            else:
                queue.put(val_odd)
                time.sleep(1)
        except:
            print "%s: %s finished" % (time.ctime(), th_name)
            break

def even(name, queue):
    th_name = name + '-' + threading.currentThread().getName()
    while True:
        try:
            val_even = queue.get(1,5)
            if val_even % 2 == 0:
                print "%s: %s ---> %s" % (time.ctime(), th_name, val_even)
                time.sleep(1)
            else:
                queue.put(val_even)
                time.sleep(1)
        except:
            print "%s: %s finished" % (time.ctime(), th_name)
            break

def main():
    q = Queue.Queue(10)
    t_pro = threading.Thread(target=producer, args=('pro',q))
    t_odd = threading.Thread(target=odd, args=('odd',q))
    t_even = threading.Thread(target=even, args=('even',q))
    t_pro.start()
    t_odd.start()
    t_even.start()

if __name__ == '__main__':
    main()

multiprocessing模块

thread模块和threading模块都不是真正意义的多线程,受限于GIL只能用到一个核心
multiprocessing使用多进程,可以利用多个核心,但是因为是多进程开销也比多线程大

multiprocessing.Process调用方法跟threading.Thread调用方法和参数基本一样

from multiprocessing import Process
import time
import os

def f(name):
print 'hello', name, os.getpid(), os.getppid()
time.sleep(1)

if __name__ == '__main__':
for i in range(10):
p = Process(target=f, args=(i,))
p.start()
#p.join() #wait

进程池Pool

multiprocessing.Pool很方便的同时自动处理几百或者上千个并行操作,脚本的复杂性也大大降低。
pool = multiprocessing.Pool(processes=3) // 设置最大进程数为3,processes的默认值等于系统当前的最大核心数,假设当前是4核机器,那么进程池默认会初始化四个进程
result = pool.apply_async(func=f, args=(i,)) //向进程池提交目标请求,非阻塞的,但result.get()方法是阻塞的。
pool.close() //会等待池中的所有worker进程执行结束再关闭pool
pool. terminate() //直接关闭pool
pool.join() //等待进程池中的所有worker进程执行完毕,阻塞主进程。但必须使用在pool.close()或者pool.terminate()之后

  pool.close()
  pool.join()  #阻塞主进程,不然主进程退出了子进程还在运行变成孤儿进程

如果进程池里所有进程都再处理请求,这时候有新进程进来必须等待直到有空闲进程可以处理这个新请求

#!/usr/bin/env python

import multiprocessing
from subprocess import Popen, PIPE

def run(id):
    p = Popen('vim', stdout=PIPE, stderr=PIPE)
    p.communicate()  #不用 stdout和 stderr 接收 表示等待命令执行完
    print "Task: %s" %id

pool = multiprocessing.Pool()
for i in xrange(5):
    pool.apply_async(func=run, args=(i,))
print "Wait..."
pool.close()
pool.join()
print 'Done'
#!/usr/bin/env python

import multiprocessing
import os
import time

def run_task(id):
    print "Run task id: %s pid(%s) ppid(%s)" % (id, os.getpid(), os.getppid())
    start = int(time.time())
    time.sleep(3)
    end = int(time.time())
    print "Task %s run %0.2f" % (id, (end - start))

if __name__ == '__main__':
    print "Parent process %s" %os.getpid()
    pool = multiprocessing.Pool(processes=2)
    for i in xrange(5):
        pool.apply_async(run_task, args=(i,))
    #print "Wait..."
    pool.close()
    pool.join()
    print "Done"

paramiko模块

paramiko 是一个Python 的库,该库支持Python 2.6 +和Python 3.3 +版本,实现了SSHv2 协议(底层使用cryptography )

cryptography为python开发组提供cryptographic recipes加密套件和 primitives 原语的一个python包
cryptographic standard library
It supports Python 2.7, Python 3.4+, and PyPy 5.3+

pip install cryptography

只能上传文件,不能上传文件夹,需要结合os.walk,上传目录里所有文件

SSHClient
SSHClient 类是对SSH 会话的封装,该类封装了传输(transport) 、通道(channel)及SFTPClient 建立的方法(open_sftp), 通常用于执行远程命令

SSHClient 类常用的几个方法:
1 ) connect : connect 方法实现远程服务器连接与认证,对于该方法, 只有hostname 是必传参数。
connect(self , hostname , port=22, username=None , password=None ,pkey=None , key_filename=None , timeout=None ,allow_agent=True , look_for_keys=True , compress=False)
2 ) set_missing_host_key_policy : 设置远程服务器没有在know_hosts 文件中记录时的应对策略。目前支持三种策略,分别是AutoAddPolicy 、RejectPolicy ( 默认策略)与WarningPolicy
分别表示自动添加服务器到know_hosts 文件、拒绝本次连接、警告并将服务器添加到know hosts 文件中。
3 ) exec_command :在远程服务器执行Linux 命令的方法。
4 ) open_sftp:在当前ssh 会话的基础上创建一个sftp 会话。该方法会返回一个SFTPClient 对象。

SFTPClient 类常用的几个方法:
put :上传本地文件到远程服务器;
get :从远程服务器下载文件到本地;
mkdir :在远程服务器上创建目录;
remove :删除远程服务器中的文件;
rmdir :删除远程服务器中的目录;
rename :重命名远程服务器中的文件或目录;
stat : 获取远程服务器中文件的详细信息;
listdir :列出远程服务器中指定目录下的内容。
这里仅仅介绍了paramiko 中SSHClient 与SFTPClient 类的常用方法,完整的API 可以参考官方文档

需要安装libffi-devel开发库
yum install -y libffi*

error: invalid command ‘bdist_wheel’
http://www.cnblogs.com/BugQiang/archive/2015/08/22/4732991.html

paramiko模块遵循SSH2协议,支持加密和认证的方式,进行远程服务器的连接。
由于使用的是python这样的能够跨平台运行的语言,所以所有python支持的平台,如Linux, Solaris, BSD, MacOS X, Windows等,paramiko都可以支持

批量执行命令
安装paramilo模块
yum install -y python-paramiko1.10.noarch
或者:
pip install paramiko==2.2.1

例子
import paramiko
client = paramiko.SSHClient()
client.load_system_host_keys() //~/.ssh/know_hosts #加载本机known_hosts文件
或:
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname='127.0.0.1',username='root',password=‘XXX')
stdin, stdout, stderr = client.exec_command('date')
stdout.read()
client.close()

基于用户名和密码的 sshclient 方式登录
# 建立一个sshclient对象
ssh = paramiko.SSHClient()
# 允许将信任的主机自动加入到known_hosts列表,此方法必须放在connect方法的前面
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 调用connect方法连接服务器
ssh.connect(hostname='192.168.2.129', port=22, username='super', password='super',timeout=5)
# 执行命令
stdin, stdout, stderr = ssh.exec_command('df -hl')
# 结果放到stdout中,读出stdout中的结果
print(stdout.read().decode())
# 关闭连接
ssh.close()

基于公钥密钥的 SSHClient 方式登录
# 指定本地的RSA私钥文件,如果建立密钥对时设置的有密码,password为设定的密码,如无不用指定password参数
pkey = paramiko.RSAKey.from_private_key_file('/home/super/.ssh/id_rsa', password='12345')
# 建立连接
ssh = paramiko.SSHClient()
ssh.connect(hostname='192.168.2.129',port=22,username='super',pkey=pkey,timeout=5)
# 执行命令
stdin, stdout, stderr = ssh.exec_command('df -hl')
# 结果放到stdout中,读出stdout中的结果
print(stdout.read().decode())
# 关闭连接
ssh.close()

基于用户名和密码的 transport 方式传文件
# 实例化一个transport对象
trans = paramiko.Transport(('192.168.2.129', 22))
# 建立连接
trans.connect(username='super', password='super')
# 实例化一个 sftp对象,指定连接的通道
sftp = paramiko.SFTPClient.from_transport(trans)
# 发送文件
sftp.put(localpath='/tmp/11.txt', remotepath='/tmp/22.txt')
# 下载文件
sftp.get(remotepath='/tmp/22.txt', localpath='/tmp/11.txt')
trans.close()

基于密钥的 Transport 方式传文件
# 指定本地的RSA私钥文件,如果建立密钥对时设置的有密码,password为设定的密码,如无不用指定password参数
pkey = paramiko.RSAKey.from_private_key_file('/home/super/.ssh/id_rsa', password='12345')
# 建立连接
trans = paramiko.Transport(('192.168.2.129', 22))
trans.connect(username='super', pkey=pkey)
# 实例化一个 sftp对象,指定连接的通道
sftp = paramiko.SFTPClient.from_transport(trans)
# 发送文件
sftp.put(localpath='/tmp/11.txt', remotepath='/tmp/22.txt')
# 下载文件
sftp.get(remotepath='/tmp/22.txt', localpath='/tmp/11.txt')
trans.close()

问题汇总

https://cloud.tencent.com/community/article/945339
Error reading SSH protocol banner连接错误

解决办法:
重新下载paramiko插件源码,解压后,编辑安装目录下的transport.py文件:
vim build/lib/paramiko/transport.py
搜索 self.banner_timeout 关键词,并将其参数改大即可,比如改为300s:
self.banner_timeout = 300
最后,重装paramiko即可。

paramiko远程执行后台脚本“阻塞”问题
解决办法
将远程脚本的标准输出stdout重定向到错误输出stderr即可
现在执行,就能立即得到结果了。其实原因很简单,因为bash /tmp/test.sh & 虽然是后台执行,但是依然会产生标准输出,一旦产生标准输出,paramiko就会认为命令还未执行完成,且stdout的buffer大于stderr,因此产生等待问题。
这里只要将脚本执行的标准输出重定向到错误输出(1>&2),然后paramiko就可以使用stderr快速读取远程打屏信息了。

#!/usr/bin/env python

import paramiko
import threading
import sys

def ssh_conn(ip, cmd):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    pkey_file = '/root/.ssh/id_rsa'
    key = paramiko.RSAKey.from_private_key_file(pkey_file)
    try:
        ssh.connect(hostname=ip, username='root', pkey=key, timeout=5)
    except:
        print "%s: Timeout or not permission" % ip
        return 1
    stdin, stdout, stderr = ssh.exec_command(cmd)
    stdout = stdout.read()[:-1]  #第0个字符到最后一个字符但是不包含最后一个字符
    stderr = stderr.read()[:-1]
    if stdout:
        print "%s:\t %s" % (ip, stdout)
        ssh.close()
    else:
        print "%s:\t %s" % (ip, stderr)
        ssh.close()

if __name__ == '__main__':
    paramiko.util.log_to_file('/tmp/paramiko.log')
    ips = ['192.168.20.'+str(i) for i in xrange(1, 100)]
    try:
        cmd = sys.argv[1]
    except IndexError:
        print "%s follow a command" % __file__
        sys.exit(1)
    for ip in ips:
        t = threading.Thread(target=ssh_conn, args=(ip,cmd))
        t.start()
#!/usr/bin/env python
#进程池版本
import paramiko
import multiprocessing
import sys

def ssh_conn(ip, cmd):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    pkey_file = '/root/.ssh/id_rsa'
    key = paramiko.RSAKey.from_private_key_file(pkey_file)
    try:
        ssh.connect(hostname=ip, username='root', pkey=key, timeout=5)
    except:
        print "%s: Timeout or not permission" % ip
        return 1
    stdin, stdout, stderr = ssh.exec_command(cmd)
    stdout = stdout.read()[:-1]
    stderr = stderr.read()[:-1]
    if stdout:
        print "%s:\t %s" % (ip, stdout)
        ssh.close()
    else:
        print "%s:\t %s" % (ip, stderr)
        ssh.close()

if __name__ == '__main__':
    paramiko.util.log_to_file('/tmp/paramiko.log')
    ips = ['192.168.20.'+str(i) for i in xrange(1, 100)]
    try:
        cmd = sys.argv[1]
    except IndexError:
        print "%s follow a command" % __file__
        sys.exit(1)
    pool = multiprocessing.Pool(processes=10)
    for ip in ips:
        pool.apply_async(func=ssh_conn, args=(ip, cmd))
    pool.close()
    pool.join()
#!/usr/bin/env python
#可传入参数改进版

import paramiko
import multiprocessing
import sys
from optparse import OptionParser
import urllib, urllib2
import json

DATA_BACK = '/var/tmp/data.json'

def opt():
    parser = OptionParser("Usage: %prog -a|-g command")
    parser.add_option('-a',
                      dest='addr',
                      action='store',
                      default='True',
                      help='ip or iprange EX: 192.168.1,192.168.1.3 or 192.168.1.1-192.168.1.100')
    parser.add_option('-g',
                      dest='group',
                      action='store',
                      help='groupname')
    options, args = parser.parse_args()
    return options, args

def ssh_conn(ip, cmd):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    try:
        ssh.connect(hostname=ip, port=9022, username='steven', password='xx',timeout=5)
    except:
        print "%s: Timeout or not permission" % ip
        return 1
    stdin, stdout, stderr = ssh.exec_command(cmd)
    stdout = stdout.read()[:-1]
    stderr = stderr.read()[:-1]
    if stdout:
        print "%s:\t %s" % (ip, stdout)
        ssh.close()
    else:
        print "%s:\t %s" % (ip, stderr)
        ssh.close()

def parseOpt(option):
    if ',' in option:
        ips = option.split(',')
        return ips
    elif '-' in option:
        ip_start, ip_end = option.split('-')
        ip_net = '.'.join(ip_start.split('.')[:-1])
        start = int(ip_start.split('.')[-1])
        end = int(ip_end.split('.')[-1]) + 1
        ips = [ip_net+'.'+str(i) for i in range(start, end)]
        return ips
    elif ',' not in option or '-' not in option:
        ips = [option]
        return ips
    else:
        print "%s -h" % __file__

def getData():
    url = 'http://192.168.1.5:8000/hostinfo/getjson/'
    try:
        req = urllib2.urlopen(url)
        data = json.loads(req.read())
        with open(DATA_BACK, 'wb') as fd:
            json.dump(data, fd)
    except:
        with open(DATA_BACK) as fd:
            data = json.load(fd)
    return data

def parseData(data):
    dic_host = {}
    for hg in data:
        groupname = hg['groupname']
        dic_host[groupname] = []
        for h in hg['members']:
            dic_host[groupname] += [h['ip']]  #列表相加 得到一个新列表
    return dic_host

if __name__ == '__main__':
    paramiko.util.log_to_file('/tmp/paramiko.log')
    options, args = opt()
    try:
        cmd = args[0]
    except IndexError:
        print "%s follow a command" % __file__
        sys.exit(1)
    if options.addr:
        ips = parseOpt(options.addr)
    elif options.group:
        groupname = options.group
        data = getData()
        dic = parseData(data)
        if groupname in dic:
            ips = dic[groupname]
        else:
            print "%s is not exists in SimpleCMDB" % groupname
            sys.exit()
    else:
        print "%s -h" %  __file__
        sys.exit(1)
    pool = multiprocessing.Pool(processes=10)
    for ip in ips:
        pool.apply_async(func=ssh_conn, args=(ip, cmd))
    pool.close()
    pool.join()  #阻塞一下主进程,不然主进程退出了子进程还在运行变成孤儿进程

fabric模块

Python第十五天  datetime模块 time模块   thread模块  threading模块  Queue队列模块  multiprocessing模块  paramiko模块  fabric模块

Fabric 的作者也是paramiko 的作者

Fabric 依赖paramiko 、pycrypto
fabric是基于paramiko模块封装开发的。paramiko更底层
安装:依赖pycrypto,paramiko
要先卸载系统自带的python-pycrypto与python-paramiko
pycrypto最好安装2.3版本,如果安装2.6.1会有gmp版本低的警告

Fabric 比较特殊,它既是一个Python 库,也是一个命令行工具
它的命令行工具不是Fabric , 而是fab 。我们可以通过下面的语句获得命令行工具的简单说明以及验证Fabric 是否安装正确:
$ fab --help

安装:
yum install -y python-devel
pip install pycrypto=="2.3"
pip install paramiko==1.12.4
pip install fabric==1.8.3

pip list
argparse (1.2.1)
asn1crypto (0.22.0)
bcrypt (3.1.3)
cffi (1.10.0)
cryptography (2.0.1)
distribute (0.6.10)
Django (1.6.5)
ecdsa (0.13)
enum34 (1.1.6)
Fabric (1.8.3)
idna (2.5)
iniparse (0.3.1)
ipaddress (1.0.18)
ipython (1.2.1)
mysql-connector-python (2.1.6)
MySQL-python (1.2.3rc1)
mysql-replication (0.13)
mysql-utilities (1.6.5)
ordereddict (1.2)
paramiko (1.12.4)
pip (7.1.0)
pyasn1 (0.3.1)
pycparser (2.18)
pycrypto (2.3)
pycurl (7.19.0)
pygpgme (0.1)
pymssql (2.1.0)
PyMySQL (0.7.11)
PyNaCl (1.1.2)
setuptools (0.6rc11)
six (1.10.0)
urlgrabber (3.9.1)
virtinst (0.600.0)
wheel (0.29.0)
yum-metadata-parser (1.1.2)

http://www.fabfile.org/en/latest/index.html
编写fabfile.py文件:
from fabric.api import run
def host_type(): #函数名随便定义
run('uname -s')
执行:
fab -H localhost host_type
http://docs.fabfile.org/en/1.10/usage/execution.html#connections

#不能同时出现多个hosts
from fabric.api import env
env.user='test' #机器的用户名
env.password='123' #机器的密码
env.hosts = ['node1','node2']

#分组
不能同时出现多个roledefs
from fabric.api import env
env.roledefs['webservers'] = ['www1', 'www2', 'www3']

env.roledefs = {
'web': ['www1', 'www2', 'www3'],
'dns': ['ns1', 'ns2']

Fab -R web …


cryptography 、 PyCrypto 、 PyCryptodome(pycrypto的分支pycryptodome)

PyCrypto:Python Cryptography Toolkit (pycrypto)
PyCryptodome: is a self-contained Python package of low-level cryptographic primitives.

f