Eric 发布的文章

Java 里面的 PrivilegedAction 和 PrivilegedExceptionAction

读 JDK 里面的代码, 可能会遇到某些操作需要被封装成 PrivilegedAction 和 PrivilegedExceptionAction 来执行, 比如下面的代码块:

AccessControlContext acc = (System.getSecurityManager() != null)
                ? AccessController.getContext()
                : null;
PrivilegedAction<Boolean> action = new PrivilegedAction<>() {
    public Boolean run() { return findSomething(); }
};
AccessController.doPrivileged(action, acc);

为什么需要特权操作(PrivilegedAction)?

Java 诞生的初期在浏览器的环境执行(applet), 所以要加很多安全限制, 从 1.0 版本就有了 SecurityManager 的概念, 从最核心的 System 类的 System.getSecurityManager() 你就能得到系统的安全管理器.

只不过, 这个安全管理器默认是没有开启的. 并且从 JDK 17 开始将要被废弃.

安全管理器是如何工作的?

安全管理器(SecurityManager)是通过policy来限制你能不能做某个操作. 比如: 代码能不能访问网络, 能不能读取磁盘文件, 能不能访问环境变量等. Policy 存放在一些文件里面, 通过改变文件里面policy的内容, 设置安全管理器是允许还是拒绝某些从左.

一个 policy 的例子如下: 下面的 policy 表示在运行时对于代码模块 java.scripting 中的代码, 授予所有的权限.

grant codeBase "jrt:/java.scripting" {
    permission java.security.AllPermission;
};

你能在 <java.home>/lib/security/default.policy 里面找到系统默认的 policy. 当然你也可以定义自己的policy, 放到 ${java.home}/conf/security/java.policy 里面. 或者放到其它地方, 并且在启动参数里面通过: –Djava.security.policy=/tmp/myPolicy.policy 指定.

PrivilegedAction 和 PrivilegedExceptionAction 是如何工作的?

PrivilegedAction 和 PrivilegedExceptionAction 都会封装一个操作, 当这个操作会抛出 checked exception 的时候, 就需要用 PrivilegedExceptionAction, 否则就用 PrivilegedAction.

封装完这操作, 就使用 AccessController.doPrivileged(action, acc) 去执行, 它会检查是不是有执行权限, 如果有, 就去执行, 否则不执行.

代码示例

下面的代码先设置一个安全管理器, 然后尝试访问一个URL, 这个时候, 就会报错.

import java.io.IOException;
import java.net.URL;

public class Main {

    public static void main(String[] args) {
        System.setSecurityManager(new SecurityManager());
        try {
            new URL("https://www.tianxiaohui.com").openConnection().connect();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

错误: 拒绝访问:

Exception in thread "main" java.security.AccessControlException: access denied ("java.net.SocketPermission" "www.tianxiaohui.com:443" "connect,resolve")
    at java.base/java.security.AccessControlContext.checkPermission(AccessControlContext.java:485)
    at java.base/java.security.AccessController.checkPermission(AccessController.java:1068)
    at java.base/java.lang.SecurityManager.checkPermission(SecurityManager.java:416)
    at java.base/java.lang.SecurityManager.checkConnect(SecurityManager.java:919)
    at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:620)
    at java.base/sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:266)
    at java.base/sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:380)
    at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:193)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1245)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1131)
    at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:179)
    at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.connect(HttpsURLConnectionImpl.java:142)

如果不设置安全管理器, 就能正常运行. 那为什么系统的默认安全管理器是能正常运行的呢? 因为在 System 类里面, 我们可以看到它的默认安全管理器是设置了允许所有的执行:

// s is the default SecurityManager
s.getClass().getProtectionDomain().implies(SecurityConstants.ALL_PERMISSION);

由 ServiceLoader 引发的CPU 100%

最近遇到2次由于 ServiceLoader 引起的 CPU 100%, 导致业务线程不能正常运行.

什么是 Service Loader

Spring 里面有个核心的概念, 就是依赖注入: 我期望有个服务, 但是一开始我并不指定具体的实现类, 等到我真正需要的时候, 这个依赖根据运行时自动注入. 同样, JDK 6 也引入了一个一样的实现框架, 就是 ServiceLoader. 它的实现也很简单. 使用的方法如下:

ServiceLoader<ServiceAPI> serviceLoader =ServiceLoader.load(ServiceAPI.class);
Iterator<ServiceAPI> iterator = serviceLoader.iterator();
while (iterator.hasNext()) {
    ServiceAPI impl = iterator.next();
}

它的主要作用就是: 你需要那个服务的具体实现, 让我来帮你找, 可能找到一个或多个, 或找不到. 结果返回的是一个 Iterator.

如何找到具体的实现的?

