Java socket timeout 在 Linux 的实现
Block IO (SocketInputStream)
我们从 java.net.SocketInputStream
看起. 它的 read
方法传入 timeout
的值. 如果不传入 timeout
值, 则这个 timeout
的值是通过 setOption
设置的, 若没有设置过, 那么它的值就是 0.
也就是说每次 read
方法被调用, 都针对这次 read
操作有一个重新计时的 timeout
时钟. 如果一个网络数据要读好几次, 每次 read
都会重新计时这个 timeout
.
那么这个 timeout
最终在哪里使用的呢? 下面是每一步调用的栈:
java.net.SocketInputStream.read(byte b[], int off, int length, int timeout)
java.net.SocketInputStream.socketRead(FileDescriptor fd, byte b[], int off, int length, int timeout)
native java.net.SocketInputStream.socketRead0(FileDescriptor fd, byte b[], int off, int length, int timeout)
- SocketInputStream.c
Java_java_net_SocketInputStream_socketRead0(JNIEnv *env, jobject this, jobject fdObj, jbyteArray data, jint off, jint len, jint timeout)
点过上面链接, 仔细查看路径会发现, 这个SocketInputStream.c
既不是 shared 代码, 也不是 Linux 的代码, 而是 Solaris, 而我们讨论的是 Linux 的代码, 为什么给出的是 Solaris 代码呢? 这个问答给出了明确的答案: SocketInputStream.c 包含了 net_util.h, 而 net_util.h 又包含了 net_util_md.h. 而 net_util_md.h 里面使用#if defined(__linux__) || defined(MACOSX)
来区分不同的 OS. 上面的
SocketInputStream.c
又把timeout
参数传给了NET_Timeout
函数.NET_Timeout
在 linux_close.c (注意还是 Solaris 目录)里面定义:/* * Wrapper for poll(s, timeout). * Auto restarts with adjusted timeout if interrupted by * signal other than our wakeup signal. */ int NET_Timeout(int s, long timeout) {
上面的注释中很好的说明了 它只是用来封装 poll 系统调用, 具体的调用就是:
在一个无限循环中, 每次用剩余的 timeout 时间去看看有没有可读数据( poll()), 直到读到, 若没读到, 则计算剩余时间, 继续用剩余时间去读, 直到读到, 或者 timeout 没有剩余.if (timeout > 0) { gettimeofday(&t, NULL); prevtime = t.tv_sec * 1000 + t.tv_usec / 1000; } for(;;) { struct pollfd pfd; int rv; threadEntry_t self; /* * Poll the fd. If interrupted by our wakeup signal * errno will be set to EBADF. */ pfd.fd = s; pfd.events = POLLIN | POLLERR; startOp(fdEntry, &self); rv = poll(&pfd, 1, timeout); endOp(fdEntry, &self); /* * If interrupted then adjust timeout. If timeout * has expired return 0 (indicating timeout expired). */ if (rv < 0 && errno == EINTR) { if (timeout > 0) { gettimeofday(&t, NULL); newtime = t.tv_sec * 1000 + t.tv_usec / 1000; timeout -= newtime - prevtime; if (timeout <= 0) { return 0; } prevtime = newtime; } } else { return rv; } }
NIO (SocketChannel)
Java New IO 可以选择是否是 block 模式.
socketChannel.configureBlocking(false);
这主要影响到这 3 个 API 的操作- connect();
- read();
write();
注意到, 他们和之前 Blocking IO 的区别: 他们都没有地方设置 timeout 的值. connect(): 若同步, 则等到建立连接或者 IO error 产生; 若异步, 则立马返回, 后续通过一直调用 finishConnect() 来测试是不是已经建立连接; read(): 若同步, 直到读到数据或 IO 出错; 若异步, 读一次, 立马返回, 不管是不是读到数据, 通过返回值判断读多少; write(): 若同步, 直到写完或者 IO 出错; 若异步, 写一次, 立马返回, 不管是不是有数据写出, 通过返回值判断写多少;
sun.nio.ch.SocketChannelImpl
里面的read()实现(写类似):
if (blocking) {
do {
n = IOUtil.read(fd, buf, -1, nd);
} while (n == IOStatus.INTERRUPTED && isOpen());
} else {
n = IOUtil.read(fd, buf, -1, nd);
}
即便使用 non-blocking 的方式, 为了保证连接建立好, 读完整, 写完数据, 就要业务线程使用 loop 的方式一直检查这些连接操作,读操作, 写操作是不是好了. 这样并不能带来更好的性能, 相反, 需要更多的 CPU 时间.
所以, 为了更好的性能, 就单独找个线程, 来负责所有这些 IO 操作的连接, 读, 写操作. 其它线程就去做其它事情去了, 这个负责 IO 操作的线程为了更好的同时处理多个 IO, 就使用 Selector 的方式, 相当于一个人看守多个流水线.
到此, 我们还没有涉及 timeout, 并且上面 API 中也根本没有提到 timeout, 那么 non-blocking 的方式中, timeout 是怎么实现的呢? 通常都是负责 IO 的线程来实现的, 它每次批量 select() 之后, 就会查看一遍是不是已经有超时的 IO. 比如 JDK 自带的 jdk.internal.net.http.HttpClientImpl
中的 purgeTimeoutsAndReturnNextDeadline
方法, 就负责每次 select() 完查看是否有 timeout 的 IO.
https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java#L1210
Blocking IO 里面的 timeout 每次调用 read/write API 的时候, 直接传到 epoll() 方法去的, 也就是说如果一个 response 调用了 read() 多次, 那么每次 read() 都是重新计时 timeout 的值. 但是在 NIO 里面, 我们看到 JDK 提供的这个HttpClientImpl 实现里面, 使用 NIO 的方式, 它每次调用 read() 方法是不会 reset timeout 值的, 这个 timeout 值的使用方式是: 从一开始便扣减里面剩余的时间. 这 2 种方式对于读一个很大数据流是有影响的.
另外这篇文章详细介绍了各种 http client 是否支持 sync, async, 是使用 future 的方式, 还是使用 callback 的方式:
https://www.mocklab.io/blog/which-java-http-client-should-i-use-in-2020/
可以调节下面代码的 connect timeout 和 read timeout 值, 来观察或者 debug 是如何实现的 timeout:
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.time.Duration;
public class NioTest {
public static void main(String[] args) throws IOException, InterruptedException {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("https://www.tianxiaohui.com/"))
.timeout(Duration.ofMillis(1000))
.header("Content-Type", "application/json")
.GET()
.build();
HttpClient client = HttpClient.newBuilder()
.version(Version.HTTP_1_1)
.connectTimeout(Duration.ofMillis(1000))
.build();
client.sendAsync(request, BodyHandlers.ofString())
.thenApply(HttpResponse::body)
.thenAccept(System.out::println)
.exceptionally(t -> {System.out.println(t.getMessage());t.printStackTrace(); return null;});
try {
Thread.sleep(600000);
System.out.println();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}