2团
Published on 2026-01-08 / 2 Visits
0
0

Taro跨端应用中的SSE流式请求实现:兼容微信小程序与H5的统一方案

1. 场景与目标

在基于Taro 4.19的多端应用(微信小程序 + H5 + 浏览器)中,后端通过HTTP Event Stream(Server‑Sent Events, SSE)返回流式结果,用于长耗时任务的过程反馈或实时推送。
受限于微信小程序不支持原生 SSE,而 H5/浏览器端可以直接使用fetchEventSource,本方案通过「环境检测 + 请求封装」实现:

  • 在 H5/浏览器端使用基于fetch的标准SSE解析。

  • 在小程序端使用Taro.request的分块传输能力,模拟SSE行协议并增量解析。

  • 通过统一的封装函数向上层暴露一致的调用方式(onMessage / onError / onComplete回调),避免业务层感知底层差异。

2. 环境检测与统一入口

2.1 环境检测

通过Taro提供的运行时环境 API 判断当前运行平台:

const getEnv = () => Taro.getEnv()

核心判断逻辑:

  1. getEnv() === Taro.ENV_TYPE.WEB时视为 H5/浏览器环境。

  2. 其他值(如WEAPP等)视为小程序环境。

2.2 统一的流式请求入口

业务只依赖一个入口函数,例如:

export const sendEventStreamRequest = async (url, payload, {
  onMessage,
  onError,
  onComplete
} = {}) => {
  const env = getEnv()

  if (env === Taro.ENV_TYPE.WEB) {
    // H5 / 浏览器:使用 Fetch + SSE
    return fetchSSE(url, payload, onMessage, onError, onComplete)
  }

  // 小程序:使用 Taro.request 分块模拟 SSE
  return miniProgramSSE(url, payload, onMessage, onError, onComplete)
}

上层页面只需关心:

  1. 传入请求 URL 与业务参数。

  2. 监听onMessage获取增量结果。

  3. onErroronComplete中做 UI 状态收尾,无需感知平台差异。

3. H5/浏览器端实现:Fetch + 原生SSE流

3.1 请求与超时控制

在 H5/浏览器侧使用fetch发送带有Accept: text/event-stream的POST请求,并通过AbortController控制超时:

可以使用@microsoft/fetch-event-source等工具库实现SSE数据流解析,但是出于不信赖Microsoft的开源库,并未采纳。

const fetchSSE = async (url, data, onMessage, onError, onComplete) => {
  const controller = new AbortController()
  const timeoutId = setTimeout(() => controller.abort(), SSE_TIMEOUT)

  try {
    const token = TokenManager.getToken()
    const headers = {
      'Content-Type': 'application/json',
      Accept: 'text/event-stream'
    }
    if (token) headers.Authorization = `Bearer ${token}`

    const response = await fetch(url, {
      method: 'POST',
      headers,
      body: JSON.stringify(data),
      signal: controller.signal
    })

    clearTimeout(timeoutId)

    if (!response.ok) {
      // 统一转为业务可感知的错误
      throw new Error(`HTTP error: ${response.status}`)
    }

    await handleSSEStream(response, onMessage, onError, onComplete)
  } catch (error) {
    if (error.name === 'AbortError') {
      onError?.(new Error('请求超时'))
    } else {
      onError?.(error)
    }
  }
}

关键点:

  1. 使用Accept: text/event-stream明确告知后端这是SSE请求。

  2. 通过Authorization头透传认证信息(如 Bearer Token)。

  3. 统一用SSE_TIMEOUT控制整次流请求超时,防止服务端异常导致长期占用连接。

需要注意,当前较新版本的fetch接口支持通过signal设置超时时间,具体如下所示(当前项目出于兼容性考虑未使用此方法):

const url = "https://path_to_large_file.mp4";

try {
  const res = await fetch(url, { signal: AbortSignal.timeout(5000) });
  const result = await res.blob();
  // …
} catch (err) {
  if (err.name === "TimeoutError") {
    console.error("Timeout: It took more than 5 seconds to get the result!");
  } else if (err.name === "AbortError") {
    console.error(
      "Fetch aborted by user action (browser stop button, closing tab, etc.",
    );
  } else if (err.name === "TypeError") {
    console.error("AbortSignal.timeout() method is not supported");
  } else {
    // A network error, or some other problem.
    console.error(`Error: type: ${err.name}, message: ${err.message}`);
  }
}

