Python 监听 salt job 状态,并将任务数据推送到 redis 中的方法

时间:2022-07-03 12:34:52

salt 分发任务后,监听程序主动将已完成的任务数据推送到 redis 中,并使用 redis 列表实现的生产者/消费者模式进行消息传送。

代码如下:
#coding=utf-8
import fnmatch,json,logging
import salt.config
import salt.utils.event
from salt.utils.redis import RedisPool
import sys,os,datetime,random
import multiprocessing,threading
from joi.utils.gobsAPI import PostWeb
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Salt client configuration, loaded from the master config file at this path.
opts = salt.config.client_config('/data/salt/saltstack/etc/salt/master')
# Shared Redis connection, created at import time from the 'redis_db' entry of opts.
r_conn = RedisPool(opts.get('redis_db')).getConn()
# Global lock acquired by RedisQueueDaemon.run_task while it handles one message.
lock = threading.Lock()
class RedisQueueDaemon(object):
    '''
    Redis queue listener.

    Pops JSON task messages from the Redis list ``task:prod:queue`` and
    handles each one in its own thread via ``run_task``.
    '''

    def __init__(self, r_conn):
        self.r_conn = r_conn  # Redis connection instance
        self.task_queue = 'task:prod:queue'  # Redis list used as the task message queue

    def listen_task(self):
        '''
        Main listening loop; blocks on the queue and never returns.

        Every popped message is handed to ``run_task`` on a new thread.
        '''
        while True:
            # Producers LPUSH onto the list, so pop from the opposite (right)
            # end to consume messages in the order they were queued (FIFO).
            # A timeout of 0 blocks until a message is available.
            queue_item = self.r_conn.brpop(self.task_queue, 0)[1]
            print("queue get %s" % (queue_item,))
            worker = threading.Thread(target=self.run_task, args=(queue_item,))
            worker.start()

    def run_task(self, info):
        '''
        Handle one queued message.

        info: JSON text of an object carrying 'type' and 'jid', plus
              'state' (and optionally 'message') for 'setTaskState', or
              'data' for 'setTaskData'.

        Message types:
          pushTaskData - send the cached data of the job to the backend and
                         clear the cache when the send succeeds.
          setTaskState - report the job state to the backend.
          setTaskData  - overwrite the cached data of the job.
        '''
        # The context manager releases the lock even when parsing or a handler
        # raises; a bare acquire()/release() pair would leave the lock held
        # and block every later task.
        with lock:
            info = json.loads(info)
            task_type = info['type']
            if task_type == 'pushTaskData':
                cached = self.getTaskData(info['jid'])
                task_data = json.loads(cached) if cached else []
                logger.info('获取缓存数据:%s' % task_data)
                # Clear the cache only after the backend accepted the data.
                if task_data and self.sendTaskData2bs(task_data):
                    task_data = []
                self.setTaskData(info['jid'], task_data)
            elif task_type == 'setTaskState':
                # 'message' is optional, mirroring the default of setTaskState.
                self.setTaskState(info['jid'], info['state'], info.get('message', ''))
            elif task_type == 'setTaskData':
                self.setTaskData(info['jid'], info['data'])
            else:
                logger.warning('unknown task type: %s' % task_type)

    def getTaskData(self, jid):
        '''Return the raw 'data' field of the Redis hash ``task:<jid>`` (None when absent).'''
        return self.r_conn.hget('task:' + jid, 'data')

    def setTaskData(self, jid, data):
        '''Store ``data`` JSON-encoded in the 'data' field of the Redis hash ``task:<jid>``.'''
        self.r_conn.hset('task:' + jid, 'data', json.dumps(data))

    def sendTaskData2bs(self, task_data):
        '''
        Post ``task_data`` to the backend through PostWeb.

        Returns True when there is nothing to send or when the response has
        a truthy 'code'; returns False otherwise.
        '''
        logger.info('发送任务数据到后端...')
        logger.info(task_data)
        if not task_data:
            return True
        # NOTE(review): PostWeb is a project helper; its exact semantics are
        # not visible here. A truthy result['code'] is treated as success.
        result = PostWeb('/jgapi/verify', task_data, 'pushFlowTaskData').postRes()
        print(result)
        if result['code']:
            logger.info('发送成功!')
            return True
        logger.error('发送失败!')
        return False

    def setTaskState(self, jid, state, message=''):
        '''
        Report the state of job ``jid`` to the backend through PostWeb.

        jid:     job id, sent as 'code'.
        state:   state value to report.
        message: optional text sent with the state.

        Returns ``(True, result)`` when the response has a truthy 'code',
        otherwise the bare ``result``.
        NOTE(review): the two return shapes differ; kept as-is so existing
        callers are not broken.
        '''
        logger.info('到后端设置任务【%s】状态' % str(jid))
        # Forward the caller's state; it used to be hard-coded to 'success',
        # which silently discarded the ``state`` argument.
        payload = {'code': jid, 'state': state, 'message': message}
        result = PostWeb('/jgapi/verify', payload, 'setTaskState').postRes()
        if result['code']:
            logger.info('设置任务【%s】状态成功!' % str(jid))
            return True, result
        logger.error('设置任务【%s】状态失败!' % str(jid))
        return result
