eBPF - 例子 进程在 CPU 的运行时间

probe 点: tracepoint:sched:sched_switch
查看参数:

supra@suprabox:~/work/ebpf/bpftrace$ sudo bpftrace -lv tracepoint:sched:sched_switch
tracepoint:sched:sched_switch
    char prev_comm[16]
    pid_t prev_pid
    int prev_prio
    long prev_state
    char next_comm[16]
    pid_t next_pid
    int next_prio

代码如下:

BEGIN
{
    printf("     usecs         count |      distribution                                  |");
}

tracepoint:sched:sched_switch { 
    $prev = args.prev_pid;
    if (@pid[$prev]) {
        @ns = hist((nsecs - @pid[$prev])/1000);
        delete(@pid[$prev]);
    }
    @pid[args.next_pid] = nsecs;
}

END
{
    clear(@pid);
}

BPF 例子 - 观测某个 tcp 连接的状态变化

给2个参数, 分别是 IP 和端口, 观察符合找个条件的tcp 连接的状态变化.

这个例子中本想同时对比 IP 和 port, 但是对于IP 遇到一个问题.

常见的 IPv4 是这么写的(字符串): 145.23.45.23
$sk->__sk_common.skc_daddr 拿到的地址 (unsigned int32): 0X91172d17.
使用 ntop 函数转换后是一个 inet_t, 虽然文档死硬说是 Sting 表示形式: inet_t = “145.23.45.23”

那么为了比较2个 IP 地址, 你想把 0X91172d17 转成 inet_t = “145.23.45.23”, 但是还不能比, 不能用 String 和 inet_t 比较.
要么把 145.23.45.23 通过 pton 转成 uint8[], 然后再通过强转 (uint32)pton("145.23.45.23")) 再和 0X91172d17 比较.

关键的关键是 pton 的参数一定要是 字符串常量, 变量不行, 因为它要在编译时知道类型. TMD.

#!/usr/bin/env bpftrace

#ifndef BPFTRACE_HAVE_BTF
#include <linux/socket.h>
#include <net/sock.h>
#else
#include <sys/socket.h>
#endif

BEGIN
{
    @host = "127.0.0.1";
    @port = (uint16)80;
    if ("" != str($1)) {
        @host = str($1);
    }
    if ("" != str($2)) {
        @port = (uint16)$2;
    }

    printf("looking for tcp connection related to : %s:%d\n", @host, @port);
    printf("%39s:%-6s %39s:%-6s %-10s -> %-10s\n", "src IP", "src port", "dest ip", "dest port", "old state", "new state");
    @states[1] = "ESTABLISHED";
    @states[2] = "SYN_SENT";
    @states[3] = "SYN_RECV";
    @states[4] = "FIN_WAIT1";
    @states[5] = "FIN_WAIT2";
    @states[6] = "TIME_WAIT";
    @states[7] = "CLOSE";
    @states[8] = "CLOSE_WAIT";
    @states[9] = "LAST_ACK";
    @states[10] = "LISTEN";
    @states[11] = "CLOSING";
    @states[12] = "NEW_SYN_RECV";
}

kfunc:vmlinux:tcp_set_state {
    $sk = ((struct sock *) args->sk);
    $inet_family = $sk->__sk_common.skc_family;
    if ($inet_family == AF_INET) {
      $daddr = ntop($sk->__sk_common.skc_daddr);
      $saddr = ntop($sk->__sk_common.skc_rcv_saddr);
    } else {
      $daddr = ntop($sk->__sk_common.skc_v6_daddr.in6_u.u6_addr8);
      $saddr = ntop($sk->__sk_common.skc_v6_rcv_saddr.in6_u.u6_addr8);
    }
    $lport = $sk->__sk_common.skc_num;
    $dport = $sk->__sk_common.skc_dport;

    $dport = bswap($dport);

    if (($dport == @port) || ( $lport == @port)) {
        $curState = @states[args.state];
        $key = str($dport);
        $oldState = @keyMap[$daddr, $key];
        if ($oldState == "") {
            $oldState = "NONE";
        }
        @keyMap[$daddr, $key] = $curState;

        printf("%39s:%-6d %39s:%-6d %-10s -> %-10s\n", $saddr, $lport, $daddr, $dport, $oldState, $curState);
    }
}

END {
    clear(@states);
    clear(@keyMap);
    clear(@host);
    clear(@port);
}

关于 PromQL 的 histogram_quantile 的算法

关于 PromQL 的 histogram_quantile 的算法. 很长一段时间内, 我都以为是使用正态分布算的, 其实并没有那么复杂.

假如我们有11个桶, 我们的数据是从0开始的, 第一个桶是接收落在(0~10]区间的数量, 第二个桶是接收落在(10 ~ 20]区间的数量, 以此类推, 第10个桶接收落在(90~100]区间的数量, 那么第11个桶接收(100 ~ +Inf) 的区间, 通常最后一个区间应该没有值或非常少.

