手工巡检需要逐台登录服务器,耗费大量时间和精力,同时容易因操作疏漏而遗漏关键指标,导致数据不统一、风险增大;而自动化巡检利用脚本批量采集数据,实现实时监控和标准化输出,大幅提升效率并降低人为错误,从而为系统稳定运行提供坚实保障。
为了解决这些问题,我们可以使用自动化脚本来批量巡检服务器,并自动生成巡检表。如下图所示:
获取本文的脚本可以关注公众号【攻城狮成长日记】,私信回复脚本链接
即可获取。
自动化巡检的优势
相比手工巡检,使用脚本进行自动化巡检具有以下优势:
批量执行:一次性检查所有服务器,提升巡检效率; 减少人工干预:降低人为错误,提高数据准确性; 标准化输出:巡检数据统一格式,方便存储和分析; 可扩展性:脚本可根据需求扩展,支持更多巡检项。
编写批量巡检脚本
为了让脚本更灵活,适应更多情况,我们换了个新思路:让用户自己定义要检查的项目,并用正则表达式来提取结果,同时并定义展示的模板。数据类似如下:
# 定义巡检配置项inspection_configs=[{"name":"内核版本","command":"uname -r",# 修正命令为获取内核版本的正确命令"regex_pattern":r"^(d+.d+.d+-d+-w+)",# 精确匹配"format_template":"内核版本: {0}"},....]
command: 是指定要执行的命令,例如上述的
uname -r
。regex_pattern:是指通过正则匹配想要的结果。
format_template:是指在巡检表展示的数据。例如
内核版本: {0}
,其中{0}
会被填入匹配到的真实数据。
通过以下的ssh_connect
函数进行远程SSH
连接到服务,然后,再通过execute_inspection
函数执行巡检命令。
defssh_connect(server):"""建立SSH连接""" client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy())try: client.connect( hostname=server["hostname"], port=server["port"], username=server["username"], password=server["password"])return clientexcept Exception as e:print(f"连接{server['hostname']}失败: {str(e)}")returnNonedefexecute_inspection(client, config):"""执行单个巡检项并解析结果""" stdin, stdout, stderr = client.exec_command(config["command"]) output = stdout.read().decode() error = stderr.read().decode()if error:print(f"命令执行错误: {config['command']}n{error}")returnNone#使用正则表达式提取数据match= re.search(config["regex_pattern"], output)ifmatch:returnmatch.groups()else:print(f"未匹配到数据: {config['name']}")returnNone
通过以下函数进行数据格式化,
defformat_result(config, result):if"regex_groups"in config: params =dict(zip(config["regex_groups"], result))return config["format_template"].format(**params)else:return config["format_template"].format(*result)
通过generate_report
函数,生成巡检报告。
defgenerate_report(data, filename):"""生成合并IP单元格的巡检报告""" report_data =[]for ip, inspections in data.items(): ip_entry ={"IP": ip,"巡检项":[],"结果":[]}for item_name, result in inspections.items(): ip_entry["巡检项"].append(item_name) ip_entry["结果"].append( format_result(next(c for c in inspection_configs if c["name"]== item_name), result)) report_data.append(ip_entry)# 转换为二维表格(保留空值用于单元格合并) max_items =max(len(entry["巡检项"])for entry in report_data) rows =[]for entry in report_data: rows.append([entry["IP"], entry["巡检项"][0], entry["结果"][0]])for i inrange(1,len(entry["巡检项"])): rows.append(["", entry["巡检项"][i], entry["结果"][i]]) df = pd.DataFrame(rows, columns=["IP","巡检项","结果"])# 生成Excel文件with pd.ExcelWriter(filename, engine='xlsxwriter')as writer: df.to_excel(writer, index=False, sheet_name='巡检报告') workbook = writer.book worksheet = writer.sheets['巡检报告']# 设置列宽 worksheet.set_column('A:A',15)# IP列 worksheet.set_column('B:B',25)# 巡检项列 worksheet.set_column('C:C',45)# 结果列# 标题格式 header_format = workbook.add_format({'bold':True,'bg_color':'#4CAF50','font_color':'white','border':1,'valign':'vcenter'})for col_num, value inenumerate(df.columns.values): worksheet.write(0, col_num, value, header_format)# 数据格式 data_format = workbook.add_format({'border':1,'valign':'top'})for row inrange(1,len(df)+1):for col inrange(3): worksheet.write(row, col, df.iat[row-1, col], data_format)# 合并IP单元格 ip_col =0 current_ip =None merge_start =1for row_num inrange(1,len(df)+1): cell_value = df.iat[row_num-1, ip_col]if cell_value:if current_ip isnotNoneand merge_start < row_num: worksheet.merge_range( merge_start, ip_col, row_num-1, ip_col, current_ip, workbook.add_format({'valign':'vcenter','border':1})) current_ip = cell_value merge_start = row_num# 处理最后一个IPif current_ip isnotNoneand merge_start <=len(df): worksheet.merge_range( merge_start, ip_col,len(df), ip_col, current_ip, workbook.add_format({'valign':'vcenter','border':1}))print(f"报告已生成: {os.path.abspath(filename)}")
脚本演示
通过定义需要巡检的设备,如下所示:
# 服务器配置列表servers =[{"hostname":"192.168.31.100","port":22,"username":"root","password":"password"# 推荐使用密钥认证},{"hostname":"192.168.31.101","port":22,"username":"root","password":"password"# 推荐使用密钥认证},{"hostname":"192.168.31.102","port":22,"username":"root","password":"password"# 推荐使用密钥认证},# 添加更多服务器...]
然后,再定义巡检项目,内容如下:
# 定义巡检配置项inspection_configs =[{"name":"内存使用","command":"free -m","regex_pattern":r"Mem:s+(d+)s+(d+)s+(d+)","description":"获取内存总量、已用、空闲(单位:MB)","format_template":"总量: {0} MB, 已用: {1} MB, 空闲: {2} MB"},{"name":"内核版本","command":"uname -r","regex_pattern":r"^(d+.d+.d+-d+-w+)","format_template":"内核版本: {0}"},{"name":"磁盘使用率","command":"df -h","regex_pattern":r"(d+)%s+/(?!.*snap)","description":"获取根分区使用率","format_template":"使用率: {0}%"},{"name":"CPU核心数","command":"lscpu","regex_pattern":r"CPU(s):s+(d+)","description":"获取CPU核心数量","format_template":"CPU核心数: {0}%"},{"name":"系统负载","command":"uptime","regex_pattern":r"load average:s+([d.]+),s+([d.]+),s+([d.]+)","description":"获取1分钟、5分钟、15分钟系统负载","format_template":"1分钟: {0} , 5分钟: {1} , 15分钟: {2} "}]
定义主函数方法,如下:
defmain(): all_results ={}for server in servers: client = ssh_connect(server)if client:print(f"正在检查 {server['hostname']}...") results ={}for config in inspection_configs: value = execute_inspection(client, config)if value: results[config["name"]]= value all_results[server['hostname']]= results client.close()if all_results: filename =f"server_inspection_{datetime.now().strftime('%Y%m%d_%H%M')}.xlsx" generate_report(all_results, filename)else:print("未获取到任何服务器数据")
执行该方法后,会在当前目录下生成一个巡检表。内容如下所示:
总结
通过编写Shell
脚本,我们可以高效地进行服务器巡检,并生成标准化的巡检表,极大提升运维效率。同时,结合日志分析、异常告警、定时任务等功能,可构建更完善的自动化运维体系,减少手工操作带来的负担。
扫码关注公众号
推荐站内搜索:最好用的开发软件、免费开源系统、渗透测试工具云盘下载、最新渗透测试资料、最新黑客工具下载……
还没有评论,来说两句吧...