3.2 SSE消息流解析

SSE的响应格式本质是「按行分隔的文本流」,典型形式:

event: message
data: {"chunk":"..."}

event: message
data: {"chunk":"..."}

event: done
data: [DONE]

一个通用的解析流程如下:

  1. 获取response.body.getReader(),按块读取二进制数据。

  2. 使用TextDecoder('utf-8')解码为字符串,并用缓冲区累积不完整行。

  3. \n拆分为多行,分别处理event:data:

  4. 对于普通消息:

    • 将多次data:行拼接成完整的 JSON 字符串。

    • 尝试JSON.parse并通过onMessage透出。

  5. 对于结束标记:

    • 约定data: [DONE]为流结束信号。

    • 解析完最后一段 JSON 后调用onComplete,关闭循环。

  6. 对于event: error:解析data为错误对象或文本,调用onError并终止流。

示意性的解析代码:

const handleSSEStream = async (response, onMessage, onError, onComplete) => {
  const reader = response.body.getReader()
  const decoder = new TextDecoder('utf-8')
  let buffer = ''
  let jsonBuffer = ''
  let currentEvent = ''

  while (true) {
    const { done, value } = await reader.read()
    if (done) break

    buffer += decoder.decode(value, { stream: true })
    const lines = buffer.split('\n')
    buffer = lines.pop() || ''

    for (const rawLine of lines) {
      const line = rawLine.trim()
      if (!line || line.startsWith(':')) continue

      if (line.startsWith('event:')) {
        currentEvent = line.slice(6).trim()
        continue
      }

      if (line.startsWith('data:')) {
        const data = line.slice(5)

        if (data.trim() === '[DONE]') {
          if (jsonBuffer.trim()) {
            try {
              onMessage?.(JSON.parse(jsonBuffer))
            } catch (e) {
              onError?.(new Error('响应数据格式错误'))
            }
          }
          onComplete?.()
          return
        }

        if (!currentEvent || currentEvent === 'message') {
          jsonBuffer += data
        } else if (currentEvent === 'error') {
          onError?.(new Error(data.trim()))
          return
        }

        currentEvent = ''
      }
    }
  }
}

3.3 与小程序方案的区别

  1. H5端可以直接依赖浏览器提供的ReadableStream与UTF‑8解码能力,解析逻辑更直接。

  2. SSE在H5端可以依赖长连接与内置重连机制(若使用EventSource),而在小程序端需要自行约定超时与重试策略。

  3. 浏览器端对大文本/多 chunk 的处理更稳定,小程序端需要特别注意分块的边界和字符集问题。

4. 微信小程序端实现:Taro.request分块模拟SSE

4.1 请求配置:启用分块 + 二进制响应

微信小程序不支持原生 SSE,但支持 HTTP 分块传输。在Taro中可以通过如下方式配置:

const miniProgramSSE = async (url, data, onMessage, onError, onComplete) => {
  try {
    const requestTask = Taro.request({
      url,
      method: 'POST',
      data,
      header: getHeaders({
        Accept: 'text/event-stream'
      }),
      timeout: SSE_TIMEOUT,
      enableChunked: true,      // 1) 告知底层允许分块传输
      responseType: 'arraybuffer' // 2) 以二进制形式接收数据
      // success / fail 在这里主要负责整体结束态处理
    })

    // 持续监听流式分块(如果平台支持)
    if (requestTask.onChunkReceived) {
      attachChunkHandler(requestTask, onMessage)
    }
  } catch (error) {
    onError?.(error)
  }
}

要点:

  1. 小程序端同样通过Accept: text/event-stream与服务端约定「按SSE行协议输出」。

  2. 通过enableChunked允许在单个请求中多次下发数据块。

  3. 使用responseType: 'arraybuffer',后续自行完成 UTF‑8 解码与行解析。

4.2 ArrayBuffer → 文本:simpleTextDecoder

小程序端返回的是ArrayBuffer,需要转换为正确的 UTF‑8 文本:

function simpleTextDecoder(arrayBuffer) {
  const uint8Array = new Uint8Array(arrayBuffer)
  const str = String.fromCharCode.apply(null, uint8Array)
  // 通过 escape + decodeURIComponent 粗略还原 UTF‑8(适用于一般中文场景)
  return decodeURIComponent(escape(str))
}

说明:

  1. 这里给出的是一种常见的兼容性实现,适用于大多数包含中文的SSE文本。

  2. 如果后续环境统一支持TextDecoder,可以直接替换为标准 API。

4.3 分块数据解析流程

onChunkReceived中,将每个分块转换为文本再按行解析:

const attachChunkHandler = (requestTask, onMessage) => {
  let currentEvent = ''

  requestTask.onChunkReceived((res) => {
    const chunkText = simpleTextDecoder(res.data)
    const lines = chunkText.split('\n')

    const eventLine = lines.find((line) => line.startsWith('event:'))
    if (eventLine) {
      currentEvent = eventLine.split('event:')[1].trim()
    }

    const dataLine = lines
      .find((line) => line.startsWith('data:'))
      ?.split('data:')[1]

    if (!dataLine) return

    const rawData = dataLine.trim()

    // 上层可根据约定的 [DONE] / error 等事件类型做收尾
    if (rawData === '[DONE]') {
      onMessage?.({ type: 'done' })
      return
    }

    let parsed
    try {
      parsed = JSON.parse(rawData)
    } catch {
      parsed = { type: currentEvent || 'message', data: rawData }
    }

    onMessage?.(parsed)
  })
}

该实现体现的小程序端「分块解析」特点:

  1. 每个chunk都是一次「增量文本」,可能只包含部分行或一个完整事件块。

  2. 利用event:行维护当前事件类型,data:行承载业务负载。

  3. 解码失败时仍然向上抛出原始文本,便于业务侧记录日志和回溯问题。

4.4 与H5方案的关键差异

  1. 协议同源,能力不同步:两端都使用SSE行协议(event:+data:),但 H5 依赖浏览器原生流 API,小程序则通过onChunkReceived模拟。

  2. 字符解码责任不同:H5 端可以直接使用TextDecoder,小程序端需要自行实现或引入 polyfill。

  3. 结束与重试策略需自行约定:通过data: [DONE]等约定结束,若需要重试需在业务侧封装轮询或重连逻辑。

5. 统一封装的使用方式示例

上层业务可以通过统一入口发起流式请求,例如:

sendEventStreamRequest('/your/api/path', { /* 业务参数 */ }, {
  onMessage: (chunk) => {
    // 根据 chunk 内容增量更新页面,如:
    // - 拼接生成中的文案
    // - 追加日志 / 步骤提示
  },
  onError: (err) => {
    // 显示错误提示,记录日志
  },
  onComplete: () => {
    // 将状态切换为「完成」,可允许用户再次发起请求
  }
})

注意:示例中不依赖任何具体业务命名(如作文、登录等),可直接迁移到任意需要「长耗时任务 + 流式反馈」的场景,例如:

  1. 文本生成 / 内容审核过程展示。

  2. 批量导入 / 导出任务的进度反馈。

  3. 长连接任务的中间状态推送等。

6. 错误处理与边界情况

为保证多端一致的开发体验,推荐在封装层统一处理以下类别问题:

  1. 认证失效(401)

    • 在任一端检测到401时,清理本地Token,并通过统一的跳转逻辑回到登录页或授权页。

    • 尽量不要在业务页面中分散处理401,避免逻辑重复。

  2. 网络或超时错误

    • AbortError或小程序端超时错误统一转换为「请求超时」提示。

    • 建议在UI上提供「重新尝试」按钮,而不是静默失败。

  3. 数据格式错误

    • 对 JSON 解析失败的场景,封装层应给出清晰的错误描述(如「响应数据格式错误」),并输出原始内容用于排查。

    • 在小程序端,考虑将无法解析的原始data文本一并上传到日志系统。

  4. 服务端异常

    • 对于非 2xx 状态码,尽量读取服务端返回的错误体(如果是 JSON)并转换为友好的错误消息。

    • 若无法解析,则退化为HTTP error: <status>之类的通用提示。


Comment