那么如果要求第 95分位等值, 我们就要统计这时总共有多少samples, 其中第95th/100 落在那个桶. 假如我们收到100个samples, 那么第95th/100 就是第95个sample, 看它落在哪个桶. 如果我们收到10000个samples, 那就是第9500个sample, 看它落在哪个桶.

当我们看到它落到那个桶之后, 就在数一下这个桶共有多少个数, 然后算一下这个桶占据了第多少分位(低)到第多少分位(高), 所以就知道了这个桶占据了从多少分位到多少分位.

然后按照这个桶内的数据是平均分布的, 然后算出第nth(95th)是到底处于哪个值.

这篇问答很好的解释了这个算法是如何工作的:
https://stackoverflow.com/questions/55162093/understanding-histogram-quantile-based-on-rate-in-prometheus/69938239#69938239

官方的开源代码: https://github.com/prometheus/prometheus/blob/main/promql/quantile.go

其中要注意的是:

  1. 分位数值在桶内假定为线性分布进行插值计算
  2. 如果分位数落在最高的桶内,将返回第二高桶的上界值
  3. 如果最低桶的上界大于0,则假定自然下界为0

设计好桶是很关键的一步:

  1. 尽量在较低的桶内平均分布;
  2. 最大值不要超过第二高桶的上界;

对于上面提到的 如果分位数落在最高的桶内,将返回第二高桶的上界值, 下面是一个演示: 其中从 10240 到正无穷也有一些samples, 但是不论我们使用多少9999, 这里最多返回10240.

http_server_requests_duration_ms_bucket{le="5",method="GET",commandName="SigninLegacyView"} 0
http_server_requests_duration_ms_bucket{le="20",method="GET",commandName="SigninLegacyView"} 0
http_server_requests_duration_ms_bucket{le="80",method="GET",commandName="SigninLegacyView"} 30
http_server_requests_duration_ms_bucket{le="320",method="GET",commandName="SigninLegacyView"} 5881
http_server_requests_duration_ms_bucket{le="640",method="GET",commandName="SigninLegacyView"} 8567
http_server_requests_duration_ms_bucket{le="1280",method="GET",commandName="SigninLegacyView"} 8831
http_server_requests_duration_ms_bucket{le="2560",method="GET",commandName="SigninLegacyView"} 8865
http_server_requests_duration_ms_bucket{le="5120",method="GET",commandName="SigninLegacyView"} 8865
http_server_requests_duration_ms_bucket{le="10240",method="GET",commandName="SigninLegacyView"} 8867
http_server_requests_duration_ms_bucket{le="+Inf",method="GET",commandName="SigninLegacyView"} 8885

histogram.png

eBPF 例子

介绍

What Is eBPF? 是一本非好的入门书.
Learning eBPF 是同一个作者的另外一本进阶书.

key notes:

  1. eBPF 程序分为用户态程序和内核态程序. 内核态使用C或Rust编写,然后clang编译成eBPF 字节码,当内核遇到某种event之后,就会执行这些内核态的eBPF程序, 然后生成数据, 放到一些eBPF程序定义的Map中. 用户态程序主要用来加载eBPF程序并且获取内核态写入Map的数据, 然后整理分析展现这些数据.
  2. 内核态的程序编写成代码, 然后编译成字节码, 然后提交给内核, 然后eBPF虚拟机验证(Verify)这些代码,如果安全, 则加载运行这些代码, 等待事件处罚, 执行Action,写入接口文件.
  3. eBPF 为什么需要一个虚拟机? 运行时动态编译, 验证, 不是预先编译链接. 它不是通用型, 而是特定场景.
  4. eBPF bpf_trace_printk()/sys/kernel/debug/tracing/trace_pipe.
  5. eBPF Map 是内核态和用户态共享数据的渠道. 用户态写入配置, 内核态保存中间结果, 最终输出.
    各种类型的 BPF Map: https://elixir.bootlin.com/linux/v5.15.86/source/include/uapi/linux/bpf.h#L878
    Linux Kernel 关于BPF Map的文档: https://docs.kernel.org/bpf/maps.html
  6. eBPF 程序不允许使用其它函数, 除了helper 函数, 所以要 __always_inline .

bpftrace

经典的 one liner 的例子: https://github.com/iovisor/bpftrace/blob/master/docs/tutorial_one_liners.md

教程: https://github.com/iovisor/bpftrace/blob/master/docs/reference_guide.md

关于网络的部分: https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/8/html/configuring_and_managing_networking/network-tracing-using-the-bpf-compiler-collection_configuring-and-managing-networking#doc-wrapper

内置变量: comm, pid, tid, args(所有参数: args.filename)
内置函数: str() 把指针的值变成string.
probe 详细信息, 包括参数: sudo bpftrace -vvl kfunc:vmlinux:tcp_set_state

列出所有的 tracepoint:

sudo bpftrace -l 

https://github.com/lizrice/learning-ebpf

关于 Java SocketReadTimeout

