A PySpark-Based Network Service Anomaly Detection System (Part 4): Syncing Data Between MySQL and Spark SQL, and Predicting Anomalies with k-means

Date: 2023-03-08 21:18:03

This is an anomaly detection system built on Django REST framework and Spark. MySQL and Redis serve as the data stores, Celery as the task queue, and Spark SQL plus Spark MLlib as the analysis services; k-means and random forest algorithms are used to analyze the network service data. The data is split into a full dataset and a normal dataset: every day a scheduled job automatically extracts normal samples from the full dataset so the algorithms have clean data for model training.

Batch-importing normal samples (for a specified time range) into the database with Celery

from uuid import uuid4
# CatNormalResource (Django model), celery_app and RsError come from the project's own modules


def add_normal_cat_data(data):
    """
    Build model instances and yield them in batches of 1000.
    :param data:
    :return:
    """
    tmp_cat_normal_models = []
    for cat_data in data:
        response_time = cat_data.get('response_time')
        request_count = cat_data.get('request_count') or 1
        fail_count = cat_data.get('fail_count') or 1
        cat_data['id'] = str(uuid4())
        # Keep only "normal" samples: fast responses with a low failure ratio
        if response_time < 1.2 and (fail_count / request_count) < 0.2:
            cat_obj = CatNormalResource(**cat_data)
            tmp_cat_normal_models.append(cat_obj)
        if len(tmp_cat_normal_models) >= 1000:
            yield tmp_cat_normal_models
            tmp_cat_normal_models = []
    yield tmp_cat_normal_models


@celery_app.task
def insert_normal_cat_data(data):
    """
    Insert asynchronously, bulk-creating 1000 rows at a time.
    :param data:
    :return:
    """
    try:
        for i in add_normal_cat_data(data):
            CatNormalResource.objects.bulk_create(i)
    except Exception as e:
        print(e)
        raise RsError('Failed to insert into the database')
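
The CatNormalResource model itself is not included in this excerpt. A minimal sketch of what it might look like, assuming a Django model whose fields match the dict keys and filters used in this post (the field names and types below are guesses, not the project's actual definition):

from django.db import models


class CatNormalResource(models.Model):
    # Hypothetical sketch only -- the real model lives in the project repository
    id = models.CharField(max_length=36, primary_key=True)  # filled with str(uuid4()) before insert
    appid = models.IntegerField()
    response_time = models.FloatField()
    request_count = models.IntegerField()
    fail_count = models.IntegerField()
    create_time = models.BigIntegerField()  # millisecond timestamp
    update_time = models.BigIntegerField()  # millisecond timestamp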

Importing normal samples automatically every day via a crontab-scheduled job (the add_normal_cat_data and insert_normal_cat_data functions from the previous section are reused)

import time
import datetime
import logging

from django.core.cache import cache
# get_cat_res_data and CACHE_TIMEOUT_DEFAULT come from the project's own modules

logger = logging.getLogger(__name__)


def get_current_timestamp():
    """
    Get the current timestamp in milliseconds.
    :return:
    """
    return int(time.time()) * 1000


def convert_datetime_to_timestamp(dtime):
    """
    Convert a datetime to a millisecond timestamp.
    :param dtime:
    :return:
    """
    timestamp = time.mktime(dtime.timetuple())
    return int(timestamp) * 1000


def get_cache_cat_data(start_time, end_time, force=False):
    """
    Get the cat data for the given time range, caching the result.
    :param start_time:
    :param end_time:
    :return:
    """
    key = 'GET_CAT_RES_DATA_{0}_TO_{1}'.format(
        start_time, end_time
    )
    content = cache.get(key)
    if force or not content:
        content = get_cat_res_data(start_time, end_time)
        if content:
            cache.set(key, content, timeout=CACHE_TIMEOUT_DEFAULT)
    return content


def insert_normal_cat_job():
    """
    Scheduled job: import the previous day's normal data.
    :return:
    """
    logger.info('insert_normal_cat_job ....')
    dt_time = datetime.datetime.now() + datetime.timedelta(days=-1)
    start_time = convert_datetime_to_timestamp(dt_time)
    end_time = get_current_timestamp()
    data = get_cache_cat_data(start_time, end_time)
    insert_normal_cat_data.delay(data)
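
The post does not show how crontab reaches insert_normal_cat_job. One common way, assuming a hypothetical Django management command (the command name and import path below are assumptions), is to wrap the job so crontab can call `python manage.py run_insert_normal_cat_job` once a day:

from django.core.management.base import BaseCommand

# Import path is an assumption; insert_normal_cat_job is the function defined above
from monitor.jobs import insert_normal_cat_job


class Command(BaseCommand):
    help = "Import the previous day's normal cat samples"

    def handle(self, *args, **options):
        insert_normal_cat_job()

A crontab entry such as `0 1 * * * cd /path/to/project && python manage.py run_insert_normal_cat_job` would then trigger the import every day at 01:00.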

Reading data for a specified time range with Spark SQL and predicting anomalies in new data with k-means

import json

from numpy import array
from pyspark.mllib.clustering import KMeans
# SparkSql, CatResultData and CatResultDataSerializer come from the project's own modules


class SparkAnomaly(object):
    def __init__(self, appid, start_time, end_time):
        self.appid = appid
        self.start_time = start_time
        self.end_time = end_time
        self.spark_sql = SparkSql()
        self.cat_res = self.spark_sql.load_table_dataframe('cat_resource')
        self.cat_normal_res = self.spark_sql.load_table_dataframe(
            'cat_normal_resource'
        )
        self.filter_str = "appid = {0} " \
                          "and create_time >= {1} " \
                          "and update_time <= {2}".format(
                              self.appid, self.start_time, self.end_time,
                          )
        self.model_filter_str = "appid = {0}".format(self.appid)

    def get_kmeans_model(self):
        """
        Train and return the k-means clustering model.
        :return:
        """
        df = self.cat_normal_res.filter(self.model_filter_str)
        parsed_data_rdd = df.rdd.map(lambda x: array([x[4], x[5], x[6]]))
        # Build the clustering model on the normal samples
        clusters = KMeans.train(
            parsed_data_rdd, 3,
            maxIterations=10,
            initializationMode="random"
        )
        return clusters

    def get_kmeans_predict(self):
        """
        Get the prediction result for this appid over the given time range.
        :return:
        """
        df = self.cat_res.filter(self.filter_str)
        parsed_data_rdd = df.rdd.map(lambda x: array([x[4], x[5], x[6]]))
        clusters = self.get_kmeans_model()
        predict_result = clusters.predict(parsed_data_rdd)
        return predict_result.collect()


def get_kmeans_result(appid, start_time, end_time):
    """
    Get (or compute and store) the k-means result for the appid over the given time range.
    :param appid:
    :param start_time:
    :param end_time:
    :return:
    """
    cat_result_obj = CatResultData.objects.filter(
        appid=appid,
        start_time=start_time,
        end_time=end_time,
        algorithm_name="kmeans"
    ).first()
    if not cat_result_obj:
        arg_result = SparkAnomaly(appid, start_time, end_time)
        content = arg_result.get_kmeans_predict()
        cat_result_obj = CatResultData.objects.create(
            appid=appid,
            start_time=start_time,
            end_time=end_time,
            algorithm_name="kmeans",
            result_data=content
        )
    ser_data = CatResultDataSerializer(cat_result_obj).data
    ser_data['result_data'] = json.loads(ser_data['result_data'])
    return ser_data
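
The SparkSql helper and its load_table_dataframe method are not shown in this excerpt, but they are where the MySQL-to-Spark SQL hand-off from the title happens. A minimal sketch using Spark's built-in JDBC data source (the class name matches the usage above; the connection URL, credentials and driver are placeholder assumptions):

from pyspark.sql import SparkSession


class SparkSql(object):
    """Sketch of a MySQL -> Spark SQL bridge; connection details are placeholders."""

    def __init__(self):
        self.spark = SparkSession.builder \
            .appName('network_anomaly_detection') \
            .getOrCreate()

    def load_table_dataframe(self, table_name):
        # Read a MySQL table into a Spark DataFrame over JDBC
        return self.spark.read.format('jdbc') \
            .option('url', 'jdbc:mysql://127.0.0.1:3306/your_db') \
            .option('dbtable', table_name) \
            .option('user', 'db_user') \
            .option('password', 'db_password') \
            .option('driver', 'com.mysql.jdbc.Driver') \
            .load()

The DataFrames returned this way are what get_kmeans_model and get_kmeans_predict consume via .rdd; the columns at indices 4-6 of each row are presumably the numeric features (response time, request count, fail count) fed to k-means.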

The code above is only part of the system; for the complete code, see my GitHub: https://github.com/a342058040/network_anomaly_detection