背景

我们在使用 GPT 或者 套壳 GPT 时,会发现对话的文字是一段一段展示出来的,而不是一次性,观察接口的 Content-Type可以看到,使用的是 text/event-stream;charset=UTF-8, 也就是 Server-Sent Events 服务端推送 (下文简称 SSE), 它和 WebSocket 不同,是单向的服务器推送,类似 HTTP 2Server Push, 这篇文章就让我们来简单了解下 SSE.

图 4

什么是 Server-Sent Events

Server-Sent Events (SSE) 本质上就是一个 HTTP 接口,区别于常见的 json 消息体,Content-Type: application/json; charset=utf-8, 它的类型为 Content-Type: text/event-stream;charset=UTF-8, 在调试窗口的 EventStream中可以看到:

标准报文体 如下:

1
2
3
4
id: xxx
event: xxx
retry: xxx
data: xxx

图 5

客户端如何发起 SSE 请求

既然也是 HTTP接口,那么如何发起请求呢?不同于 XMLHttpRequest 或者 fetch, 浏览器提供 [EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) API 进行 SSE 的请求

MDN 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
const sse = new EventSource("/api/v1/sse");

/*
* This will listen only for events
* similar to the following:
*
* event: notice
* data: useful data
* id: someid
*/
sse.addEventListener("notice", (e) => {
console.log(e.data);
});

/*
* Similarly, this will listen for events
* with the field `event: update`
*/
sse.addEventListener("update", (e) => {
console.log(e.data);
});

/*
* The event "message" is a special case, as it
* will capture events without an event field
* as well as events that have the specific type
* `event: message` It will not trigger on any
* other event type.
*/
sse.addEventListener("message", (e) => {
console.log(e.data);
});

服务器如何接受 SSE 请求

客户端发起 SSE请求后,各种套壳 GPT 应用肯定要 调用 GPT 3.5 API (访问自研大模型进行取数) , 这里以 Node.js为例

图 6

首先使用 express搭一个服务

1
2
3
4
5
6
7
8
const express = require('express');

const app = express();

app.use(express.static(__dirname));
app.listen(3000, () => {
console.log('server open in 3000');
});

然后声明一个 /sse GET 请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
const { Transform } = require('node:stream');

app.get('/sse', (req, res) => {
// SseStream 继承 Transform, 下文会提到
const sse = new SseStream();

sse.pipe(res);

const message = {
data: `hello\nworld${Date.now()}`,
event: 'push',
id: Date.now(),
retry: '',
comment: '测试',
};

sse.write(message);

});

数据流

当接收到请求后,使用 Node.js[Transform](https://nodejs.org/api/stream.html#class-streamwritable:~:text=event%20is%20emitted.-,Class%3A%20stream.Transform,-%23) 流,对数据进行转换

在 Node.js 中,标准的输入输出流分别为:

  • 输入流
  • 输出流
  • 错误流

而流又分为:

  • 可读流 (Readable)
  • 可写流 (Writable)
  • 双工流 (Duplex)
  • 转换流 (Transform)

其中双工流和转换流都是可读可写的。

输入流 (Readable)

1
process.stdin

输出流 (Writable)

1
process.stdout

错误流 (Writable)

1
process.stderr

写入请求头

首先写入 text/event-streamContent-Type

1
2
3
4
5
6
res.writeHead(200, {
'Content-Type': 'text/event-stream; charset=utf-8',
'Transfer-Encoding': 'identity',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
});

使用转换流将内容转成 SSE 的消息体格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
const { Transform } = require('node:stream');

class SseStream extends Transform {
_transform(message, encoding, callback) {
if (message.comment) {
this.push(`: ${message.comment}\n`);
}

if (message.event) {
this.push(`event: ${message.event}\n`);
}

if (message.id) {
this.push(`id: ${message.id}\n`);
}

if (message.retry) {
this.push(`retry: ${message.retry}\n`);
}

if (message.data) {
this.push(message.data);
}

this.push('\n');
callback();
}
}

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
const { Transform } = require('node:stream');

class SseStream extends Transform {
constructor(req) {
super({ objectMode: true });

if (req) {
req.socket.setKeepAlive(true);
req.socket.setNoDelay(true);
req.socket.setTimeout(0);
}
}

pipe(res, options) {
if (res.writeHead) {
res.writeHead(200, {
'Content-Type': 'text/event-stream; charset=utf-8',
'Transfer-Encoding': 'identity',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
});
res.flushHeaders();
}

res.write(':ok\n\n');

return super.pipe(res, options);
}

_transform(message, encoding, callback) {
if (message.comment) {
this.push(`: ${message.comment}\n`);
}

if (message.event) {
this.push(`event: ${message.event}\n`);
}

if (message.id) {
this.push(`id: ${message.id}\n`);
}

if (message.retry) {
this.push(`retry: ${message.retry}\n`);
}

if (message.data) {
this.push(message.data);
}

this.push('\n');
callback();
}
}

上诉的步骤相对来说比较繁琐,可以也可以使用 ssestream 这个库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const SseStream = require('ssestream').default;

app.get('/sse-stream', (req, res) => {
const sse = new SseStream(req);

sse.pipe(res);

const message = {
data: `hello\nworld${Date.now()}`,
event: 'push',
id: Date.now(),
retry: '',
comment: '测试',
};

// 模拟服务器推送
setInterval(() => {
sse.write(message);
}, 1000);
});

效果如图,可以看到每秒钟会收到服务器推送的消息,查看 Demo 完整代码

Kapture 2023-12-21 at 15.03.47.gif

参考链接

Node.js Stream
Server-Sent Events 服务器信息推送
Web/API/EventSource