要求:
文件分布:
流程图:
import pika
import os
import socket class Server(object):
def __init__(self, queuename):
self.queuename = queuename
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost'))
self.channel = self.connection.channel() #声明一个管道
self.channel.queue_declare(queue=self.queuename)
def handle(self,command):
message = os.popen(command.decode()).read()
if not message:
message = 'wrong command'
return message
def on_requet(self, ch, method,props,body):
response = self.handle(body)
print(response)
ch.basic_publish(exchange='',
routing_key=props.reply_to, #拿到客户端随机生成的queue
properties = pika.BasicProperties(correlation_id = props.correlation_id),
body = str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)#确保任务完成 def start(self):
self.channel.basic_consume(self.on_requet, queue=self.queuename) #收到消息就调用on_requet
print(" [x] Awaiting RPC requests")
self.channel.start_consuming() if __name__ == "__main__":
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname) # 获取本地ip地址作为queue name
print(ip)
queue_name = ip
server = Server(queue_name)
server.start()
server
import pika
import uuid
import random
import threading class Client(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost'))
self.channel = self.connection.channel() def on_response(self, ch, method, props, body):
if self.callback_id == props.correlation_id:
self.response = body
ch.basic_ack(delivery_tag=method.delivery_tag) def get_response(self, callback_queue, corr_id):
self.response = None
self.callback_id = corr_id
self.channel.basic_consume(self.on_response, queue=callback_queue)
while self.response is None:
self.connection.process_data_events() # 非阻塞版的start_consuming
return self.response def call(self, queuename, n):
# 声明临时的回调队列
result = self.channel.queue_declare(exclusive=False)
self.callback_queue = result.method.queue
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key=queuename,
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id = self.corr_id,
),
body = n)
return self.callback_queue, self.corr_id class Threading(object):
def __init__(self):
self.info={} def check_all(self, cmd):
'''
查看已经有的任务id
:param cmd:
:return:
'''
for i in self.info:
print('task id: %s, host: %s, command:%s' % (i, self.info[i][0], self.info[i][1])) def check_task(self, cmd_id):
'''
查看运行结果
:param cmd_id:
:return:
'''
try:
id = int(cmd_id.split()[1])
callack_queue = self.info[id][2]
callack_id=self.info[id][3]
client = Client()
res = client.get_response(callack_queue, callack_id)
print(res.decode())
del self.info[id]
except Exception as e:
print(e) def run(self, cmd):
comm = cmd.split("\"")[1]
hosts = cmd.split("--")
host = hosts[1].split()[1:] #拿ip地址
for i in host:
id = random.randint(10000,99999)
obj = Client()
res = obj.call(i, comm)
self.info[id] = [i,comm,res[0], res[1]]
return self.info def ref(self, cmd):
'''
反射
:param cmd:
:return:
'''
str = cmd.split()[0]
if hasattr(self,str):
func = getattr(self,str)
r = func(cmd)
if r is not None:
for i in r:
print('task id: %s, host: %s, command:%s' % (i, r[i][0], r[i][1])) def thread(self):
while True:
cmd = input("->>").strip()
if not cmd:continue
t1 = threading.Thread(target=self.ref, args=(cmd, ))
t1.start() obj = Threading()
res = obj.thread()
cliernt