如果某个 Jar 包提供某个服务的具体实现, 按照 JDK 定义的规则, 它就会在在 Jar 包的 META-INFO/services 文件夹提供一个名为某个service的文件, 文件的内容就是具体的实现类.
比如 xerceslmpl-x.x.x.jar 提供了 javax.xml.datatype.DatatypeFactory 的具体实现:
xerceslmpl.png

文件的内容就是本 jar 包里面的具体实现类的全名.
所以, 可以通过判断当前 jar 包里面的 META-INFO 文件夹下面是不是包含某个service 文件名来判断是不是有这个实现.

如何出问题的?

出问题的就是下面这行代码:

javax.xml.datatype.DatatypeFactory df = javax.xml.datatype.DatatypeFactory.newInstance();

就是要初始化一个xml 转换成 Java对象的类型工厂类, 如果去 JDK 里面查看这个类的源代码, 会发现其实它是一个抽象 Service. 运行时它有4种查找具体实现类的方法. 前2种都是通过配置, 第三种就是通过 ServiceLoader 去查找它的具体实现.

出问题的方式就是通过 ServiceLoader 的方式, 这种方式就是通过 ClassLoader 去查找所有的 Jar 包, 一个个去看有没有某个 jar 的 META-INFO/services 文件夹下面包含这么一个 service 的具体实现.

通常的实现的一个具体栈:

java.lang.Thread.State: RUNNABLE
    at java.util.zip.ZipCoder.getBytes(ZipCoder.java:77)
    at java.util.zip.ZipFile.getEntry(ZipFile.java:325)
    - locked <0x00000007157ac988> (a java.util.jar.JarFile)
    at java.util.jar.JarFile.getEntry(JarFile.java:253)
    at java.util.jar.JarFile.getJarEntry(JarFile.java:236)
    at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1084)
    at sun.misc.URLClassPath$JarLoader.findResource(URLClassPath.java:1062)
    at sun.misc.URLClassPath$1.next(URLClassPath.java:281)
    at sun.misc.URLClassPath$1.hasMoreElements(URLClassPath.java:291)
    at java.net.URLClassLoader$3$1.run(URLClassLoader.java:609)
    at java.net.URLClassLoader$3$1.run(URLClassLoader.java:607)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader$3.next(URLClassLoader.java:606)
    at java.net.URLClassLoader$3.hasMoreElements(URLClassLoader.java:631)
    at sun.misc.CompoundEnumeration.next(CompoundEnumeration.java:45)
    at sun.misc.CompoundEnumeration.hasMoreElements(CompoundEnumeration.java:54)
    at java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:354)
    at java.util.ServiceLoader$LazyIterator.hasNext(ServiceLoader.java:393)
    at java.util.ServiceLoader$1.hasNext(ServiceLoader.java:474)
    at javax.xml.datatype.FactoryFinder$1.run(FactoryFinder.java:296)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.xml.datatype.FactoryFinder.findServiceProvider(FactoryFinder.java:292)
    at javax.xml.datatype.FactoryFinder.find(FactoryFinder.java:268)
    at javax.xml.datatype.DatatypeFactory.newInstance(DatatypeFactory.java:144)

通过上面的栈, 我们可以看到, 它其实是到 jar 到文件里面去看有没有这个项目, 没有就继续查找下一个.
这种方式相对消耗CPU到, 因为每次都要查找所有的jar 包, 一个个去查看压缩jar里面有没有这个文件. 如果以线上项目有2百多个jar, 查找一次要消耗即使毫秒.

但是, 即便这样, 还打不到让CPU很高的程度.

如何推高 CPU 的?

如果大家查看上面的线程栈, 其实在遍历某个jar 之前, 外层的遍历其实是遍历一些 ClassLoader, 然后每个 ClassLoader 都会有一些 Jar, 然后再遍历这些 jar.
其实真正出问题的是在 TomcatEmbeddedWebappClassLoader 里面. 这个 ClassLoader 在遍历每个Jar 的时候, 如果没有对应的 service 具体实现的 META-INFO/services 文件, 它会抛出一个 FileNotFoundException, 既然有 Exception, 就会有回溯栈, 就会非常耗时, 甚至进入C 代码. 看下面的火焰图:
flame.png

JVM 安全点 Safepoint

最近在看 ZGC 的某些具体的实现, 有篇文章对了从 Serial GC, 到 parallel GC, 再到CMS, 然后到G1, 最后到如今的ZGC, 其中一个重要的差别就是把很多GC 时间(STOP the world)要做的事情移到并发去做的过程. 其实这是一个从简单到复杂的过程, 也是一个从粗放到逐步精细控制的过程. 最终的结果就是在GC的时间点上, 做的事情越来越少.

