python flask 实现 SSE
这是最近2年第二次需要 server-sent events(SSE). 需求都是一样, 某个页面的需要后台去处理一个很长的请求, 后台处理至少要30秒钟. 这30多秒可以让用户去等, 但是有点长. 所以, 可以通过不断的把处理的进度和处理的中间结果尽快的展示到页面上显得尤为重要. 因此可以通过SSE把服务端到最新更新及时发送到页面, 让用户等等不是那么烦躁.
什么是 server-sent events
server-sent events 主要用来解决服务端要实时发送数据到客户端到情况. 比如最新的股票实时行情, 网页游戏服务端的实时数据, 视频/音频信息流等数据. 服务端随时可以推送数据到客户端, 客户端接收并处理. 但是客户端在第一次发送请求到服务端之后, 不会再发送新请求到服务端, 只能被动接收服务端到推送数据. 但是客户端可以选择随时关闭连接, 不再接收新数据. 这也是SSE 区分于 WebSockets 的一个重大区别.
server-sent events 客户端的接口
完全的文档可以看这里(https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events), 讲的很清楚.
客户端发起连接请求
const evtSource = new EventSource("//api.example.com/sse", {
withCredentials: true,
});
客户端接收到匿名event
客户端处理:
evtSource.onmessage = (event) => {
console.log(event);
};
这个时候, 服务端发送的数据是:
data: some text
# 或者是:
data: {"name": "eric", "age": 55}
客户端接收到命名event
客户端处理:
evtSource.addEventListener("myEventName", (event) => {
console.log(event);
});
这个时候, 服务端发送的数据是:
event: myEventName
data: {"name": "eric", "age": 55}
客户端收到的注释(comment)
服务器可以发送以 :
开头的数据, 这时候客户端会认为这是没有任何意义的数据, 只是注释, 这通常只是用来保持这个连接, 不让它断掉. 比如
: this is a comment
服务端和客户端通信的协议
通过上面的例子可以看到, 2端其实都是纯文本行的数据发送与接收. 每行都是以 key: value 的形式表示的, 除了注释行, 它没有key, 是以 :
开头的一行.
允许的key 只有以下几种可能:
event
: event 的名字, 没有名字就默认是匿名事件.data
: event的内容, 可以连续2行或多行都是 event: value 的形式, 这时候客户端浏览器会自动拼接2行或多行成一行, 拼接处加上换行符.id
: event 的id, 可以没有.retry
: 服务端发送给客户端, 当它侦测到断连之后, 多久才能发起重连断时间毫秒数.
所有不符合 key
: value (这里的key是上面的4种) 和 : some comment
的形式, 都被认为是无效的.
客户端关闭连接
客户端通过下面的形式关闭连接:
evtSource.close();
客户端发现错误
客户端通过下面的代码做出错处理:
evtSource.onerror = (err) => {
console.error("EventSource failed:", err);
};
关于客户端 EventSource 的所有API: https://developer.mozilla.org/en-US/docs/Web/API/EventSource
服务端的代码
其实客户端的代码相对来说比较统一, 浏览器都是统一规范. 服务端有不同的服务器语言, 实现起来却可能有差异. 比如 Java 的 JAX-RS 里面就有专门处理 SSE 的API.
今天我们就看一下 Python 里面是如何实现的.
python Flask 的实现
对于大家经常用到的 Flask, 如果你搜索 Flask SSE, 结果第一的是 https://flask-sse.readthedocs.io/en/latest/quickstart.html, 但是你点进去看, 发现它竟然使用 redis 去实现 SSE, 这相当于我又要安装一个 redis server. 这可都重的.
其实不需要这么实现, 也能达到 SSE 的效果. 服务端的代码如下:
import time
from flask import Flask, stream_with_context
@app.route('/sse')
def handle_sse_stream():
def generate_event():
while True:
yield f'data: {"time": time.time()}\n\n'
sleep(1000)
return Response(stream_with_context(generate_event()), mimetype='text/event-stream')
上面的函数里面通过一个生成器, 不断的生成新的事件, 然后发送给客户端.
改进
通常我们的代码不会仅仅在那里sleep, 然后发送一个时间, 但是它可以每隔一段时间查询数据库去看最新的状态, 然后发送最新的状态给客户端. 比如上面的 yield 行的代码可以改成:
yield f'data: {"status": get_db_status()}\n\n'
进一步改进
上面的代码其实要求有一个数据库或者共享的组件来协调, 如果在同一个服务器上, 可以通过 queue.queue 这个队列来组成生产者/消费者 消息队列来传递消息.
上面的代码可以改成如下的方式:
def generate_event(queue):
while True:
yield f'data: {"msg": queue.get()}\n\n'
sleep(1000)
这就解决了需要一个中间件来协调的问题.
总结
对于基于 Python flask 的 SSE, 其实可以通过 flask的 stream_with_context()
和 生成器来实现, 再通过 queue.queue 就实现了服务器内部的异步通信.