faker2/jd_ql_log_scan.py
2022-06-23 10:48:45 +08:00

271 lines
14 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import re
import shlex
import subprocess
import sys
import time
import traceback

from depend import Depend
'''
cron: 30 23 * * *
new Env('青龙日志分析 && 自动补全依赖');
########环境变量设置#########
## (非必填) 脚本唯一性检测,请在此处填写你想运行的脚本的绝对路径,其他脚本检测到路径与此变量不符将会停止运行
QL_LOG_SCAN_SCRIPT_PATH=
## (非必填)指定日志目录: 默认自动识别青龙目录,出现错误才需要手动指定日志目录
export QL_LOG_PATH="/ql/data/log/"
## (非必填)指定不扫描目录:多个请用逗号隔开
export QL_LOG_BLACK_DIR=""
## (非必填)指定不扫描日志文件:多个请用逗号隔开
export QL_LOG_BLACK_FILE=""
## (非必填)需要被扫描的最近n天的日志0就是只分析当天的日志(最近24小时的日志)
export QL_LOG_SCAN_DEEPIN=0
## (非必填)是否尝试自动补齐日志报错里提示的依赖
export QL_LOG_AUTO_INSTALL_DEPEND=False
## (非必填)强制指定npm包管理器有些青龙使用了pnpm而不是npm注意鉴别
export QL_LOG_NPM="npm"
'''
class QlLogScan(Depend):
    """Scan recent log files, summarise NodeJs/Python errors and report missing packages.

    Instantiating the class runs the whole job: it reads its settings through
    ``get_env``, builds the list of hourly log-name prefixes to accept, scans
    the log tree, sends a report and, when enabled, tries to install the
    packages that the logs reported as missing.

    ``get_env``, ``only_check``, ``get_ql_path``, ``not2append``, ``str2list``
    and ``re_filter_list`` are inherited from ``Depend``, which is defined
    outside this file; their exact contracts are assumed from how they are
    used here.
    """

    # Upper bound on the number of characters read from one log file.
    _MAX_LOG_CHARS = 2097152
    # NodeJs: text between "Error:" and "Require stack:".
    _NODEJS_ERR_RE = re.compile(r"Error\:(.*\s?)Require stack\:")
    # Python: a traceback; the last group is the line that follows the frames.
    _PYTHON_ERR_RE = re.compile(
        r"Traceback \(most recent call last\):([\n\s]+File[\s\S]*?, line [\d]+, in[\s\S]*?["
        r"\s\S]*?\n[\s\S]*?\n)+(.*?)\n")
    # The restricted character classes below also keep the captured package
    # names harmless when they later end up in a shell command.
    _NODEJS_MISSING_RE = re.compile(r"Cannot find module '([a-zA-Z\d_-]+)'")
    _PYTHON_MISSING_RE = re.compile(r"ModuleNotFoundError: No module named \'([a-zA-Z0-9_-]+)\'")

    def __init__(self):
        """Read the configuration, then scan, report and optionally install."""
        self.pyname = os.path.basename(__file__).replace(".py", "")
        print(self.only_check(self.pyname, os.path.abspath(__file__), "QL_LOG_SCAN_SCRIPT_PATH"))
        self.ql_log_path = self.get_env("QL_LOG_PATH", self.get_ql_path() + "log/")
        self.filter_dir_list = self.not2append(
            [r"^\.tmp$", r"^update$", self.pyname + "$"],
            self.str2list(self.get_env("QL_LOG_BLACK_DIR")))
        self.filter_log_list = self.not2append(
            [r'task_error\.log', r'start\.log'],
            self.str2list(self.get_env("QL_LOG_BLACK_FILE")))
        # Coerce explicitly: whether get_env converts "0"/"False" itself is
        # not visible from this file.
        self.history_scan_deepin = self._scan_days(self.get_env("QL_LOG_SCAN_DEEPIN", "0"))
        self.auto_install_depend = self._is_enabled(self.get_env("QL_LOG_AUTO_INSTALL_DEPEND", False))
        self.npm = self.get_env("QL_LOG_NPM", "npm")
        self.log_stat = {
            "all": 0,
            "nodejs_err": 0,
            "python_err": 0,
            "err_dict": {},
            "nodejs_depend": [],
            "python_depend": [],
            "readlog_err": [],
        }
        self.LogNameHeadList = self.generateLogNameHeadList()
        self.analysisLog()
        self.showAnalysisLog()
        if self.auto_install_depend:
            self.auto_depend()

    @staticmethod
    def _scan_days(value):
        """Return ``value`` as a non-negative int, falling back to 0."""
        try:
            return max(int(value), 0)
        except (TypeError, ValueError):
            return 0

    @staticmethod
    def _is_enabled(value):
        """Interpret a switch that may arrive as a bool, a number or a string.

        Strings that spell "off" (``"False"``, ``"0"``, ``""`` ...) are false;
        every other value keeps its ordinary truthiness.
        """
        if isinstance(value, str):
            return value.strip().lower() not in ("", "0", "false", "no", "off", "none")
        return bool(value)

    @staticmethod
    def _percent(count, total):
        """Return ``count / total`` as a percentage string rounded to 2 places."""
        return str(round((float(count) / float(total) * 100), 2))

    @staticmethod
    def _notify(title, content):
        """Deliver a message through the module-level ``send`` when it is usable.

        ``send`` is bound by ``load_send`` and may be missing or ``False`` when
        the notification module could not be loaded; in that case the message
        is printed so the report is not lost.
        """
        sender = globals().get("send")
        if callable(sender):
            sender(title, content)
        else:
            print(title + "\n" + content)

    def generateLogNameHeadList(self):
        """Return the ``YYYY-mm-dd-HH`` prefixes of the hours to scan, newest first.

        Covers ``(history_scan_deepin + 1) * 24`` hours counted back from now.
        """
        hours = (self._scan_days(self.history_scan_deepin) + 1) * 24
        now = int(time.time())
        return [time.strftime("%Y-%m-%d-%H", time.localtime(now - 3600 * offset))
                for offset in range(hours)]

    def analysisLog(self):
        """Walk the log directory and analyse every log file that passes the filters.

        A file is analysed when neither its directory nor its name matches a
        filter pattern, its name ends in ``.log`` and its first 13 characters
        are one of the accepted hour prefixes.
        """
        wanted_heads = set(self.LogNameHeadList)
        for path, _dirs, file_list in os.walk(self.ql_log_path):
            # Path relative to the log root; works with or without a trailing
            # separator on ql_log_path. The root itself is reported as "".
            dir_name = os.path.relpath(path, self.ql_log_path)
            if dir_name == ".":
                dir_name = ""
            if self.re_filter_list(dir_name, self.filter_dir_list):
                continue
            for file_name in file_list:
                if self.re_filter_list(file_name, self.filter_log_list):
                    continue
                if not file_name.endswith(".log") or file_name[:13] not in wanted_heads:
                    continue
                self._scan_log_file(os.path.join(path, file_name), dir_name)

    def _scan_log_file(self, log_path, dir_name):
        """Read one log file and fold its findings into ``self.log_stat``.

        Any failure is recorded in ``log_stat["readlog_err"]`` and printed so
        that one bad file does not stop the scan.
        """
        try:
            # Opened inside the try so open() failures are reported like read
            # failures; undecodable bytes are replaced instead of aborting.
            with open(log_path, "r", encoding="utf-8", errors="replace") as log_file:
                log_text = log_file.read(self._MAX_LOG_CHARS)
            nodejs_err_list = self._NODEJS_ERR_RE.findall(log_text)
            python_err_list = self._PYTHON_ERR_RE.findall(log_text)
            if nodejs_err_list:
                self.log_stat["nodejs_err"] += len(nodejs_err_list)
                self._record_errors(dir_name, "NodeJs",
                                    [found.strip() for found in nodejs_err_list],
                                    self._NODEJS_MISSING_RE, "nodejs_depend")
            elif python_err_list:
                self.log_stat["python_err"] += len(python_err_list)
                # Each match is a tuple of groups; the last one is the error line.
                self._record_errors(dir_name, "Python",
                                    [found[-1].strip() for found in python_err_list],
                                    self._PYTHON_MISSING_RE, "python_depend")
            self.log_stat["all"] += 1
        except Exception as e:
            err_log = "读取日志" + str(log_path) + "出现异常: " + str(e) + "\n"
            self.log_stat["readlog_err"].append(err_log)
            print(err_log)

    def _record_errors(self, dir_name, err_type, messages, missing_re, depend_key):
        """Store error messages for ``dir_name`` and collect missing package names.

        Entries accumulate across all log files of the same directory (an
        identical message is stored once) instead of being reset per file.
        """
        entries = self.log_stat["err_dict"].setdefault(dir_name, [])
        depends = self.log_stat[depend_key]
        for message in messages:
            entry = {"type": err_type, "log": message}
            if entry not in entries:
                entries.append(entry)
            # Missing-dependency detection.
            miss_depend = missing_re.search(message)
            if miss_depend and miss_depend.group(1) not in depends:
                depends.append(miss_depend.group(1))

    @staticmethod
    def format_log_date(text):
        """Join the first four ``-``-separated fields of ``text``.

        NOTE(review): the separators are empty strings in this copy of the
        source, so the fields are simply concatenated; confirm whether unit
        suffixes were intended.
        """
        parts = text.split("-")
        return parts[0] + "" + parts[1] + "" + parts[2] + "" + parts[3] + ""

    def showAnalysisLog(self):
        """Build the text report from ``self.log_stat``, send it and return it."""
        stat = self.log_stat
        nodejs_depend = stat["nodejs_depend"]
        python_depend = stat["python_depend"]
        read_errors = stat["readlog_err"]
        heads = self.LogNameHeadList

        # Report header: the scanned period.
        if len(heads) != 1:
            period = self.format_log_date(heads[0]) + " ~ " + self.format_log_date(heads[-1])
        else:
            period = heads[0]
        parts = ["📆分析 " + period + " 的日志报告:\n"]

        if len(read_errors) != 0:
            parts.append("🔍脚本在读取日志过程中,出现了" + str(len(read_errors)) + "个异常,详细信息将在最后展示\n")
        parts.append("✅正常运行脚本:" + str(stat["all"]) + "\n")

        # Percentages are only meaningful (and safe to compute) with >= 1 log.
        if stat["all"] != 0:
            total_err = stat["nodejs_err"] + stat["python_err"]
            parts.append("⛔异常运行脚本:" + str(total_err) + " 次,占比 "
                         + self._percent(total_err, stat["all"]) + " %\n")
            parts.append("🧐其中:\n")
            parts.append(" 🕵Nodejs异常" + str(stat["nodejs_err"]) + " 次,占比 "
                         + self._percent(stat["nodejs_err"], stat["all"]) + " %\n")
            parts.append(" 🕵Python异常" + str(stat["python_err"]) + " 次,占比 "
                         + self._percent(stat["python_err"], stat["all"]) + " %\n")

        # NOTE(review): the nesting of the sections below was reconstructed
        # from a source whose indentation was lost; they are kept at function
        # level. This only matters when no log was scanned.
        if len(nodejs_depend) > 0 or len(python_depend) > 0:
            parts.append("👮‍♂️依赖检测: " + (
                "☢已开启自动补全依赖,将执行shell命令,请小心恶意脚本👿" if self.auto_install_depend
                else "❎未开启自动补全依赖,请手动补齐以下依赖🤗") + "\n")
            if len(nodejs_depend) > 0:
                parts.append("👮检测到缺失NodeJs依赖:\n")
                parts.append(str(nodejs_depend) + "\n")
            if len(python_depend) > 0:
                parts.append("👮检测到缺失Python依赖:\n")
                parts.append(str(python_depend) + "\n")

        parts.append("💂‍♂️详细错误日志:\n\n")
        for script, entries in stat["err_dict"].items():
            if entries:
                parts.append("🛑脚本:" + script + "\n")
                for entry in entries:
                    parts.append("- ⚠" + entry["type"] + "错误:" + entry["log"] + " \n\n\n")

        if len(read_errors) != 0:
            parts.append("👷‍♀️读取日志异常日志:\n\n")
            for read_error in read_errors:
                parts.append("" + read_error + "\n")

        result = "".join(parts)
        self._notify("🐲青龙日志分析", result)
        return result

    def auto_depend(self):
        """Check every missing package and install the ones that are absent.

        NodeJs packages are handled first, then Python packages; one
        notification is sent per package, numbered across both groups.
        """
        queue = [("NodeJs", "nodejs", package) for package in self.log_stat["nodejs_depend"]]
        queue += [("Python", "python", package) for package in self.log_stat["python_depend"]]
        total = len(queue)
        for position, (label, package_type, package) in enumerate(queue, start=1):
            shell_log = "🤖检测是否安装" + label + "依赖: " + package + "\n"
            check_result = self.check_depend(package, package_type)
            if check_result:
                shell_log += "📦" + str(check_result) + "已安装, 跳过安装\n"
            else:
                shell_log += "⚙当前正在自动安装" + label + "依赖: " + package + "\n"
                install_result = self.install_depend(package, package_type)
                shell_log += "🔨执行命令: " + install_result[0] + "\n"
                # Any stderr output is reported as a failure, as before.
                if install_result[2] != '':
                    shell_log += "⛔出错了: \n" + install_result[2] + "\n\n"
                elif install_result[1] != '':
                    shell_log += "✅执行完成: \n" + install_result[1] + "\n\n"
            self._notify("🐲青龙自动安装依赖(" + str(position) + "/" + str(total) + ")", shell_log)

    def install_depend(self, package, package_type):
        """Install ``package`` with the tool that matches ``package_type``.

        :param package: package name; it is shell-quoted before use.
        :param package_type: ``"nodejs"``, ``"python"`` or ``"docker"``.
        :return: ``(command, stdout, stderr)``, or ``None`` for an unknown type.
        """
        quoted = shlex.quote(package)
        if package_type == "nodejs":
            install_exec = 'cd /ql/ && ' + self.npm + ' install ' + quoted
        elif package_type == "python":
            install_exec = 'pip3 install ' + quoted
        elif package_type == "docker":
            install_exec = 'apk update && apk add ' + quoted
        else:
            return None
        install = subprocess.run(install_exec, shell=True, capture_output=True, text=True)
        return install_exec, install.stdout, install.stderr

    def check_depend(self, package, package_type):
        """Return the ``name/version`` text of an installed package, else ``None``.

        The package list of the matching tool is filtered with a fixed-string
        ``grep`` and then searched with a pattern built from the escaped name.

        :param package: package name to look for.
        :param package_type: ``"nodejs"``, ``"python"`` or ``"docker"``.
        """
        quoted = shlex.quote(package)
        name_re = re.escape(package)
        if package_type == "nodejs":
            list_exec = 'cd /ql/ && ' + self.npm + ' list|grep -F -e ' + quoted
            list_log = subprocess.run(list_exec, shell=True, capture_output=True, text=True).stdout
            # npm prints "name@version"; pnpm prints "name version" at the
            # start of a line, hence MULTILINE for the second pattern.
            found = (re.search(r"[\s]" + name_re + r"@[\d.]+", list_log)
                     or re.search(r"^" + name_re + r" [\d.]+", list_log, re.MULTILINE))
        elif package_type == "python":
            list_exec = 'pip3 list|grep -F -e ' + quoted
            list_log = subprocess.run(list_exec, shell=True, capture_output=True, text=True).stdout
            found = re.search(name_re + r"[ ]+[\d.]+", list_log)
        elif package_type == "docker":
            list_exec = 'apk list|grep -F -e ' + quoted
            list_log = subprocess.run(list_exec, shell=True, capture_output=True, text=True).stdout
            found = re.search(name_re + r"-[\d.]+", list_log)
        else:
            return None
        return found.group() if found else None
def load_send():
    """Bind the module-level ``send(title, content)`` notifier.

    The directory of this file is added to ``sys.path`` and ``send`` is
    imported from a ``notify.py`` located there. When that file is missing or
    cannot be imported, a failure message is printed and ``send`` falls back
    to a function that prints the notification, so callers can always call it.
    """
    global send

    def _print_only(title, content):
        # Fallback notifier: keep the message visible on stdout.
        print(str(title) + "\n" + str(content))

    cur_path = os.path.abspath(os.path.dirname(__file__))
    if cur_path not in sys.path:
        sys.path.append(cur_path)

    if os.path.exists(os.path.join(cur_path, "notify.py")):
        try:
            from notify import send
        except Exception:
            send = _print_only
            print("加载通知服务失败~")
    else:
        send = _print_only
        print("加载通知服务失败~")
if __name__ == "__main__":
    # The notifier has to be bound first: the scanner sends its report from
    # inside the constructor.
    load_send()
    ql = QlLogScan()