如果讨论到GC 的时间点, 其中一个重要的事情, 就是安全点(Safepoint), 它是一个让所有业务线程在某个点全部停下来的过程, 由于有很多业务线程, 让它们同时停下来, 就涉及到一个协调机制, 如何让这些线程在不影响业务线程的情况下, 以最快的速度停下来, 就显得非常重要.

什么是JVM Safepoint

JVM(Java虚拟机)中的Safepoint是一种机制,用于确保所有线程在执行某些特定的系统级操作之前达到一个已知且一致的状态。这些系统级操作通常包括垃圾收集(GC)、线程栈的展开、代码重优化以及一些运行时系统的更新等。

在Safepoint期间,JVM会暂停所有的Java线程执行(也就是所谓的“Stop-The-World”暂停),直到所有线程都到达Safepoint。这样可以确保在进行这些操作时,不会有任何线程在执行Java字节码,从而避免了潜在的数据不一致和竞争条件。

JVM 可以在那些代码区域达到安全点?

  1. 方法调用边界:当一个方法被调用时,可能会在调用前后插入Safepoint检查。这是因为方法调用是程序执行中的自然中断点,且通常是执行时间较长的操作。
  2. 循环回边:在循环结构中,循环的末尾(即循环要重新开始的地方)是达到Safepoint的一个常见位置。这样做是为了防止长时间运行的循环阻止系统达到Safepoint。
  3. 显式的Safepoint检查点:JVM的即时编译器(JIT)可能会在生成的机器代码中的特定位置插入显式的Safepoint检查。这些检查通常会在执行时间较长的代码段中进行。
  4. 同步操作:当线程尝试进入或退出同步块(synchronized block)或方法时,也可能会进行Safepoint检查,因为这些操作涉及到锁的获取和释放。
  5. 异常抛出点:当程序抛出异常时,可能会在异常处理之前达到Safepoint,因为异常处理涉及到栈的展开和控制流的改变。
  6. 线程状态变化:当线程状态发生变化时(例如,从运行状态转为等待或休眠状态),也可能会进行Safepoint检查。

其他:JVM实现还可能在其他不那么明显的地方插入Safepoint检查,这些通常是由于特定的实现细节和优化策略。

其它

  1. Safepoint 在 Java 语言规范里没有涉及, 但是每个 JVM 实现都有 Safepoint;
  2. 什么时候需要安全点 Safepoint?

    1. GC 某些阶段的时候;
    2. JVM TI 捕获 stacktrace 的时候;
    3. 类重新定义的时候(Class redefinition), 比如 BCI 代码 Instrument 的时候;
    4. 捕获 heap dump 的时候;
    5. 锁膨胀的时候 (monitor deflation);
    6. 锁从偏向锁取消的时候(Lock unbiasing);
    7. 方法逆优化的时候(Method deoptimization);
    8. 其它...
  3. 对于 Zing JVM 实现, 分为全局安全点( global Safepoint) 和 线程安全点 (Thread Safepoint), 对于 Hotspot (Oracle/OpenJDK)系列只有全局安全点;
  4. 所有的 JVM 实现都在某些地方需要全局安全点( global Safepoint);

参考:

http://psy-lob-saw.blogspot.com/2016/02/why-most-sampling-java-profilers-are.html
http://psy-lob-saw.blogspot.com/2015/12/safepoints.html
https://psy-lob-saw.blogspot.com/2014/03/where-is-my-safepoint.html

google search: with-gc-solved-what-else-makes-jvm-pause

使用CSS伪元素和动画实现光标闪烁效果

当用户在 chatGPT 输入问题之后, 并且chatGPT 尚没有返回的时候, 我们会看到有个黑色的圆点在闪烁. 如何实现这么一个闪烁的圆点?

以下是现实代码:

.toggle-text::after {
    font-size: 12px;
    animation: toggle 1s infinite;
    content: '⚫';
  }

  @keyframes toggle {
    0%, 50% {
      opacity: 0;
    }
    50.01%, 100% {
      opacity: 1;
    }
  }

<div class='toggle-text'>text</div>

这里面用到的技术

  1. CSS 伪元素: 这里的 ::after 就是CSS 伪元素, 它们都不是真实存在的元素. 它不属于对应的html 元素, 而是在它之后, 当然还有很多其它伪元素.
  2. CSS 动画: 这里通过 animation 来实现动画. 动画的帧通过配合 @keyframes 来完成.
  3. @keyframes 通过 opacity 来实现透明度, 造成闪烁的效果.
  4. 这里插入的是 ⚫, 其实它是一个 unicode 字符, 还有更小的点, 也可以给他改变font 大小, 颜色等. 还有其它各种 unicode 表示的点. 看这里: https://www.unicodepedia.com/groups/geometric-shapes/

python flask 实现 SSE

