[bigdata] Using a Redis Queue for Machine-Independent Job Submission and Execution (Python Implementation)

Date: 2024-04-03 21:36:50

Use case: files need to be downloaded from several remote machines on a schedule and stored in HDFS. The first implementation was a one-to-one shell script per machine, but a download that failed because of network or other problems could not be retried, and if an agent machine went down, all of the downloads assigned to it stopped and resubmitting those jobs was painful. Hence the switch to a Redis queue, which makes job submission and execution independent of any particular machine.
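
A minimal sketch of the pattern before the full scripts (the queue name and Redis host are the ones used later in this post; jobs.py and download_one are hypothetical placeholders): the submitter only pushes a function call into Redis, and any machine running an rq worker on the same queue can pick it up, so no job is bound to a specific host.

# jobs.py -- hypothetical job module, importable on every worker machine
def download_one(url):
    print "downloading", url

# submit.py -- producer side, can run anywhere
from redis import Redis
from rq import Queue
from jobs import download_one

q = Queue(name="QUEUE_WGET", connection=Redis(host='10.100.1.47'))
q.enqueue(download_one, args=("http://54.223.101.179/access_201404032130.gz",), timeout=600)

# worker side, run on as many machines as needed:
#   rqworker -H 10.100.1.47 QUEUE_WGET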

Job submission, log_agent.py:

It runs every ten minutes, scheduled with crontab -e; it only needs to be installed on one server.

*/10 * * * * python /usr/local/apps/log-agent/log_agent.py >> /data/logs/msls_wget.log 2>&1

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from redis import Redis
from rq import Queue
from log_job import wget
import datetime

R = Redis(host='10.100.1.47')
q = Queue(connection=R, name="QUEUE_WGET")


def submit_job(ptime, uri_tmpl, bid, need_decompress, hdfs_path_format,
               split_hdfs_path_format, splitable, is_new):
    q.enqueue(wget,
              args=(ptime, uri_tmpl, bid, need_decompress, hdfs_path_format,
                    split_hdfs_path_format, splitable, is_new),
              timeout=60 * 15)


def main(ptime):
    remotehosts = [
        "54.223.101.179",
        "54.223.101.31",
        "54.223.101.86",
        "54.223.101.79",
        "54.223.101.85",
        "54.223.101.80"
    ]
    url_hdfs_paths = {
        "pcp": ["http://{host}/access_{ptime}.gz",
                "/data/logs/pcp/{day}/{remotehost}.{ptime}.{decompress_suffix}",
                "/data/logs/pcp/{day}/split.{remotehost}.{ptime}.{decompress_suffix}"],
        "pcc": ["http://{host}/pcc/access_{ptime}.gz",
                "/data/logs/pcc/{day}/{remotehost}.{ptime}.{decompress_suffix}",
                "/data/logs/pcc/{day}/split.{remotehost}.{ptime}.{decompress_suffix}"],
        "m": ["http://{host}/m/access_{ptime}.gz",
              "/data/logs/m/{day}/{remotehost}.{ptime}.{decompress_suffix}",
              "/data/logs/m/{day}/split.{remotehost}.{ptime}.{decompress_suffix}"],
    }
    # enqueue one download job per (remote host, business id) pair
    for remotehost in remotehosts:
        for bid, hdfs_paths in url_hdfs_paths.items():
            uri = hdfs_paths[0].format(host=remotehost, ptime=ptime)
            hdfs_path = hdfs_paths[1]
            split_hdfs_path = hdfs_paths[2]
            print "wget({0},{1},{2},{3})".format(uri, bid, hdfs_path, split_hdfs_path)
            submit_job(ptime, uri, bid, True, hdfs_path, split_hdfs_path, True, False)


if __name__ == "__main__":
    now = datetime.datetime.now()
    last_time = now + datetime.timedelta(minutes=-10)
    last_ptime = last_time.strftime('%Y%m%d%H%M')
    # round the minute down to a 10-minute boundary, e.g. 201404032147 -> 201404032140
    ptime = "{0}".format(int(last_ptime) / 10 * 10)
    main(ptime)

Job execution, log_job.py:

The workers are managed by supervisor and deployed on multiple servers.

[program:MSLS_WGET]
command=rqworker -H 10.100.1.47 --name 10.100.1.46.msls_wget_%(process_num)s --path /usr/local/apps/log-agent MSLS_WGET
directory=/usr/local/apps/log-agent
autostart=true
autorestart=true
process_name = wget_%(process_num)s
numprocs=6
startsecs=5
startretries=5
redirect_stderr=true
stdout_logfile=/data/logs/wget_%(process_num)s.log

log_job.py roughly does the following: it takes a job from the Redis queue, downloads the file from the remote server, reads it line by line to handle content that crosses the day boundary, counts the number of lines and the file size, and sends these statistics through fluentd into a MySQL database.
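
The only subtle part is the cross-day handling: a file fetched around midnight can contain lines from two different days, so each line is routed into a "today" or a "yesterday" output file by matching the nginx timestamp prefix. A simplified, stand-alone sketch of that loop (function and file names are placeholders; the real logic lives in MtFile.getfile() below):

import gzip

NGINX_TIME_FORMAT = '[%d/%b/%Y:%H'   # hour prefix of the nginx access-log timestamp

def split_by_day(gz_path, today, yesterday, today_out, yesterday_out):
    """Route each line of a downloaded .gz log into today's or yesterday's file."""
    counts = {'today': 0, 'yesterday': 0}
    with gzip.open(gz_path) as fr, open(today_out, 'wb') as ft, open(yesterday_out, 'wb') as fy:
        for line in fr:
            if today.strftime(NGINX_TIME_FORMAT) in line:
                ft.write(line)
                counts['today'] += 1
            elif yesterday.strftime(NGINX_TIME_FORMAT) in line:
                fy.write(line)
                counts['yesterday'] += 1
    return counts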

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import urlparse
import urllib
from subprocess import Popen, PIPE, call
import sys
import datetime
import os
import requests
import gzip
import socket

from fluent import sender
sender.setup('MT_PULL_PUT', host='10.100.1.120', port=24225)
from fluent import event
from functools import partial

fluent_log = partial(event.Event, 'store')


def log(msg):
    sys.stdout.write(msg + "\n")
    sys.stdout.flush()


def check_path(path):
    dir = os.path.split(path)[0]
    if not os.path.isdir(dir):
        os.makedirs(dir)
    if os.path.isfile(path):
        os.remove(path)


def clear_path(*paths):
    for p in paths:
        if os.path.isfile(p):
            log("[CLEAR][DEL] {0}".format(p))
            os.remove(p)
        else:
            pass
            raise


def create_hdfs_dir(hdfs_file_path):
    path = os.path.split(hdfs_file_path)[0]
    cmd = "hadoop fs -mkdir -p {0}".format(path)
    log(cmd)
    call([cmd, ],
         shell=True,
         stdin=PIPE,
         stdout=PIPE,
         stderr=PIPE)


class MtFile(object):
    compress_suffix = 'gz'
    decompress_suffix = "log"

    ptime_format = "%Y%m%d%H%M"
    day_format = "%y-%m-%d"
    hour_format = "%Y-%m-%d-%H"
    nginx_time_format = '[%d/%b/%Y:%H'

    Compress_Path = "/data/nginx/{bid}/{day}/{remotehost}.{ptime}.{compress_suffix}"
    DeCompress_Path = "/data/nginx/{bid}/{day}/{remotehost}.{ptime}.{decompress_suffix}"
    Split_Remote_Path = "/data/nginx/{bid}/{day}/split.{remotehost}.{ptime}.{decompress_suffix}"
    Split_Local_Path = "/data/nginx/{bid}/{day}/split.{localhost}.{ptime}.{decompress_suffix}"

    def __init__(self,
                 ptime,
                 uri_tmpl,
                 bid,
                 need_decompress,
                 hdfs_path_format,
                 split_hdfs_path_format,
                 splitable,
                 is_new):
        self.ptime = ptime
        self.uri_tmpl = uri_tmpl
        self.bid = bid
        self.need_decompress = need_decompress
        self.hdfs_path_format = hdfs_path_format
        self.split_hdfs_path_format = split_hdfs_path_format
        self.splitable = splitable
        self.is_new = is_new

        self.ptime_obj = datetime.datetime.strptime(self.ptime, self.ptime_format)
        self.today = self.ptime_obj
        self.yesterday = self.ptime_obj - datetime.timedelta(0, 300)
        if self.is_new:
            self.ptime = self.today.strftime(self.hour_format)

        self.url_obj = urlparse.urlparse(self.uri_tmpl)
        self.remotehost = self.url_obj.netloc
        self.uri = self.uri_tmpl.format(**self.kwargs_today)

        self.file_size = 0
        self.yfile_size = 0
        self.log_lines = 0
        self.ylog_lines = 0

        msg = "Ptime {0} today {1} yesterday {2} uri {3} ".format(self.ptime, self.today, self.yesterday, self.uri)
        self.log(msg)

        if not self.is_local:
            check_path(self.local_today_path)
        if self.splitable and self.need_split or self.need_decompress:
            self.fd_today = self.getfd(self.local_today_path)
        if self.splitable and self.need_split:
            check_path(self.local_yesterday_path)
            self.fd_yesterday = self.getfd(self.local_yesterday_path)

        self.getfile()

        if self.bid.startswith('llott41') and not self.isexisted:
            self.log('llott41 not existed... but will not raise exception.')
            return

        self.put_today_file()
        if self.splitable and self.need_split:
            self.put_yesterday_file()

    def getfd(self, path):
        dir = os.path.split(path)[0]
        (not os.path.isdir(dir)) and os.makedirs(dir)
        if (not self.is_local) and os.path.isfile(path):
            os.remove(path)
        return open(path, 'wb')

    def log(self, msg):
        _ = "{0}\n".format(msg)
        sys.stdout.write(_)
        sys.stdout.flush()

    @property
    def kwargs_today(self):
        if self.is_new:
            ptime = self.today.strftime(self.hour_format)
        else:
            ptime = self.today.strftime(self.ptime_format)[:12]
            #print ptime
        lhost = os.environ.get('HOSTNAME', 'null')
        if lhost == "localhost.localdomain":
            lhost = socket.getfqdn()
        _ = {'bid': self.bid,
             'day': self.today.strftime(self.day_format)[:8],
             'remotehost': self.remotehost,
             'localhost': lhost,
             'ptime': ptime,
             "decompress_suffix": self.decompress_suffix,
             "compress_suffix": self.compress_suffix}
        return _.copy()

    @property
    def kwargs_yesterday(self):
        if self.is_new:
            ptime = self.yesterday.strftime(self.hour_format)
        else:
            ptime = self.yesterday.strftime(self.ptime_format)[:12]
        lhost = os.environ.get('HOSTNAME', 'null')
        if lhost == "localhost.localdomain":
            lhost = socket.getfqdn()
        _ = {'bid': self.bid,
             'day': self.yesterday.strftime(self.day_format)[:8],
             'remotehost': self.remotehost,
             'localhost': lhost,
             'ptime': ptime,
             "decompress_suffix": self.decompress_suffix,
             "compress_suffix": self.compress_suffix}
        return _.copy()

    @property
    def local_path_tmpl(self):
        if self.splitable and self.need_split:
            if self.is_local:
                return self.Split_Local_Path
            else:
                return self.Split_Remote_Path
        else:
            return self.DeCompress_Path

    @property
    def hdfs_path_tmpl(self):
        if self.splitable and self.need_split:
            return self.split_hdfs_path_format
        else:
            return self.hdfs_path_format

    @property
    def local_today_path(self):
        """
        unzipped text file
        """
        if self.is_local:
            if self.splitable and self.need_split:
                return self.Split_Local_Path.format(**self.kwargs_today)
            else:
                return self.uri_tmpl.format(**self.kwargs_today)
        else:
            return self.local_path_tmpl.format(**self.kwargs_today)

    @property
    def local_yesterday_path(self):
        """
        unzipped text file
        """
        if self.is_local:
            if self.splitable and self.need_split:
                return self.Split_Local_Path.format(**self.kwargs_yesterday)
            else:
                return self.uri_tmpl.format(**self.kwargs_yesterday)
        else:
            return self.local_path_tmpl.format(**self.kwargs_yesterday)

    @property
    def hdfs_today_path(self):
        """
        hdfs file path
        """
        return self.hdfs_path_tmpl.format(**self.kwargs_today)

    @property
    def hdfs_yesterday_path(self):
        """
        hdfs file path
        """
        return self.hdfs_path_tmpl.format(**self.kwargs_yesterday)

    @property
    def local_download_path(self):
        if self.need_decompress:
            return self.is_local and self.local_today_path or self.Compress_Path.format(**self.kwargs_today)
        else:
            return self.is_local and self.local_today_path or self.DeCompress_Path.format(**self.kwargs_today)

    @property
    def is_local(self):
        return os.path.isfile(self.uri)

    @property
    def isexisted(self):
        if self.is_local:
            return os.path.isfile(self.uri)
        else:
            head_obj = requests.head(self.uri)
            return head_obj.status_code == 200

    @property
    def need_split(self):
        if not self.is_new:
            return self.ptime_obj.strftime('%H%M') == ''
        else:
            return False

    @property
    def localspath(self):
        if self.is_local:
            return self.uri
        else:
            return self.local_download_path

    def getfile(self):
        """
        Download the file (or use the local path) and split it into
        multiple files if needed.
        """
        if not self.bid.startswith('llott41') and not self.isexisted:
            # fail the job so that rq moves it to the failed queue
            raise
        elif self.bid.startswith('llott41') and not self.isexisted:
            return

        if not self.is_local:
            self.log("Load {0} => {1}".format(self.uri, self.localspath))
            urllib.urlretrieve(self.uri, self.localspath)
            if self.need_decompress:
                self.log("unzip {0}".format(self.localspath))
                fr = gzip.open(self.localspath)
            else:
                fr = open(self.localspath)
            if self.splitable and self.need_split:
                for line in fr:
                    if self.today.strftime(self.nginx_time_format) in line:
                        self.log_lines += 1
                        self.fd_today.write(line)
                    elif self.yesterday.strftime(self.nginx_time_format) in line:
                        self.ylog_lines += 1
                        self.fd_yesterday.write(line)
                    else:
                        log("Error Time. Log: " + line)
                self.log("split to {0} {1}".format(self.fd_today.name, self.fd_yesterday.name))
            else:
                for line in fr:
                    self.log_lines += 1
                    if self.need_decompress:
                        self.fd_today.write(line)
        else:
            if not self.need_decompress:
                fr = open(self.uri)
            else:
                fr = gzip.open(self.uri)
            if self.splitable and self.need_split:
                for line in fr:
                    if self.today.strftime(self.nginx_time_format) in line:
                        self.log_lines += 1
                        self.fd_today.write(line)
                    elif self.yesterday.strftime(self.nginx_time_format) in line:
                        self.ylog_lines += 1
                        self.fd_yesterday.write(line)
                    else:
                        log("Error Time. Log: " + line)
                self.log("split to {0} {1}".format(self.fd_today.name, self.fd_yesterday.name))
            else:
                for line in fr:
                    self.log_lines += 1
                    if self.need_decompress:
                        self.fd_today.write(line)

        if self.splitable and self.need_split or self.need_decompress:
            self.fd_today.flush()
            if self.splitable and self.need_split:
                self.fd_yesterday.flush()
            try:
                self.fd_today.close()
                if self.splitable and self.need_split:
                    self.fd_yesterday.close()
            except:
                pass

    def __del__(self):
        """
        Close file descriptors and remove empty output files.
        """
        try:
            self.fd_today.close()
            if self.splitable and self.need_split:
                self.fd_yesterday.close()
        except:
            pass
        try:
            if os.stat(self.fd_today.name).st_size <= 0:
                os.remove(self.fd_today.name)
            if self.splitable and self.need_split and os.stat(self.fd_yesterday.name).st_size <= 0:
                os.remove(self.fd_yesterday.name)
        except:
            pass

    def put_yesterday_file(self):
        isputted = put_hdfs(self.hdfs_yesterday_path, self.local_yesterday_path)
        if isputted:
            self.yfile_size = os.stat(self.local_yesterday_path).st_size
            if self.is_local:
                rhost = os.environ.get('HOSTNAME', 'null')
            else:
                rhost = self.uri.split('/')[2]
            json_data = {"bid": self.bid,
                         "ftime": self.yesterday.strftime(self.ptime_format),
                         "lines": self.ylog_lines,
                         "size": self.yfile_size,
                         "rhost": rhost,
                         "lhost": os.environ.get('HOSTNAME', 'null')}
            fluent_log(json_data)
            print json_data
        else:
            self.log("Put failed or No need to Put.")

    def put_today_file(self):
        isputted = put_hdfs(self.hdfs_today_path, self.local_today_path)
        if isputted:
            self.file_size = os.stat(self.local_today_path).st_size
            if self.is_local:
                rhost = os.environ.get('HOSTNAME', 'null')
            else:
                rhost = self.uri.split('/')[2]
            json_data = {"bid": self.bid,
                         "ftime": self.today.strftime(self.ptime_format),
                         "lines": self.log_lines,
                         "size": self.file_size,
                         "rhost": rhost,
                         "lhost": os.environ.get('HOSTNAME', 'null')}
            fluent_log(json_data)
            print json_data
        else:
            self.log("Put failed or No need to Put.")


def put_hdfs(hdfs_path, local_path):
    create_hdfs_dir(hdfs_path)
    local_size = os.stat(local_path).st_size
    if local_size <= 0:
        log("[SIZE] {0} is Zero Not Need PUT".format(local_path))
        return False

    # `hadoop fs -test -e` exits 0 when the path exists, non-zero otherwise
    cmd = "hadoop fs -test -e {p}".format(p=hdfs_path)
    log(cmd)
    not_existed = call([cmd, ],
                       shell=True,
                       stdin=PIPE,
                       stdout=PIPE,
                       stderr=PIPE)
    log(str(not_existed))

    if not_existed:
        put_cmd = "hadoop fs -put {local_path} {hdfs_path}".format(local_path=local_path, hdfs_path=hdfs_path)
        log(put_cmd)
        put_fail = call([put_cmd, ],
                        shell=True,
                        stdin=PIPE,
                        stdout=PIPE,
                        stderr=PIPE)
        retries = 1
        while put_fail and retries <= 3:
            log("[PUT] RETRY {retries} {local_path} => {hdfs_path}".format(retries=retries, local_path=local_path, hdfs_path=hdfs_path))
            log(put_cmd)
            put_fail = call([put_cmd, ], shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
            retries = retries + 1
        if put_fail:
            log("[PUT] ERROR {local_path} => {hdfs_path}".format(local_path=local_path, hdfs_path=hdfs_path))
            raise
        else:
            log("[PUT] OK {local_path} => {hdfs_path}".format(local_path=local_path, hdfs_path=hdfs_path))
            return True
    else:
        log("PUT EXISTED {local_path} => {hdfs_path} ".format(local_path=local_path, hdfs_path=hdfs_path))
        # compare the HDFS file size (5th column of `hadoop fs -ls`) with the local size
        cmd = "hadoop fs -ls {hdfs_path}".format(hdfs_path=hdfs_path)
        hdfs_file = Popen([cmd, ], shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
        size = int(hdfs_file.stdout.read().split('\n')[1].split()[4])
        log("SIZE CHECK LOCAL {0} --- HDFS {1}".format(local_size, size))
        if size != local_size:
            remove_cmd = "hadoop fs -rm {0}".format(hdfs_path)
            call([remove_cmd, ], shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
            log("[DEL] {0}".format(remove_cmd))
            put_cmd = "hadoop fs -put {local_path} {hdfs_path}".format(local_path=local_path, hdfs_path=hdfs_path)
            put_fail = call([put_cmd, ], shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
            retries = 1
            while put_fail and retries <= 3:
                log("[PUT] RETRY {retries} {local_path} => {hdfs_path}".format(retries=retries, local_path=local_path, hdfs_path=hdfs_path))
                log(put_cmd)
                put_fail = call([put_cmd, ], shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
                retries = retries + 1
            if put_fail:
                log("[PUT] ERROR {local_path} => {hdfs_path}".format(local_path=local_path, hdfs_path=hdfs_path))
                raise
            else:
                log("[PUT] OK {local_path} => {hdfs_path}".format(local_path=local_path, hdfs_path=hdfs_path))
                return True
        else:
            log("[No Need To PUT] {0} => {1} And Size Check Ok {2}".format(local_path, hdfs_path, size))
            return False


def wget(ptime, uri_tmpl, bid, need_decompress, hdfs_path_format, split_hdfs_path_format, splitable, is_new):
    MtFile(ptime,
           uri_tmpl,
           bid,
           need_decompress,
           hdfs_path_format,
           split_hdfs_path_format,
           splitable,
           is_new)


if __name__ == "__main__":
    ptime = ""
    uri_tmpl = "http://54.223.101.123/OTT_41/reserve/reserve_{ptime}.log"
    uri_tmpl_split = ""
    # need_decompress = True
    need_decompress = False
    bid = "llott41/reserve"
    splitable = False
    hdfs_path_format = "/data/test/flumedata/pcweb/{day}/{localhost}.{ptime}.{decompress_suffix}"
    split_hdfs_path_format = "/data/test/flumedata/pcweb/{day}/split.{localhost}.{ptime}.{decompress_suffix}"
    wget(ptime,
         uri_tmpl,
         bid,
         need_decompress,
         hdfs_path_format,
         split_hdfs_path_format,
         splitable,
         True)

If a job fails while executing, for example because of a network error, it raises an exception and exits; the job is marked failed and lands in the Failed queue. It then has to be pushed back onto the queue to be retried.

retryall.sh:

Here 10.100.101.120 is the host running the rq dashboard.

#!/bin/bash
curl 'http://10.100.101.120:9181/requeue-all' -X POST -H 'Origin: http://10.100.101.120:9181' -H 'Accept-Encoding: gzip, deflate' -H 'Accept-Language: zh-CN,zh;q=0.8,zh-TW;q=0.6,ru;q=0.4,ja;q=0.2,it;q=0.2,mt;q=0.2,en;q=0.2' -H 'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.101 Safari/537.36' -H 'Accept: */*' -H 'Referer: http://10.100.101.120:9181/failed' -H 'X-Requested-With: XMLHttpRequest' -H 'Connection: keep-alive' -H 'Content-Length: 0' --compressed
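
The same requeue can also be done through rq's own API instead of the dashboard's HTTP endpoint, which is convenient to run from cron. A sketch, assuming the pre-1.0 rq API of the time where get_failed_queue() is available (newer rq versions expose the same thing through FailedJobRegistry):

#!/usr/bin/env python
# requeue every failed job, equivalent to the dashboard's requeue-all button
from redis import Redis
from rq import get_failed_queue

fq = get_failed_queue(connection=Redis(host='10.100.1.47'))
for job_id in fq.job_ids:
    fq.requeue(job_id)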

Sometimes submitted jobs are no longer needed, and because the target machine has gone offline they keep timing out and tying up workers. Clearing them one by one from the dashboard is slow, so the following script removes jobs that are no longer wanted.

clearRedisJob.py

#coding=utf-8
import redis

r = redis.Redis(host='10.100.1.47')

# scan all keys and find rq job hashes whose payload mentions the dead hosts
keys = r.keys()
print keys
for key in keys:
    if 'rq:job:' in key and r.type(key) == 'hash':
        data = r.hget(key, 'data')
        if data and ('54.223.101.79' in data or '54.223.101.63' in data):
            print data
            # delete the job hash and remove its id from the queue list
            r.delete(key)
            r.lrem('rq:queue:MSLS_WGET', key.split(':')[-1])
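
The script above manipulates rq's internal keys directly (the rq:job:* hashes and the rq:queue:MSLS_WGET list), which works but is coupled to rq's key layout. The same cleanup can also be written against the rq API; a sketch, assuming a newer rq (1.x) where Queue.get_job_ids(), Job.fetch() and Job.delete() are available:

#coding=utf-8
# drop queued jobs that target machines which have been taken offline (rq 1.x style sketch)
from redis import Redis
from rq import Queue
from rq.job import Job

DEAD_HOSTS = ('54.223.101.79', '54.223.101.63')

conn = Redis(host='10.100.1.47')
q = Queue('MSLS_WGET', connection=conn)

for job_id in q.get_job_ids():
    job = Job.fetch(job_id, connection=conn)
    # the download URL is one of the positional args passed to wget()
    if any(host in str(job.args) for host in DEAD_HOSTS):
        q.remove(job_id)   # take the id off the queue list
        job.delete()       # and delete the job hash itself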