SSE 服务端推送与 AI 流式输出

1. SSE 是什么:一句话版

SSE (Server-Sent Events):服务器单向向客户端推送事件流,走标准 HTTP,不是新协议。

和 WebSocket 的对照:

SSE WebSocket
方向 服务器 → 客户端(单向) 双向
协议 HTTP WS
基建 用现有 HTTP 服务器(Nginx 原样转发) 需要特殊配置
自动重连 浏览器原生支持 自己写
数据格式 文本(UTF-8) 文本 + 二进制
跨域 走 CORS 走 WS 握手
适合 通知流、AI 流式输出、股价推送 聊天、协同、游戏

核心区别SSE 简单——只为"服务器推数据"这一件事优化,不搞双向。in4vue 的 AI 问答助手就用它。

Java 对照


2. SSE 的协议格式

服务器响应:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

data: 第一条消息

data: 第二条消息
data: 可以跨多行

event: chat
data: 带事件名的消息

id: 42
data: 带 ID 的消息(客户端重连时 Last-Event-ID 会带回这个)

: 这是注释,客户端忽略

规则


3. 最简 SSE:EventSource API

浏览器原生:

const source = new EventSource('https://api.example.com/events')

source.onmessage = (event) => {
  console.log('收到:', event.data)
}

source.onerror = (err) => {
  console.error('错误:', err)
}

// 自定义事件类型
source.addEventListener('chat', (event) => {
  console.log('聊天消息:', event.data)
})

// 关闭
source.close()

特点


4. EventSource 的致命缺陷:不能带 header

// ❌ EventSource 不支持自定义 header
new EventSource(url, { headers: { Authorization: `Bearer ${token}` } })
// 这个 API 不存在

影响:需要 Authorization 头的鉴权接口用不了 EventSource

三种绕过

4.1 URL 参数带 token

new EventSource(`${url}?token=${token}`)

缺点:token 进 URL 日志、浏览器历史,不安全。

4.2 Cookie

同域时浏览器自动带 httpOnly cookie,EventSource 也能用。in4vue 的 Edge Function 同域部署时可选。

4.3 用 fetch + 手动解析(最灵活)

下面详细讲。


5. Fetch-based SSE:能带 header 能 abort

async function streamSSE(url: string, token: string, onMessage: (msg: string) => void, signal?: AbortSignal) {
  const response = await fetch(url, {
    method: 'GET',
    headers: {
      'Authorization': `Bearer ${token}`,
      'Accept': 'text/event-stream',
    },
    signal,
  })

  if (!response.ok || !response.body) {
    throw new Error(`SSE failed: ${response.status}`)
  }

  const reader = response.body.pipeThrough(new TextDecoderStream()).getReader()
  let buffer = ''

  while (true) {
    const { value, done } = await reader.read()
    if (done) break

    buffer += value
    const lines = buffer.split('\n')
    buffer = lines.pop() || '' // 最后一段可能没收全,留着下一轮拼

    let event = 'message'
    let data = ''

    for (const line of lines) {
      if (line.startsWith('event:')) {
        event = line.slice(6).trim()
      } else if (line.startsWith('data:')) {
        data += line.slice(5).trim()
      } else if (line === '') {
        // 空行 = 一条事件结束
        if (data) onMessage(data)
        event = 'message'
        data = ''
      }
    }
  }
}

关键点

成熟的库@microsoft/fetch-event-source 封装了上面这套。推荐用库,不自己写。


6. in4vue 的 AI 流式输出

最常见的 SSE 用途:AI 接口流式返回字符。用户问一个问题,AI 的回答边生成边显示(像打字机),体验比"等 10 秒出一整段"好 10 倍。

6.1 后端(Edge Function)

in4vue 的 AI 代理在 edge/ 目录(Cloudflare Pages Function)。伪代码:

// edge/functions/ai/chat.ts
export async function onRequest(context: any) {
  const { question } = await context.request.json()
  const openaiResp = await fetch('https://api.openai.com/v1/chat/completions', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${context.env.OPENAI_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      model: 'gpt-4o-mini',
      messages: [{ role: 'user', content: question }],
      stream: true,
    }),
  })

  // 直接把 OpenAI 的 SSE 流透传给前端
  return new Response(openaiResp.body, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Access-Control-Allow-Origin': '*',
    },
  })
}

Edge Function 在前端和 OpenAI 之间藏 API key,同时做限流。

6.2 前端接收流