这是最近2年第二次需要 server-sent events(SSE). 需求都是一样, 某个页面的需要后台去处理一个很长的请求, 后台处理至少要30秒钟. 这30多秒可以让用户去等, 但是有点长. 所以, 可以通过不断的把处理的进度和处理的中间结果尽快的展示到页面上显得尤为重要. 因此可以通过SSE把服务端到最新更新及时发送到页面, 让用户等等不是那么烦躁.

什么是 server-sent events

server-sent events 主要用来解决服务端要实时发送数据到客户端到情况. 比如最新的股票实时行情, 网页游戏服务端的实时数据, 视频/音频信息流等数据. 服务端随时可以推送数据到客户端, 客户端接收并处理. 但是客户端在第一次发送请求到服务端之后, 不会再发送新请求到服务端, 只能被动接收服务端到推送数据. 但是客户端可以选择随时关闭连接, 不再接收新数据. 这也是SSE 区分于 WebSockets 的一个重大区别.

server-sent events 客户端的接口

完全的文档可以看这里(https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events), 讲的很清楚.

客户端发起连接请求

const evtSource = new EventSource("//api.example.com/sse", {
  withCredentials: true,
});

客户端接收到匿名event

客户端处理:

evtSource.onmessage = (event) => {
  console.log(event);
};

这个时候, 服务端发送的数据是:

data: some text

# 或者是:
data: {"name": "eric", "age": 55}

客户端接收到命名event

客户端处理:

evtSource.addEventListener("myEventName", (event) => {
  console.log(event);
});

这个时候, 服务端发送的数据是:

event: myEventName
data: {"name": "eric", "age": 55}

客户端收到的注释(comment)

服务器可以发送以 : 开头的数据, 这时候客户端会认为这是没有任何意义的数据, 只是注释, 这通常只是用来保持这个连接, 不让它断掉. 比如

: this is a comment

服务端和客户端通信的协议

通过上面的例子可以看到, 2端其实都是纯文本行的数据发送与接收. 每行都是以 key: value 的形式表示的, 除了注释行, 它没有key, 是以 : 开头的一行.

允许的key 只有以下几种可能:

  1. event: event 的名字, 没有名字就默认是匿名事件.
  2. data: event的内容, 可以连续2行或多行都是 event: value 的形式, 这时候客户端浏览器会自动拼接2行或多行成一行, 拼接处加上换行符.
  3. id: event 的id, 可以没有.
  4. retry: 服务端发送给客户端, 当它侦测到断连之后, 多久才能发起重连断时间毫秒数.

所有不符合 key: value (这里的key是上面的4种) 和 : some comment 的形式, 都被认为是无效的.

客户端关闭连接

客户端通过下面的形式关闭连接:

evtSource.close();

客户端发现错误

客户端通过下面的代码做出错处理:

evtSource.onerror = (err) => {
  console.error("EventSource failed:", err);
};

关于客户端 EventSource 的所有API: https://developer.mozilla.org/en-US/docs/Web/API/EventSource

服务端的代码

其实客户端的代码相对来说比较统一, 浏览器都是统一规范. 服务端有不同的服务器语言, 实现起来却可能有差异. 比如 Java 的 JAX-RS 里面就有专门处理 SSE 的API.

今天我们就看一下 Python 里面是如何实现的.

python Flask 的实现

对于大家经常用到的 Flask, 如果你搜索 Flask SSE, 结果第一的是 https://flask-sse.readthedocs.io/en/latest/quickstart.html, 但是你点进去看, 发现它竟然使用 redis 去实现 SSE, 这相当于我又要安装一个 redis server. 这可都重的.

其实不需要这么实现, 也能达到 SSE 的效果. 服务端的代码如下:

import time
from flask import Flask, stream_with_context

@app.route('/sse')
def handle_sse_stream():
    def generate_event():
        while True:
           yield f'data: {"time": time.time()}\n\n'
           sleep(1000)
        
    return Response(stream_with_context(generate_event()), mimetype='text/event-stream')

上面的函数里面通过一个生成器, 不断的生成新的事件, 然后发送给客户端.

改进

通常我们的代码不会仅仅在那里sleep, 然后发送一个时间, 但是它可以每隔一段时间查询数据库去看最新的状态, 然后发送最新的状态给客户端. 比如上面的 yield 行的代码可以改成:

yield f'data: {"status": get_db_status()}\n\n'

进一步改进

上面的代码其实要求有一个数据库或者共享的组件来协调, 如果在同一个服务器上, 可以通过 queue.queue 这个队列来组成生产者/消费者 消息队列来传递消息.
上面的代码可以改成如下的方式:

def generate_event(queue):
    while True:
       yield f'data: {"msg": queue.get()}\n\n'
       sleep(1000)

这就解决了需要一个中间件来协调的问题.

总结

对于基于 Python flask 的 SSE, 其实可以通过 flask的 stream_with_context() 和 生成器来实现, 再通过 queue.queue 就实现了服务器内部的异步通信.