def salt_job_listener():
    '''
    Salt job listener; loops forever.

    Reads events from the master event bus. For every event whose tag
    matches ``salt/job/*/ret/*`` it looks up the Redis hash ``task:<jid>``;
    events for jobs without a 'state' field there are ignored. Otherwise the
    minion result is appended to the cached 'data' list and two messages are
    pushed onto ``task:prod:queue``: a 'setTaskData' message carrying the
    merged list, followed by a 'pushTaskData' message.
    '''
    sevent = salt.utils.event.get_event(
            'master',
            sock_dir=opts['sock_dir'],
            transport=opts['transport'],
            opts=opts)
    while True:
        ret = sevent.get_event(full=True)
        if ret is None:
            # No event was returned this round; poll again.
            continue
        if not fnmatch.fnmatch(ret['tag'], 'salt/job/*/ret/*'):
            continue

        event = ret['data']
        jid = event['jid']
        task_key = 'task:' + jid
        task_state = r_conn.hget(task_key, 'state')
        if not task_state:
            # Job has no state recorded in Redis; skip it.
            continue

        # Read the cached data only for tracked jobs.
        cached = r_conn.hget(task_key, 'data')
        # NOTE(review): `settings` is not imported in the visible part of this
        # file; unless it is defined elsewhere the next statement raises
        # NameError - confirm where SALT_MASTER_OPTS should come from.
        jid_data = {
            'code': jid,
            'project_id': settings.SALT_MASTER_OPTS['project_id'],
            'serverip': event['id'],
            'returns': event['return'],
            'name': event['id'],
            'state': 'success' if event['success'] else 'failed',
        }
        task_data = json.loads(cached) if cached else []
        task_data.append(jid_data)
        logger.info("新增数据:%s" % json.dumps(task_data))
        r_conn.lpush('task:prod:queue', json.dumps({'type': 'setTaskData', 'jid': jid, 'data': task_data}))

        # task_data holds at least the record appended above, so a
        # 'pushTaskData' message is queued in every case; only the log line
        # differs once the job is no longer 'running'.
        if task_state != 'running':
            logger.info('任务{0}完成,发送剩下的数据到后端...'.format(task_key))
        logger.info('新增消息到队列:pushTaskData')
        r_conn.lpush('task:prod:queue', json.dumps({'jid': jid, 'type': 'pushTaskData'}))

        print(datetime.datetime.now())
 
def run():
    '''
    Start both workers, each in its own process.

    The first process runs the Redis queue listener
    (``RedisQueueDaemon.listen_task``), the second runs
    ``salt_job_listener``. The processes are started and not joined.
    '''
    queue_daemon = RedisQueueDaemon(r_conn)
    workers = (
        ('start redis product queue listerner...', queue_daemon.listen_task),
        ('start salt job listerner...', salt_job_listener),
    )
    for banner, entry_point in workers:
        # Announce on stdout and in the log before launching each worker.
        print(banner)
        logger.info(banner)
        multiprocessing.Process(target=entry_point, args=()).start()

以上这篇 Python 监听 salt job 状态,并将任务数据推送到 redis 中的方法,就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持服务器之家。

原文链接:https://blog.csdn.net/u011085172/article/details/81228450