// src/api/ai.ts
export interface AIStreamOptions {
  onToken: (token: string) => void
  onDone: () => void
  onError: (err: Error) => void
  signal?: AbortSignal
}

export async function askAI(question: string, opts: AIStreamOptions) {
  try {
    const response = await fetch('/ai/chat', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ question }),
      signal: opts.signal,
    })

    if (!response.body) throw new Error('No stream body')

    const reader = response.body.pipeThrough(new TextDecoderStream()).getReader()
    let buffer = ''

    while (true) {
      const { value, done } = await reader.read()
      if (done) break
      buffer += value

      const lines = buffer.split('\n')
      buffer = lines.pop() || ''

      for (const line of lines) {
        if (!line.startsWith('data:')) continue
        const data = line.slice(5).trim()
        if (data === '[DONE]') { // OpenAI 结束标志
          opts.onDone()
          return
        }
        try {
          const json = JSON.parse(data)
          const token = json.choices?.[0]?.delta?.content
          if (token) opts.onToken(token)
        } catch {
          // 忽略不能解析的行(心跳等)
        }
      }
    }
    opts.onDone()
  } catch (err: any) {
    if (err.name === 'AbortError') return
    opts.onError(err)
  }
}

6.3 Vue 组件里用

<script setup lang="ts">
import { ref } from 'vue'
import { askAI } from '@/api/ai'

const question = ref('')
const answer = ref('')
const loading = ref(false)
let abortController: AbortController | null = null

async function ask() {
  if (!question.value) return
  abortController?.abort()
  abortController = new AbortController()

  answer.value = ''
  loading.value = true

  await askAI(question.value, {
    onToken: (t) => { answer.value += t },
    onDone: () => { loading.value = false },
    onError: (err) => {
      loading.value = false
      answer.value = `❌ ${err.message}`
    },
    signal: abortController.signal,
  })
}

function stop() {
  abortController?.abort()
  loading.value = false
}
</script>

<template>
  <div class="space-y-4">
    <el-input v-model="question" type="textarea" :rows="3" placeholder="问我 Vue 相关问题..." />
    <div class="flex gap-2">
      <el-button type="primary" :loading="loading" @click="ask">提问</el-button>
      <el-button v-if="loading" @click="stop">停止</el-button>
    </div>
    <div v-if="answer" class="whitespace-pre-wrap rounded bg-gray-50 p-4 dark:bg-gray-800">
      {{ answer }}<span v-if="loading" class="animate-pulse">▊</span>
    </div>
  </div>
</template>

细节


7. SSE 响应实时渲染 Markdown

AI 返回的是 Markdown 格式。每收到新 token 就 re-parse 一次性能堪忧。思路:

<script setup lang="ts">
import MarkdownIt from 'markdown-it'
import { computed, ref } from 'vue'
import { refDebounced } from '@vueuse/core'

const md = new MarkdownIt()
const answer = ref('')
const debouncedAnswer = refDebounced(answer, 50) // 50ms 防抖,足够顺滑

const html = computed(() => md.render(debouncedAnswer.value))
</script>

<template>
  <div class="markdown-body" v-html="html"></div>
</template>

50ms debounce 折中了"实时感"和"性能"——20fps 刷新率,肉眼感知流畅。


8. SSE vs WebSocket 决策

需求 选 SSE 选 WebSocket
只要服务器推 过度设计
需要客户端主动发 ❌(要另外走 HTTP)
AI 流式输出 能做但没必要
聊天 / 协同
通知流
走标准 HTTP 基建 需配置 WS 升级
Nginx 通透 需特殊配置
自动重连 ✅ 原生 自己写

in4vue 明确选 SSE:AI 问答完全符合"服务器推,客户端不需要实时反向说话"。


9. Nginx 配置:别让代理把流缓存了

Nginx 默认缓冲响应体,对 SSE 是灾难——所有事件要等流结束才一起推给客户端。

location /ai/ {
    proxy_pass http://backend:8080/;

    # 关键:关闭缓冲
    proxy_buffering off;
    proxy_cache off;

    # HTTP 1.1 保持流
    proxy_http_version 1.1;
    proxy_set_header Connection '';

    # SSE 长连接不超时
    proxy_read_timeout 86400s;
    proxy_send_timeout 86400s;

    # 禁用 chunked encoding 重写
    chunked_transfer_encoding off;
}

Cloudflare 默认就支持流式传输,不用额外配。


10. 心跳 / 长连接保活

长时间没消息可能被中间设备断开。服务器定期发注释保活:

: keepalive

