基于Python的TestAgent实现

时间:2023-03-09 09:03:09
基于Python的TestAgent实现

问题:

1、本人工作主要做自动化,经常要去Linux后台进行一些脚本操作,有时要去后台执行命令,如果逐个登陆比较费事,效率会大打折扣

2、虽然有可以直接去后台执行命令的AW,但是该AW存在很多问题,而且遇到交互式操作时不能很好的解决

基于以上问题,通过Python写了一个简单的CLI Agent,就叫做TestAgent吧,主要思路:

1、采用POST消息发送到TestAgent,TestAgent进行解析

2、TestAgent接受到消息后,把消息体存为一个文件

3、将文件更改为可执行的,然后启动一个进程去执行脚本

4、如果执行成功将结果返回给客户端,如果失败,同样将错误输出也返回给客户端

5、在POST消息的头域中可以设置超时时间,如果超时,返回“time out”,并将启动的进程给杀掉

代码如下:

 #! /usr/bin/env python
import commands
import socket
import time
import os
import multiprocessing
import uuid
from BaseHTTPServer import BaseHTTPRequestHandler,HTTPServer #HTTPServer的监听端口
PORT=12345 class HttpHandler(BaseHTTPRequestHandler):
tmpfile=''
#处理POST消息
def do_POST(self):
print self.path
content_len = int(self.headers.getheader('content-length',0))
#获取timeout
timeout = int(self.headers.getheader('timeout',0))
if timeout==0:
timeout=5
#解析消息并存储为文件
script=self.rfile.read(content_len)
x=uuid.uuid4()
self.tmpfile="."+str(x.int)
fd=open(self.tmpfile,'w')
fd.write(script)
fd.close()
os.system("chmod +x "+self.tmpfile)
script="./"+self.tmpfile
#执行脚本
self.ExecuteScript(script,timeout) def ExecuteScript(self,script,timeout=5):
#启动另一个进程执行脚本
p=multiprocessing.Process(target=self.ScriptWorker,args=(script,))
p.start()
i=0
while i<timeout:
if(not p.is_alive()):
return "successful"
else:
time.sleep(1)
i=i+1
#超时的话终止进程并杀掉执行任务的进程
p.terminate()
os.system("kill -9 "+str(p.pid))
self.send_error(400,"time out")
self.request.shutdown(socket.SHUT_RDWR)
self.request.close()
#删除临时文件
if self.tmpfile != '':
os.system("rm "+self.tmpfile)
self.tmpfile='' def ScriptWorker(self,script):
#执行脚本,返回状态码和输出
(status,result)=commands.getstatusoutput(script)
print script
print result
#如果成功返回200,如果失败返回400
if status == 0:
self.send_response(200)
else:
self.send_response(400)
self.send_header('Content-type','text/html')
self.end_headers()
self.wfile.write(result)
#删除临时文件
if self.tmpfile != '':
os.system("rm "+self.tmpfile)
self.tmpfile='' if __name__=='__main__':
os.system('rm .*')
server_address=('0.0.0.0',PORT)
http_server=HTTPServer(server_address,HttpHandler)
http_server.serve_forever()

测试:

采用curl或者restful client进行测试

1、执行简单命令

基于Python的TestAgent实现

2、执行的命令不存在

基于Python的TestAgent实现

3、执行一个Python脚本

基于Python的TestAgent实现

基于Python的TestAgent实现

4、执行一个超时的脚本

基于Python的TestAgent实现

5、执行一个带有timeout头域的脚本

基于Python的TestAgent实现

基于Python的TestAgent实现

基于Python的TestAgent实现

至此,基本所有功能都验证过了

PS:该程序理论上可以执行任何脚本,只要脚本的解释器写正确

使用时一般会再写个monitor脚本,放在crontab中,这样就完全可以不登陆服务器了,可以自动拉起TestAgent

希望该程序可以帮助大家,^v^ !!

地址:https://github.com/litlefirefly/TestAgent