“百家”,既是“诸子百家”,亦为“百花齐放”。他们是各行各业网络安全专家:有CSO、业界大咖、权威代表,更有奋战在网安一线的实践者、思想者和分享者。安在“百家”,最佳实践,真知灼见,思想火花,期待有你!
仅凭Python入门基础+疯狂对话AI,作为售前的我竟然用MCP框架搭出了能用的安全智能体!整个过程堪称"大型真香现场",先上成果图。
模型上下文协议(Model Context Protocol,简称MCP)是Anthropic于2024年推出的一种开放标准,旨在规范AI模型与外部数据源、工具之间的交互方式。
MCP 采用客户端-服务器模式,包括三个核心组件:
主机(Host):运行LLM或与LLM交互的应用程序,如Claude Desktop、Cursor IDE、Cherry Studio、VS Code + Cline插件等。主机负责管理MCP客户端、与LLM通信、处理用户交互(如审批工具调用),并将从服务器获取的信息整合到LLM上下文中。
MCP 客户端 :请求信息或代表用户执行任务的 AI 助手和聊天机器人。
MCP 服务器 :检索相关信息或执行请求操作(例如,调用外部 API)的数据存储库、搜索引擎和 API。
MCP 服务器向客户端提供两种主要功能:
资源 :可以检索并用作 LLM 交互上下文的结构化数据、文档和内容。这使得 AI 助手能够从数据库、搜索索引或其他来源访问相关信息。
工具 :可执行功能,使 LLM 能够与外部系统交互、执行计算或采取现实世界行动,允许助手触发工作流程、调用 API 或动态操作数据。
cursor:https://www.cursor.com/cn
MCP Server: 实战所需的python格式的mcp server 下载地址:
https://github.com/modelcontextprotocol/python-sdk,也可以选择其他语言:
TypeScript SDK
Java SDK
Kotlin SDK
C# SDK
ES的MCP server:
https://github.com/elastic/mcp-server-elasticsearch
#pip install mcp#pip show mcpName: mcpVersion: 1.6.0Summary: Model Context Protocol SDKHome-page: https://modelcontextprotocol.ioAuthor: Anthropic, PBC.Author-email: License: MITLocation: /root/python-sdk-main/venv/lib/python3.10/site-packagesRequires: anyio, httpx, httpx-sse, pydantic, pydantic-settings, sse-starlette, starlette, uvicornRequired-by: fastmcp
生成python代码,完成API认证和调用过程
第一个过程:获取认证信息
第二个过程:获取API数据
以下代码为示例,以您的安全平台的API官方文档为准
获取资产信息:资产管理API
获取漏洞信息:漏洞API
获取告警信息:告警API
其他API信息
root@user:~/python-sdk-main/src/mcp# cat wx_api.py import requestsimport jsonimport hashlibdef auth_check(): """获取认证信息""" auth_url = "http://x.x.x.x/v1/api/auth" # 认证请求body auth_body = { "username": "username", "password": "password" } # 认证请求头 headers = { "Content-Type": "application/json" }def get_backdoor_info(auth_data): """获取后门检测信息""" if not auth_data: print("缺少认证数据,无法继续") return # API端点 url = "http://x.x.x.x/external/api/" # 请求参数 params = { "page": 0, "size": 50 } # 从认证数据中获取必要信息 com_id = auth_data["comId"] jwt = auth_data["jwt"] sign_key = auth_data["signKey"] # 生成sign值 sign = generate_sign(com_id, sign_key) # 设置请求头 headers = { "Content-Type": "application/json", "comID": com_id, "timestamp": "744e4b4a456", "sign": sign, "Authorization": f"Bearer {jwt}" }def main(): print("=== 开始API测试 ===") # 获取认证信息 auth_data = auth_check() if auth_data: print("n认证成功,开始调用目标API") get_backdoor_info(auth_data) else: print("n认证失败,无法继续执行API调用")if __name__ == "__main__": main()
import sysimport osimport asyncioimport jsonimport loggingimport aiohttpfrom aiohttp import webfrom mcp.server.fastmcp import FastMCPfrom mcp.server.fastmcp.exceptions import ResourceError# API配置 略... # 创建API客户端实例api_client = APIClient()# 创建FastMCP实例mcp = FastMCP( name="API MCP Server", instructions="API MCP服务", debug=True, capabilities={ "resources": ["*"], "tools": ["*"] }, sse_config={ "ping_interval": 30, "retry_timeout": 10000, "max_retries": 3, "keep_alive": True, "cors_origins": ["*"], # 允许所有来源的CORS请求 "headers": { "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, POST, OPTIONS", "Access-Control-Allow-Headers": "Content-Type" } })# 注册ping工具@mcp.tool()async def ping() -> str: """ping工具""" logger.info("收到ping请求") return "pong"# 注册后门检测资源@mcp.tool()async def check_backdoor() -> dict: """后门检测工具""" logger.info("收到后门检测请求") try: if not api_client.auth_data: logger.info("未找到认证信息,开始认证...") if not await api_client.authenticate(): return { "message": "API认证失败", "error": "认证失败" } logger.info("认证成功") # 发送API请求 result = await api_client.make_api_request(BACKDOOR_URL) logger.info(f"API响应: {result}") # 检查是否有rows字段 if "rows" in result: alerts = [] for row in result["rows"]: alert = { "主机IP": row.get("displayIp", "未知"), "主机名": row.get("hostname", "未知"), "后门类型": row.get("backDoorType", "未知"), "后门名称": row.get("backDoorName", "未知"), "描述": row.get("description", "无描述"), "发现时间": row.get("createTime", 0) } alerts.append(alert) response_data = {"message": f"发现{len(alerts)}个后门告警", "data": alerts} logger.info(f"处理后门告警结果: {response_data}") return response_data # 如果没有rows字段,返回空结果 logger.warning(f"API响应中未找到rows字段: {result}") return { "message": "未发现后门", "data": [] }@mcp.tool()async def check_assets() -> dict: """资产端口检测工具""" logger.info("收到资产端口检测请求") try: if not api_client.auth_data: logger.info("未找到认证信息,开始认证...") if not await api_client.authenticate(): return { "message": "API认证失败", "error": "认证失败" } logger.info("认证成功") # 发送API请求 result = await api_client.make_api_request(ASSETS_URL) logger.info(f"API响应: {result}") # 检查是否有rows字段 if "rows" in result: assets = [] for row in result["rows"]: asset = { "主机IP": row.get("displayIp", "未知"), "主机名": row.get("hostname", "未知"), "端口": row.get("port", "未知"), "协议": row.get("protocol", "未知"), "服务": row.get("service", "未知"), "发现时间": row.get("createTime", 0) } assets.append(asset) response_data = {"message": f"发现{len(assets)}个资产端口", "data": assets} logger.info(f"处理资产端口结果: {response_data}") return response_data # 如果没有rows字段,返回空结果 logger.warning(f"API响应中未找到rows字段: {result}") return { "message": "未发现资产端口", "data": [] } except Exception as e: logger.error(f"请求处理过程中发生错误: {str(e)}") logger.error(f"错误类型: {type(e)}") logger.error(f"错误详情: {e.__dict__ if hasattr(e, '__dict__') else '无详细信息'}") return { "message": "请求处理错误", "error": str(e) }@mcp.tool()async def check_vulnerabilities() -> dict: """漏洞检测工具""" logger.info("收到漏洞检测请求") try: if not api_client.auth_data: logger.info("未找到认证信息,开始认证...") if not await api_client.authenticate(): return { "message": "API认证失败", "error": "认证失败" } logger.info("认证成功") # 发送API请求 result = await api_client.make_api_request(VUL_URL) logger.info(f"API响应: {result}") # 检查是否有rows字段 if "rows" in result: vulns = [] for row in result["rows"]: vuln = { "主机IP": row.get("displayIp", "未知"), "主机名": row.get("hostname", "未知"), "漏洞名称": row.get("vulName", "未知"), "漏洞类型": row.get("vulType", "未知"), "风险等级": row.get("riskLevel", "未知"), "描述": row.get("description", "无描述"), "发现时间": row.get("createTime", 0) } vulns.append(vuln) response_data = {"message": f"发现{len(vulns)}个漏洞", "data": vulns} logger.info(f"处理漏洞结果: {response_data}") return response_data # 如果没有rows字段,返回空结果 logger.warning(f"API响应中未找到rows字段: {result}") return { "message": "未发现漏洞", "data": [] }@mcp.resource("backdoor://status")async def backdoor_status() -> str: """后门检测状态资源"""@mcp.resource("assets://status")async def assets_status() -> str: """资产端口状态资源"""@mcp.resource("vulnerabilities://status")async def vulnerabilities_status() -> str: """漏洞状态资源"""async def main(): """主函数""" logger.info("开始配置MCP服务器...") logger.info(f"当前工作目录: {os.getcwd()}") logger.info(f"Python路径: {sys.path}") try: # 配置MCP服务器 mcp = FastMCP( name="API MCP Server", instructions="API MCP服务", debug=True, capabilities={ "resources": ["*"], "tools": ["*"] } )
绿点说明连接成功,Tools工具可用
{ "mcpServers": { "wx-mcp-server": { "isActive": true, "name": "wx-mcp-server", "type": "sse", "host": "x.x.x.x", "port": 8000, "url": "http://x.x.x.x:8000/sse", "capabilities": { "resources": ["*"] } } }}
cursor客户端会调用claude大模型进行分析
以上步骤展示了MCP在安全应用场景的探索,代码仅供参考,企业级部署仍需关注大模型和MCP本身的安全。借助安全智能体可实现如下价值:
可结合安全产品产生的大量告警进行告警降噪,将碎片化信息关联为关键攻击事件。
生成事件解读报告,包括攻击成因、影响范围及完整思考过程,提升研判准确性。
自动列出受影响资产,提供具体处置操作(如漏洞修复、隔离主机等),简化人工响应流程。
深度融合DeepSeek等大模型技术,实现:
上下文分析:解密攻击载荷、关联时间线等复杂信息。
升维决策:从海量数据中提炼关键线索,生成智能报表。
与现有安全产品协同作战,构建“监测-分析-处置”闭环,提升企业整体安全防护效率。
在整个智能体构建的过程中代码的实现全部借助cursor生成和调试,实际测试中也遇到到了很多问题,通过和cursor的不断对话让我对MCP理解更深了,有了MCP之后,只要整体的需求框架清晰,零基础的人也可以自己构建各种智能体,目前有各种的mcp 应用场景可以实践(https://mcp.so/),以下是cursor开发过程中的几点总结:
a.需求越细,AI越乖(需求描述一定要具体,熟悉后可以建立cursor rule)
b.调试口诀:先问是不是,再问为什么(别着急改代码,先确认问题场景)
c.防坑指南:给AI加限定词(如用MCPv1.6语法、考虑Python3.8环境)
d.知识吸收:每解决1个bug就问AI这个问题的通用解决模式是什么。
杨兴|青藤云安全售前工程师
拥有世界500强企业就职经验,专注企业级数字化架构设计,具备多云安全融合、自动化运维及AI场景化方案落地经验。擅长将AIGC等新技术转化为客户商业价值。
添加作者
技术标签:
Azure架构专家 | 全栈云安全 | 智能运维体系
技术认证图谱:
⌜架构设计⌟:Azure Solutions Architect Expert | 阿里云ACE
⌜云原生⌟:红帽RHCE | CKA(Kubernetes管理员)
⌜数据智能⌟:CDMP(数据管理) | 数据安全高级工程师
⌜安全合规⌟:CISP(注册信息安全工程师)
⌜流程体系⌟:PMP® | ITIL 4 | COBIT5 | CSM(敏捷专家)
⌜AI工程⌟:微软AI-900认证(Azure AI Fundamentals)
持续进化:
追踪AI安全智能体技术趋势,强化技术敏锐度。
推荐站内搜索:最好用的开发软件、免费开源系统、渗透测试工具云盘下载、最新渗透测试资料、最新黑客工具下载……
还没有评论,来说两句吧...