: keepalive

注释行客户端不会触发 onmessage,但保持连接活跃。

OpenAI 的 API 已经这么做:没 token 产出时会周期发空事件。


11. 重连与幂等

EventSource 自动重连,但重连后数据可能重复(服务器不知道你断开时收到了哪条)。

id: 字段 + Last-Event-ID

// 服务器发
id: 42
data: 新消息

// 断开重连时浏览器自动带上
Last-Event-ID: 42

服务器从 42 之后继续发。AI 对话场景通常不管——断了就重新问,浪费一点 token 也比复杂的续传实现简单。


12. 错误处理的特殊点

SSE 的错误很微妙:

12.1 服务器返 4xx / 5xx

EventSource 会调 onerror 然后继续重连。如果你的接口确实挂了,会无限重连刷日志。

解法:用 fetch-based 版本,能精确判状态码:

if (response.status === 401) {
  // 登出处理,别重连
  return
}
if (response.status >= 500) {
  // 临时错误,延迟重连
}

12.2 用户关闭页面

EventSource 在页面卸载时自动关闭。但后端不一定立刻感知——让后端在 WriteTimeout 短点,不然挂太久浪费资源。


13. 浏览器并发连接限制

浏览器对同域的 HTTP 并发连接数有限(HTTP/1.1 通常 6 个)。多个 SSE 连接会吃掉这个配额,影响页面其他请求。

解决

in4vue 的 Edge Function 是 HTTP/2,不用操心。


14. 完整对照:三种方案的"实时通知未读数"

方案 A:轮询

// 每 30 秒查一次
setInterval(async () => {
  unreadCount.value = (await request.get('/notification/unreadCount')).count
}, 30000)

优点:零基建 缺点:延迟 0-30 秒不等,请求浪费

方案 B:SSE

const source = new EventSource('/notification/stream')
source.addEventListener('unread', (e) => {
  unreadCount.value = parseInt(e.data)
})

优点:实时,基建简单 缺点:客户端推不了东西给服务器(但这场景也不需要)

方案 C:WebSocket

const { on } = useRealtimeChannel('wss://api/ws')
on('unread', (count) => { unreadCount.value = count })

优点:如果本来就有 WS 连接,顺手用 缺点:单纯为"未读数"拉一个 WS 太重

对未读通知这种场景,SSE 最合适


15. 常见坑点

现象 原因 解法
前端收不到流式数据 Nginx 缓冲 proxy_buffering off
AI 回答全部等完才出现 后端没开 stream 或 Content-Type 错 后端 stream: true + Content-Type: text/event-stream
EventSource 不能带 Authorization 浏览器 API 限制 用 fetch-based 或 URL 带 token
断开后疯狂重连 服务器一直 4xx fetch-based 判状态码决定是否重连
消息拼一半就显示 没按完整事件解析 buffer 里留住末尾不完整行
中文乱码 没用 TextDecoderStream pipeThrough(new TextDecoderStream())
组件销毁后连接还在 没 abort onUnmounted(abort)

16. 心智模型

问: 服务器要给客户端持续推东西?
  ├─ 不,只是偶尔查状态 → 轮询
  ├─ 是,只单向推       → SSE
  └─ 是,还需要客户端也发 → WebSocket

SSE 的三个关键词:
  "单向"    "HTTP"   "原生重连"

AI 流式输出的节奏:
  提问  →  AbortController 准备
       →  fetch /ai/chat (stream)
       →  逐 token append 到 answer
       →  Markdown 渲染(debounced)
       →  [DONE] 完成 / 用户点停止 abort

17. in4vue 的 MVP 任务

SSE 是 in4vue 核心功能:

  1. ✅ Edge Function 代理 OpenAI 兼容接口(藏 key)
  2. ✅ 前端 askAI composable 封装
  3. ✅ 打字机效果 + Markdown 流式渲染
  4. 🟡 AbortController 支持用户中断
  5. 🟡 对话历史存 localStorage
  6. 🟡 token 用量展示
  7. 🔵 限流(IP 维度,Edge Function 的 KV)

前三个做好,MVP 就上线了。


小练习

  1. 实现本篇第 5 节的 streamSSE 函数
  2. 接 in4vue 的 askAI,实现一个简单问答框
  3. 加"停止生成"按钮(AbortController)
  4. 测 Markdown 流式渲染(代码块、列表、表格都要顺滑)
  5. 看 Chrome DevTools → Network → 某个 SSE 请求 → EventStream 标签

延伸阅读