流量录制原理
录制内容
实现思路
首先,调用 accept 获得一个调用方的连接; 第二步,在这个连接上通过调用 recv 读取请求数据,解析请求; 第三步,目标服务开始执行业务逻辑,过程中可能需要调用一个或多个依赖服务,对于每一次依赖服务调用,目标服务需要通过 connect 与依赖服务建立连接,然后在这个连接上通过 send 发送请求数据,通过 recv 接收依赖服务响应; 最后,目标服务通过 send 给调用方返回响应数据。
区分不同的请求
2、为了提高处理速度,可能创建子线程并发调用依赖服务。
实际上,子线程也可能再创建子线程,形成下图所示的线程关系:
对于这种涉及子线程的场景,我们只要把子线程的数据合并到请求处理线程即可。每个请求都会对应一个请求处理线程和一系列的子线程,最终我们可以根据线程 ID 来区分出不同请求。
区分数据类型
流量录制实现
录制agent:与目标进程部署在相同容器中,根据进程名找到要录制的目标进程 pid,(1) 控制录制 server 开启/关闭录制;(7) 从录制 server 接收原始数据,解析成完整流量,(8) 保存到日志文件中。
录制server:部署在宿主机上,负责 (2, 3) 加载/挂载 eBPF程序、(6) 从 eBPF Map 中读取原始数据。
eBPF 程序:负责在目标进程 (4) 发送和接收数据时,(5) 从挂载的函数中读取原始数据并写入 eBPF Map 中。
选择插桩点
accept 和 connect 用于区分 socket 类型。
send 和 recv 用于捕获发送和接收的数据。
close 用于识别调用的结束。
inet_accept
inet_stream_connect
inet_sendmsg
inet_recvmsg
inet_release
开发eBPF程序
开发eBPF程序
int inet_sendmsg(struct socket *sock, struct msghdr *msg, size_t size)
sock socket 指针
msg 要发送的数据
size 要发送数据的长度
成功时返回发送的数据长度,失败时返回错误码。
在函数入口处记录函数参数和上下文
在函数返回时记录实际发送的数据内容
函数入口 eBPF 程序:
SEC("kprobe/inet_sendmsg")
int BPF_KPROBE(inet_sendmsg_entry, struct socket *sock, struct msghdr *msg)
{
struct probe_ctx pctx = {
.bpf_ctx = ctx,
.version = EVENT_VERSION,
.source = EVENT_SOURCE_SOCKET,
.type = EVENT_SOCK_SENDMSG,
.sr.sock = sock,
};
int err;
// 过滤掉不需要录制的进程
if (pid_filter(&pctx)) {
return 0;
}
// 读取 socket 类型信息
err = read_socket_info(&pctx, &pctx.sr.sockinfo, sock);
if (err) {
tm_err2(&pctx, ERROR_READ_SOCKET_INFO, __LINE__, err);
return 0;
}
// 记录 msg 中的数据信息
err = bpf_probe_read(&pctx.sr.iter, sizeof(pctx.sr.iter), &msg->msg_iter);
if (err) {
tm_err2(&pctx, ERROR_BPF_PROBE_READ, __LINE__, err);
return 0;
}
// 将相关上下文信息保存到 map 中
pctx.id = bpf_ktime_get_ns();
err = save_context(pctx.pid, &pctx);
if (err) {
tm_err2(&pctx, ERROR_SAVE_CONTEXT, __LINE__, err);
}
return 0;
}
函数返回 eBPF 程序:
SEC("kretprobe/inet_sendmsg")
int BPF_KRETPROBE(inet_sendmsg_exit, int retval)
{
struct probe_ctx pctx = {
.bpf_ctx = ctx,
.version = EVENT_VERSION,
.source = EVENT_SOURCE_SOCKET,
.type = EVENT_SOCK_SENDMSG,
};
struct sock_send_recv_event event = {};
int err;
// 过滤掉不需要录制的进程
if (pid_filter(&pctx)) {
return 0;
}
// 如果发送失败, 跳过录制数据
if (retval <= 0) {
goto out;
}
// 从 map 中读取提前保存的上下文信息
err = read_context(pctx.pid, &pctx);
if (err) {
tm_err2(&pctx, ERROR_READ_CONTEXT, __LINE__, err);
goto out;
}
// 构造 sendmsg 报文
event.version = pctx.version;
event.source = pctx.source;
event.type = pctx.type;
event.tgid = pctx.tgid;
event.pid = pctx.pid;
event.id = pctx.id;
event.sock = (u64)pctx.sr.s;
event.sock_family = pctx.sr.sockinfo.sock_family;
event.sock_type = pctx.sr.sockinfo.sock_type;
// 从 msg 中读取数据填充到 event 报文, 并通过 map 传递到用户空间
sock_data_output(&pctx, &event, &pctx.sr.iter);
out:
// 清理上下文信息
err = delete_context(pctx.pid);
if (err) {
tm_err2(&pctx, ERROR_DELETE_CONTEXT, __LINE__, err);
}
return 0;
}
获取goid
获取goid
getg 函数:
// getg returns the pointer to the current g.
// The compiler rewrites calls to this function into instructions
// that fetch the g directly (from TLS or from the dedicated register).
func getg() *g
根据函数注释,当前 g 的指针是放在线程本地存储(TLS)中的,调用 getg() 的代码由编译器进行重写。为了找到 getg() 的实现方式,我们看到 runtime.newg 函数中调用了 getg,对它进行反汇编,发现 g 的指针保存在 fs 寄存器 -8 的内存地址上:
接下来,我们找到 struct g 中的 goid 字段(位于 runtime/runtime2.go):
type g struct {
.... 此处省略大量字段
goid int64
.... 此处省略大量字段
}
拿到 g 的指针后,只要加上 goid 字段的偏移量即可获取到 goid。同时,考虑到不同的 go 版本之间,goid 偏移量可能不同,最终在 eBPF 程序中我们可以这样获取当前 goid:
0
遇到的问题
eBPF 程序虽然可以使用 C 语言开发,但是与普通 C 语言开发过程有较大的差别,增加了很多限制。
不允许使用全局变量、常量字符串或数组,可以保存到 map 中。
不支持函数调用,可以通过 inline 内联解决。
栈空间不能超过512字节,必要时可通过 array 类型的 map 做缓冲区。
不能直接访问用户态和内核态内存,要通过 bpf-helper 的相关函数。
单个程序指令条数不能超过 1000000,尽量保持 eBPF 程序逻辑简单,复杂的处理放在用户态程序完成。
循环必须有明确的次数上限,不能只靠运行时判断。
结构体成员要内存对齐,否则可能导致部分内存未初始化,引发 verifier 报错。
代码经过编译器优化后 verifier 可能误报内存访问越界问题,可以在代码中增加 if 判断帮助 verifer 识别,必要时可通过内联汇编的方式解决。
....
随着 clang 和内核对 ebpf 支持的逐渐完善,很多问题也在逐步得到解决,后续的开发体验也会变得更顺畅。
安全机制
总结
总结
END
作者及部门介绍
招聘信息
团队后端、测试需求招聘中,欢迎有兴趣的小伙伴加入,可以扫描下方二维码简历直投,期待你的加入!
研发工程师
测试开发工程师
推荐站内搜索:最好用的开发软件、免费开源系统、渗透测试工具云盘下载、最新渗透测试资料、最新黑客工具下载……
还没有评论,来说两句吧...