前言
【免责申明】本工具的使用者应遵守相关法律法规,尊重微信的版权和隐私,不得侵犯微信或其他第三方的合法权益,不得从事任何违法或不道德的行为。与微信的底层交互来自WeChatFerry HTTP 客户端。
效果展示
注意事项
注意: WeChatFerry HTTP 客户端基于HOOK 新环境先养号。新环境(新机器)登录,哪怕不用机器人,也会被风控。 安装 3.9.10.27
版本的微信,把微信挂上去。 先别搞机器人!先别搞机器人!先别搞机器人!
登录会有风险提示,没关系;
登录当天晚上大概率会被踢下来,没关系;
有空多通过 PC 微信聊聊天。
等看到了 自动登录该设备
的时候,可以上机器人了。
为微信的底层交互来自项目,感谢大佬 https://github.com/lich0821/wcf-client-rust
选择版本:v39.2.4.5微信3.9.10.27版本安装包:WeChatSetup-3.9.10.27
对技术细节无感的可以直接拖到文章末尾获取成品,一键运行
实现细节
WeChatFerryHTTP客户端以及特定微信都安装好以后,首先打开客户端点击启动如下所示然后会在本地的10010端口处启动HTTP服务,包括添加群成员、发送文本、发送图片等等如下然后需要关注的重点就是HTTP回调,这个很重要,基本所有的功能都基于回调实现
机器人框架设计细节
整个项目的框架目录结构大概如下:
├── app.py //主程序入口,启动 Flask 应用
├── callbacks //回调处理相关模块
│ └── message_handler.py
├── config.py //配置文件模块
├── config.yaml
├── config_template.yaml
├── database //如果存在数据库的情况,在该目录下生成.db文件
│ ├── WeChat.db
├── interfaces //存放封装接口的目录,也就是封装HTTP客户端的API
│ └── api.py
├── logs //日志记录模块,方便调试
│ ├── WeChat.log
├── models //数据库表模型目录
│ ├── blacklist.py
│ ├── database.py
├── plugins //机器人插件目录,插件化设计,动态加载插件
│ ├── __init__.py
│ ├── base_plugin.py
│ ├── blacklist.py
│ ├── music_order.py
│ ├── plugin_manager.py
│ ├── translator.py
│ └── ......
├── requirements.txt
第一步是用Flask起一个回调地址,来接收回调消息,这是核心,比如接下来我用127.0.0.1:19001/api/app/testWeChat/weChatCallBackWxHelper,将这个地址同样写到HTTP客户端设置中。
主程序app.py代码如下:
app = Flask(__name__)
# 初始化接口封装类
wechat_api = WeChatAPI(base_url="http://127.0.0.1:10090")
handler = MessageHandler(api=wechat_api)
# 加载插件
handler.load_plugins()
@app.route('/api/app/testWeChat/weChatCallBackWxHelper', methods=['POST'])
def wechat_callback():
data = request.json
logger.info(f"收到回调数据: {data}")
handler.handle_message(data)
return jsonify({"status": "success", "message": "Callback received!"}), 200
if __name__ == '__main__':
app.run(host='127.0.0.1', port=19001)
服务起来后,我们向运行HTTP客户端的微信发送一条任意消息,比如【你好】,如果收到如下日志,就说明服务跑通了
收到回调数据: {'IsSelf': False, 'IsGroup': False, 'Id': 8949284075397752864, 'Type': 1, 'Ts': 1734446247, 'Roomid': 'wxid_87d5vdxxxxxl12', 'Content': '你好', 'Sender': 'wxid_87d5xxxxx3l12', 'Sign': '079140b4407acdf088a90e7b8265fe8f', 'Thumb': '', 'Extra': '', 'Xml': '<msgsource>n <pua>1</pua>n <eggIncluded>1</eggIncluded>n <signature>V1_YtrPKL2o|v1_YtrPKL2o</signature>n <tmp_node>n <publisher-id />n </tmp_node>n <sec_msg_node>n <alnode>n <fr>1</fr>n </alnode>n </sec_msg_node>n</msgsource>n'}
接下来需要将HTTP的接口封装一下,写到interfaces/api.py文件中,因为要以写自助点歌为例,需要机器人发送卡片消息,这里封装发送卡片消息接口为例,代码如下:
class WeChatAPI:
def __init__(self, base_url):
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Content-Type": "application/json",
"Accept": "application/json",
"Connection": "keep-alive"
})
self.timeout = 20
self.temp_files = [] # 存储临时文件路径
self._start_temp_cleaner()
//封装卡片消息接口
def send_rich_text_message(self, title, name, digest, url, thumburl, account, receiver):
"""发送卡片消息
Args:
title: 标题
name: 来源名称
digest: 摘要/描述
url: 点击跳转的链接
thumburl: 缩略图URL
account: 账号ID
receiver: 接收者ID
Returns:
dict: {"success": bool, "message": str}
"""
start_time = time.time()
try:
url_endpoint = f"{self.base_url}/rich-text"
data = {
"title": title,
"name": name,
"digest": digest,
"url": url,
"thumburl": thumburl,
"account": account,
"receiver": receiver
}
logger.info(f"发送卡片消息请求: {data}")
response = self.session.post(url_endpoint, json=data, timeout=10)
response.raise_for_status()
result = response.json()
elapsed = time.time() - start_time
if result.get('status') == 0 and result.get('error') is None:
logger.info(f"发送卡片消息成功, 耗时: {elapsed:.2f}秒, 响应: {result}")
return {"success": True, "data": result.get('data')}
else:
error_msg = result.get('error') or "API返回失败状态"
logger.error(f"发送卡片消息失败: {error_msg}, 响应: {result}, 耗时: {elapsed:.2f}秒")
return {"success": False, "message": error_msg}
except requests.Timeout:
elapsed = time.time() - start_time
logger.error(f"发送卡片消息超时, 耗时: {elapsed:.2f}秒")
return {"success": False, "message": "请求超时,请稍后重试"}
except Exception as e:
elapsed = time.time() - start_time
logger.error(f"发送卡片消息失败: {e}, 耗时: {elapsed:.2f}秒")
return {"success": False, "message": str(e)}
现在我们可以收到回调消息了,也封装了接口,下一步就是处理回调数据,目录callbacks/message_handler.py是回调处理相关模块比如收到文本消息如何做等等。
class MessageHandler:
def __init__(self, api):
self.api = api
self.plugin_manager = None # 先设为 None,等待外部设置
self.db_session = Session()
self.lock = threading.Lock() # 线程锁
def handle_message(self, data):
"""处理消息"""
msg_type = data.get("type")
sender = data.get("sender")
content = data.get("content", "").strip()
room_id = data.get("roomid")
is_group = data.get("is_group", False)
is_self = data.get("is_self", False)
# 忽略自己发送的消息
if is_self or sender == config.BOT_WXID:
return
try:
# 如果消息内容为空,直接返回
if not content:
logger.info(f"收到空消息,忽略处理")
return
# 确定消息接收者(私聊回发送者,群聊回群)
receiver = room_id if is_group else sender
# 记录处理的消息
if is_group:
logger.info(f"处理群 {room_id} 的消息: {content}")
else:
logger.info(f"处理私聊消息: {content}")
# 处理插件消息
for plugin_name, plugin_handler in self.plugin_manager.plugins.items():
try:
if not self.plugin_manager.is_plugin_enabled(plugin_name):
continue
# 设置插件上下文
plugin_handler.current_sender = sender
plugin_handler.current_room_id = room_id if is_group else None
# 设置额外属性
plugin_handler.extra = data.get('extra', '')
if response:
# 处理图片消息
if isinstance(response, dict) and response.get("type") == "image":
image_url = response["url"]
try:
# 先尝试直接发送网络图片
self.api.send_image_message(image_url, receiver)
except Exception as e:
logger.warning(f"直接发送网络图片失败,尝试下载后发送: {e}")
if hasattr(plugin_handler, 'download_image'):
local_path = plugin_handler.download_image(image_url)
if local_path:
self.api.send_image_message(local_path, receiver)
else:
self.api.send_text_message("抱歉,图片处理失败了~", receiver)
else:
self.api.send_text_message("抱歉,图片发送失败了~", receiver)
else:
# 如果是消息,可以选择@发送者
if is_group:
self.api.send_text_message(response, receiver, aters=[sender])
else:
self.api.send_text_message(response, receiver)
return
except Exception as e:
logger.error(f"插件 [{plugin_name}] 处理消息时出错: {e}")
# 未处理的消息
if is_group:
logger.info(f"群 {room_id} 的未处理消息: {content}")
else:
logger.info(f"未处理的消息: {content}")
except Exception as e:
logger.error(f"处理消息时出错: {e}")
插件化开发设计思路
上文说过,要设计成插件化的模式,这样做的优点是,插件独立,可复用和扩展,无需修改主框架即可添加新插件。 设计思路是,写一个插件基类,所有插件继承自这个基类,实现统一接口
from abc import ABC, abstractmethod
class BasePlugin(ABC):
"""插件基类,所有插件需继承此类"""
def __init__(self, name, description):
self.name = name
self.description = description
@abstractmethod
def handle_message(self, message, sender, api):
"""处理消息的主逻辑"""
pass
然后动态加载插件
class PluginManager:
def __init__(self, api=None):
self.plugins = {}
self.enabled_plugins = set()
self.api = api
self.disabled_plugins = set()
self.global_disable = False
def load_plugins(self, folder="plugins"):
"""加载插件"""
plugin_path = os.path.join(base_path, folder)
plugin_files = [
f[:-3] for f in os.listdir(plugin_path)
if f.endswith(".py") and f != "__init__.py"
]
for plugin_file in plugin_files:
try:
plugin_module = importlib.import_module(f"{folder}.{plugin_file}")
if hasattr(plugin_module, "register"):
plugin_name, handler = plugin_module.register(plugin_manager=self)
self.plugins[plugin_name] = handler
logger.info(f"插件 {plugin_name} 加载成功")
except Exception as e:
logger.error(f"加载插件 {plugin_file} 失败: {e}")
最后就可以写一个自助点歌插件了,代码如下:
class MusicOrderPlugin(BasePlugin):
"""自助点歌插件"""
def __init__(self, plugin_manager=None):
super().__init__(
name="自助点歌",
description="发送 点歌+歌名 来点歌",
category="娱乐"
)
self.plugin_manager = plugin_manager
self.search_api = "http://test.com/search"
self.song_api = "https://api.test.cn/api/wyy"
self.image_api = "https://api.test.com/api/rand.img3" # 添加图片API
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
self.commands = {
"点歌": "发送 点歌+歌名 来点歌,例如:点歌 青花瓷"
}
def _get_random_image(self):
"""获取随机图片URL"""
try:
params = {'format': 'json'}
response = self.session.get(
self.image_api,
params=params,
timeout=10,
verify=False # 禁用 SSL 证书验证
)
# 添加警告过滤
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
response.raise_for_status()
data = response.json()
if data.get('code') == 1 and data.get('imgurl'):
return data['imgurl']
except Exception as e:
logger.error(f"获取随机图片失败: {e}")
# 如果获取失败,返回默认图片
return "https://p1.music.126.net/6y-UleORITEDbvrOLV0Q8A==/5639395138885805.jpg"
def handle_message(self, content):
"""处理消息"""
try:
if isinstance(content, str) and content.startswith("点歌 "):
song_name = content[3:].strip()
logger.info(f"收到点歌请求,歌名: {song_name}")
if not song_name:
return "请输入要点的歌名"
# 1. 搜索歌曲
logger.info(f"开始搜索歌曲: {song_name}")
songs = self._search_song(song_name)
if not songs:
return "未找到相关歌曲"
# 2. 遍历歌曲列表,直到找到可用的歌曲链接
for song_info in songs:
logger.info(f"尝试获取歌曲链接, ID: {song_info['id']}")
song_url = self._get_song_url(song_info['id'])
# 如果链接是404或者获取失败,尝试下一首
if not song_url or song_url == "https://music.163.com/404":
logger.warning(f"歌曲 {song_info['name']} 链接获取失败,尝试下一首")
continue
logger.info(f"获取到可用歌曲链接: {song_url}")
# 3. 获取随机图片作为缩略图
thumb_url = self._get_random_image()
logger.info(f"获取到缩略图链接: {thumb_url}")
# 4. 发送卡片消息
if not self.plugin_manager or not self.plugin_manager.api:
return "API未初始化"
# 确定接收者(从插件上下文获取)
receiver = self.current_room_id if hasattr(self, 'current_room_id') and self.current_room_id else self.current_sender
if not receiver:
return "无法获取接收者信息"
result = self.plugin_manager.api.send_rich_text_message(
title=song_info["name"],
name="小艾安全强力驱动",
digest=song_info["artist"],
url=song_url,
thumburl=thumb_url,
account="1",
receiver=receiver
)
if not result.get("success"):
logger.error(f"发送卡片消息失败: {result}")
return f"发送失败: {result.get('message')}"
# 发送成功则返回 None
return None
# 如果所有歌曲都尝试失败
return "抱歉,未找到可用的歌曲链接"
except Exception as e:
logger.error(f"点歌失败: {e}", exc_info=True)
return "点歌失败,请稍后重试"
return None
def _search_song(self, keyword):
"""搜索歌曲"""
try:
response = requests.get(self.search_api, params={"keywords": keyword}, timeout=10)
response.raise_for_status()
result = response.json()
songs = result.get("result", {}).get("songs", [])
if not songs:
return None
# 返回所有歌曲信息,而不是只返回第一首
return [{
"id": song["id"],
"name": song["name"],
"artist": song["artists"][0]["name"],
"thumb_url": song["artists"][0]["img1v1Url"]
} for song in songs[:5]] # 只取前5首歌
except Exception as e:
logger.error(f"搜索歌曲失败: {e}")
return None
def _get_song_url(self, song_id):
"""获取歌曲链接"""
try:
response = requests.get(self.song_api, params={"id": song_id}, timeout=10)
response.raise_for_status()
return response.url # 返回最终的重定向URL
except Exception as e:
logger.error(f"获取歌曲链接失败: {e}")
return None
def _send_music_card(self, receiver, account, song_info, song_url):
"""发送音乐卡片"""
if not self.plugin_manager or not self.plugin_manager.api:
return {"success": False, "message": "API未初始化"}
# 从插件管理器的API获取消息上下文
msg = self.plugin_manager.api.msg
if msg:
is_group = msg.get('is_group', False)
if is_group:
receiver = msg.get('roomid') # 群聊发送到群里
else:
receiver = msg.get('sender') # 私聊发送给个人
if not receiver:
return {"success": False, "message": "无法获取接收者信息"}
return self.plugin_manager.api.send_rich_text_message(
title=song_info["name"],
name=f"{song_info['artist']} - {song_info['name']}",
digest=song_info["artist"],
url=song_url,
thumburl=song_info["thumb_url"],
account=account,
receiver=receiver
)
def register(plugin_manager=None):
"""注册插件"""
if not plugin_manager:
raise ValueError("plugin_manager must be provided")
plugin_name = "点歌"
plugin_handler = MusicOrderPlugin(plugin_manager=plugin_manager)
return plugin_name, plugin_handler
效果展示以及成品获取
成品使用
可以根据上面提供的思路自行开发,这里也提供程序编译好之后的应用程序,控制台运行会自动生成配置文件,配置非常简单如下:
成品获取
HTTP客户端-PC微信安装包-自助点歌程序如下获取,需要激活码激活使用
链接:https://pan.quark.cn/s/44a6b2274d49
提取码:hVSt
激活码直接添加我微信进行领取
后续计划
已经基于这个框架写了较多的娱乐插件,后续过年的时候都会放出来大家一起娱乐一下【狗头】
参考链接
https://github.com/lich0821/wcf-client-rusthttps://github.com/lich0821/WeChatFerry
推荐站内搜索:最好用的开发软件、免费开源系统、渗透测试工具云盘下载、最新渗透测试资料、最新黑客工具下载……
还没有评论,来说两句吧...