在诊断Java应用程序的诊断过程中, 竟然会遇到 Connect timeout, Socket read timeout. 但是经常会遇到有些有些开发人员对这些概念有些误解. 本文就涉及到一些细节使用一些例子做些说明, 使大家更容易理解.

一个简单的例子

Jersey 做为 Jax-RS 的参考实现, 被广泛用于 Java 应用开发. 下面使用 Jersey 开发一个客户端的例子.

import java.net.URI;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;

import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
...

public String callRestSvc() {
        ClientConfig config = new ClientConfig();
                config.property(ClientProperties.CONNECT_TIMEOUT, 500);
        config.property(ClientProperties.READ_TIMEOUT, 3000);
        Client client = ClientBuilder.newClient(config);
        WebTarget target = client.target(UriBuilder.fromUri("http://localhost:8080/").build());
        try {
            return target.path("rest").
                    request().
                    accept(MediaType.TEXT_PLAIN).
                    async().
                    get(String.class).get(10, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

服务端的 Socket Read/Write timeout

Tomcat NIO

SocketTimeoutException 的初始化方法上设置断点, 然后可以看到下面的 Tomcat 异常栈.

SocketTimeoutException.<init>() (java.net.SocketTimeoutException:49)
NioEndpoint$Poller.timeout() (org.apache.tomcat.util.net.NioEndpoint$Poller:1086)
NioEndpoint$Poller.run() (org.apache.tomcat.util.net.NioEndpoint$Poller:852)
Thread.run() (java.lang.Thread:829)

如何判断是不是 timeout

Tomcat 每次在读取(read())的时候, 记录当前时间. 每次循环的时候(link)就检查是不是有 timeout: 检查当前时间和上次读的时间的差值, 如果大于设置的 timeout 值, 就设置 error: timeout exception.
code: https://github.com/apache/tomcat/blob/ec8ef7a3a2fa56afb4db4261ebdc0aba848f23ff/java/org/apache/tomcat/util/net/NioEndpoint.java#L1017-L1050

if (socketWrapper.interestOpsHas(SelectionKey.OP_READ)) {
    long delta = now - socketWrapper.getLastRead();
    long timeout = socketWrapper.getReadTimeout();
    if (timeout > 0 && delta > timeout) {
        readTimeout = true;
    }
}
// Check for write timeout
if (!readTimeout && socketWrapper.interestOpsHas(SelectionKey.OP_WRITE)) {
    long delta = now - socketWrapper.getLastWrite();
    long timeout = socketWrapper.getWriteTimeout();
    if (timeout > 0 && delta > timeout) {
        writeTimeout = true;
    }
}                          

Tomcat NIO 2

Tomcat NIO 2 的 Endpoint 直接使用的是 java.util.concurrent.Future 的 timeout 设置:
link: https://github.com/apache/tomcat/blob/ec8ef7a3a2fa56afb4db4261ebdc0aba848f23ff/java/org/apache/tomcat/util/net/Nio2Endpoint.java#L1130-L1162

if (block) {
    try {
        integer = getSocket().read(to);
        long timeout = getReadTimeout();
        if (timeout > 0) {
            nRead = integer.get(timeout, TimeUnit.MILLISECONDS).intValue();
        } else {
            nRead = integer.get().intValue();
        }
    } catch (ExecutionException e) {
        if (e.getCause() instanceof IOException) {
            throw (IOException) e.getCause();
        } else {
            throw new IOException(e);
        }
    } catch (InterruptedException e) {
        throw new IOException(e);
    } catch (TimeoutException e) {
        integer.cancel(true);
        throw new SocketTimeoutException();
    } finally {
        // Blocking read so need to release here since there will
        // not be a callback to a completion handler.
        readPending.release();
    }
} else {
    startInline();
    getSocket().read(to, toTimeout(getReadTimeout()), TimeUnit.MILLISECONDS, to,
            readCompletionHandler);
    endInline();
    if (readPending.availablePermits() == 1) {
        nRead = to.position();
    }
}

客户端的 Socket read timeout

BIO

HttpUrlConnection

HttpUrlConnection 就是使用的 BIO, 它自己是使用native 代码实现的 timeout.

NIO

例子的改正

这样即实现了最长等多少秒, 又不忘在后来的response 时候消费掉 entity, 保证连接释放.

public String asyncQuery() throws UnsupportedEncodingException, ExecutionException, InterruptedException, TimeoutException {
        WebTarget target = logsTarget.path("/").queryParam("style", "increase");
        System.out.println("url " + target.getUri().toString());
        CompletableFuture<String> result = new CompletableFuture<>();

        target.request(MediaType.APPLICATION_JSON).async().get(new InvocationCallback<String>() {

            @Override
            public void completed(String response) {
                System.out.println("I get response: " + response);
                result.complete(response);
            }

            @Override
            public void failed(Throwable throwable) {
                System.out.println("faied with URL " + target.getUri().toString() + " " + throwable.getMessage());
                result.completeExceptionally(throwable);
            }
        });

        return result.get(10, TimeUnit.SECONDS);
    }