1. 场景与目标
在基于Taro 4.19的多端应用(微信小程序 + H5 + 浏览器)中,后端通过HTTP Event Stream(Server‑Sent Events, SSE)返回流式结果,用于长耗时任务的过程反馈或实时推送。
受限于微信小程序不支持原生 SSE,而 H5/浏览器端可以直接使用fetch或EventSource,本方案通过「环境检测 + 请求封装」实现:
在 H5/浏览器端使用基于
fetch的标准SSE解析。在小程序端使用
Taro.request的分块传输能力,模拟SSE行协议并增量解析。通过统一的封装函数向上层暴露一致的调用方式(
onMessage / onError / onComplete回调),避免业务层感知底层差异。
2. 环境检测与统一入口
2.1 环境检测
通过Taro提供的运行时环境 API 判断当前运行平台:
const getEnv = () => Taro.getEnv()
核心判断逻辑:
当
getEnv() === Taro.ENV_TYPE.WEB时视为 H5/浏览器环境。其他值(如
WEAPP等)视为小程序环境。
2.2 统一的流式请求入口
业务只依赖一个入口函数,例如:
export const sendEventStreamRequest = async (url, payload, {
onMessage,
onError,
onComplete
} = {}) => {
const env = getEnv()
if (env === Taro.ENV_TYPE.WEB) {
// H5 / 浏览器:使用 Fetch + SSE
return fetchSSE(url, payload, onMessage, onError, onComplete)
}
// 小程序:使用 Taro.request 分块模拟 SSE
return miniProgramSSE(url, payload, onMessage, onError, onComplete)
}
上层页面只需关心:
传入请求 URL 与业务参数。
监听
onMessage获取增量结果。在
onError、onComplete中做 UI 状态收尾,无需感知平台差异。
3. H5/浏览器端实现:Fetch + 原生SSE流
3.1 请求与超时控制
在 H5/浏览器侧使用fetch发送带有Accept: text/event-stream的POST请求,并通过AbortController控制超时:
可以使用@microsoft/fetch-event-source等工具库实现SSE数据流解析,但是出于不信赖Microsoft的开源库,并未采纳。
const fetchSSE = async (url, data, onMessage, onError, onComplete) => {
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), SSE_TIMEOUT)
try {
const token = TokenManager.getToken()
const headers = {
'Content-Type': 'application/json',
Accept: 'text/event-stream'
}
if (token) headers.Authorization = `Bearer ${token}`
const response = await fetch(url, {
method: 'POST',
headers,
body: JSON.stringify(data),
signal: controller.signal
})
clearTimeout(timeoutId)
if (!response.ok) {
// 统一转为业务可感知的错误
throw new Error(`HTTP error: ${response.status}`)
}
await handleSSEStream(response, onMessage, onError, onComplete)
} catch (error) {
if (error.name === 'AbortError') {
onError?.(new Error('请求超时'))
} else {
onError?.(error)
}
}
}
关键点:
使用
Accept: text/event-stream明确告知后端这是SSE请求。通过
Authorization头透传认证信息(如 Bearer Token)。统一用
SSE_TIMEOUT控制整次流请求超时,防止服务端异常导致长期占用连接。
需要注意,当前较新版本的fetch接口支持通过signal设置超时时间,具体如下所示(当前项目出于兼容性考虑未使用此方法):
const url = "https://path_to_large_file.mp4";
try {
const res = await fetch(url, { signal: AbortSignal.timeout(5000) });
const result = await res.blob();
// …
} catch (err) {
if (err.name === "TimeoutError") {
console.error("Timeout: It took more than 5 seconds to get the result!");
} else if (err.name === "AbortError") {
console.error(
"Fetch aborted by user action (browser stop button, closing tab, etc.",
);
} else if (err.name === "TypeError") {
console.error("AbortSignal.timeout() method is not supported");
} else {
// A network error, or some other problem.
console.error(`Error: type: ${err.name}, message: ${err.message}`);
}
}
3.2 SSE消息流解析
SSE的响应格式本质是「按行分隔的文本流」,典型形式:
event: message
data: {"chunk":"..."}
event: message
data: {"chunk":"..."}
event: done
data: [DONE]
一个通用的解析流程如下:
获取
response.body.getReader(),按块读取二进制数据。使用
TextDecoder('utf-8')解码为字符串,并用缓冲区累积不完整行。按
\n拆分为多行,分别处理event:与data:。对于普通消息:
将多次
data:行拼接成完整的 JSON 字符串。尝试
JSON.parse并通过onMessage透出。
对于结束标记:
约定
data: [DONE]为流结束信号。解析完最后一段 JSON 后调用
onComplete,关闭循环。
对于
event: error:解析data为错误对象或文本,调用onError并终止流。
示意性的解析代码:
const handleSSEStream = async (response, onMessage, onError, onComplete) => {
const reader = response.body.getReader()
const decoder = new TextDecoder('utf-8')
let buffer = ''
let jsonBuffer = ''
let currentEvent = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n')
buffer = lines.pop() || ''
for (const rawLine of lines) {
const line = rawLine.trim()
if (!line || line.startsWith(':')) continue
if (line.startsWith('event:')) {
currentEvent = line.slice(6).trim()
continue
}
if (line.startsWith('data:')) {
const data = line.slice(5)
if (data.trim() === '[DONE]') {
if (jsonBuffer.trim()) {
try {
onMessage?.(JSON.parse(jsonBuffer))
} catch (e) {
onError?.(new Error('响应数据格式错误'))
}
}
onComplete?.()
return
}
if (!currentEvent || currentEvent === 'message') {
jsonBuffer += data
} else if (currentEvent === 'error') {
onError?.(new Error(data.trim()))
return
}
currentEvent = ''
}
}
}
}
3.3 与小程序方案的区别
H5端可以直接依赖浏览器提供的
ReadableStream与UTF‑8解码能力,解析逻辑更直接。SSE在H5端可以依赖长连接与内置重连机制(若使用
EventSource),而在小程序端需要自行约定超时与重试策略。浏览器端对大文本/多 chunk 的处理更稳定,小程序端需要特别注意分块的边界和字符集问题。
4. 微信小程序端实现:Taro.request分块模拟SSE
4.1 请求配置:启用分块 + 二进制响应
微信小程序不支持原生 SSE,但支持 HTTP 分块传输。在Taro中可以通过如下方式配置:
const miniProgramSSE = async (url, data, onMessage, onError, onComplete) => {
try {
const requestTask = Taro.request({
url,
method: 'POST',
data,
header: getHeaders({
Accept: 'text/event-stream'
}),
timeout: SSE_TIMEOUT,
enableChunked: true, // 1) 告知底层允许分块传输
responseType: 'arraybuffer' // 2) 以二进制形式接收数据
// success / fail 在这里主要负责整体结束态处理
})
// 持续监听流式分块(如果平台支持)
if (requestTask.onChunkReceived) {
attachChunkHandler(requestTask, onMessage)
}
} catch (error) {
onError?.(error)
}
}
要点:
小程序端同样通过
Accept: text/event-stream与服务端约定「按SSE行协议输出」。通过
enableChunked允许在单个请求中多次下发数据块。使用
responseType: 'arraybuffer',后续自行完成 UTF‑8 解码与行解析。
4.2 ArrayBuffer → 文本:simpleTextDecoder
小程序端返回的是ArrayBuffer,需要转换为正确的 UTF‑8 文本:
function simpleTextDecoder(arrayBuffer) {
const uint8Array = new Uint8Array(arrayBuffer)
const str = String.fromCharCode.apply(null, uint8Array)
// 通过 escape + decodeURIComponent 粗略还原 UTF‑8(适用于一般中文场景)
return decodeURIComponent(escape(str))
}
说明:
这里给出的是一种常见的兼容性实现,适用于大多数包含中文的SSE文本。
如果后续环境统一支持
TextDecoder,可以直接替换为标准 API。
4.3 分块数据解析流程
在onChunkReceived中,将每个分块转换为文本再按行解析:
const attachChunkHandler = (requestTask, onMessage) => {
let currentEvent = ''
requestTask.onChunkReceived((res) => {
const chunkText = simpleTextDecoder(res.data)
const lines = chunkText.split('\n')
const eventLine = lines.find((line) => line.startsWith('event:'))
if (eventLine) {
currentEvent = eventLine.split('event:')[1].trim()
}
const dataLine = lines
.find((line) => line.startsWith('data:'))
?.split('data:')[1]
if (!dataLine) return
const rawData = dataLine.trim()
// 上层可根据约定的 [DONE] / error 等事件类型做收尾
if (rawData === '[DONE]') {
onMessage?.({ type: 'done' })
return
}
let parsed
try {
parsed = JSON.parse(rawData)
} catch {
parsed = { type: currentEvent || 'message', data: rawData }
}
onMessage?.(parsed)
})
}
该实现体现的小程序端「分块解析」特点:
每个
chunk都是一次「增量文本」,可能只包含部分行或一个完整事件块。利用
event:行维护当前事件类型,data:行承载业务负载。解码失败时仍然向上抛出原始文本,便于业务侧记录日志和回溯问题。
4.4 与H5方案的关键差异
协议同源,能力不同步:两端都使用SSE行协议(
event:+data:),但 H5 依赖浏览器原生流 API,小程序则通过onChunkReceived模拟。字符解码责任不同:H5 端可以直接使用
TextDecoder,小程序端需要自行实现或引入 polyfill。结束与重试策略需自行约定:通过
data: [DONE]等约定结束,若需要重试需在业务侧封装轮询或重连逻辑。
5. 统一封装的使用方式示例
上层业务可以通过统一入口发起流式请求,例如:
sendEventStreamRequest('/your/api/path', { /* 业务参数 */ }, {
onMessage: (chunk) => {
// 根据 chunk 内容增量更新页面,如:
// - 拼接生成中的文案
// - 追加日志 / 步骤提示
},
onError: (err) => {
// 显示错误提示,记录日志
},
onComplete: () => {
// 将状态切换为「完成」,可允许用户再次发起请求
}
})
注意:示例中不依赖任何具体业务命名(如作文、登录等),可直接迁移到任意需要「长耗时任务 + 流式反馈」的场景,例如:
文本生成 / 内容审核过程展示。
批量导入 / 导出任务的进度反馈。
长连接任务的中间状态推送等。
6. 错误处理与边界情况
为保证多端一致的开发体验,推荐在封装层统一处理以下类别问题:
认证失效(401):
在任一端检测到401时,清理本地Token,并通过统一的跳转逻辑回到登录页或授权页。
尽量不要在业务页面中分散处理401,避免逻辑重复。
网络或超时错误:
对
AbortError或小程序端超时错误统一转换为「请求超时」提示。建议在UI上提供「重新尝试」按钮,而不是静默失败。
数据格式错误:
对 JSON 解析失败的场景,封装层应给出清晰的错误描述(如「响应数据格式错误」),并输出原始内容用于排查。
在小程序端,考虑将无法解析的原始
data文本一并上传到日志系统。
服务端异常:
对于非 2xx 状态码,尽量读取服务端返回的错误体(如果是 JSON)并转换为友好的错误消息。
若无法解析,则退化为
HTTP error: <status>之类的通用提示。