feat(announcements,messaging,notifications): 实现所有长期问题 — SSE 实时推送 + 通知日志持久化 + 优先级/归档 + 消息星标/草稿 + 公告已读回执/置顶 + 分类筛选/桌面推送 + 测试覆盖
P1-8 通知实时推送(SSE): - 新增 /api/notifications/stream SSE 端点(15 秒推送,5 分钟超时) - 新增 useNotificationStream Hook(SSE + 轮询降级) - NotificationDropdown 改用 SSE 实时推送 P2-12 测试覆盖: - notifications/dispatcher.test.ts(6 个测试,渠道选择逻辑) - notifications/channels/in-app-channel.test.ts(9 个测试,类型映射) - messaging/schema.test.ts(34 个测试,Zod 校验) - tests/e2e/messages.spec.ts(消息模块 E2E 测试) - vitest.unit.config.ts 添加 server-only stub P2-13a 通知发送日志持久化: - 新增 notification_logs 表(userId/title/channel/status/messageId/error/sentAt) - logNotificationSend 改为 async 写入 DB(失败降级 console) - dispatcher 传递 payload 用于持久化 P2-13b 通知优先级和归档: - messageNotifications 表新增 priority(low/normal/high/urgent)和 isArchived 字段 - getNotifications 支持归档和优先级筛选 - 新增 archiveNotificationAction - NotificationList 显示优先级 Badge 和归档按钮 P2-13c 消息星标和草稿: - messages 表新增 isStarred 字段 - 新增 message_drafts 表 - 新增 toggleMessageStar + 草稿 CRUD Server Actions - 新增 5 个草稿 data-access 函数 P2-13d 公告已读回执和置顶: - announcements 表新增 isPinned 字段 - 新增 announcement_reads 表(唯一索引保证幂等) - 新增 toggleAnnouncementPinAction + markAnnouncementAsReadAction - getAnnouncements 排序置顶优先 P2-13e 通知分类筛选和桌面推送: - NotificationList 添加按类型筛选按钮组 - 新增 useDesktopNotifications Hook(浏览器 Notification API) - NotificationDropdown 集成桌面推送(新通知触发) 架构图同步: - 004 和 005 均已更新(新增表、Action、Hook、组件描述)
This commit is contained in:
118
src/app/api/notifications/stream/route.ts
Normal file
118
src/app/api/notifications/stream/route.ts
Normal file
@@ -0,0 +1,118 @@
|
||||
import { NextRequest } from "next/server"
|
||||
|
||||
import { PermissionDeniedError, requirePermission } from "@/shared/lib/auth-guard"
|
||||
import { Permissions } from "@/shared/types/permissions"
|
||||
import { getUnreadNotificationCount, getNotifications } from "@/modules/notifications/data-access"
|
||||
|
||||
/**
|
||||
* 通知实时推送 SSE 端点
|
||||
*
|
||||
* 使用 Server-Sent Events 向客户端推送通知更新,
|
||||
* 替代原有的 30/60 秒轮询模式,降低延迟和服务器压力。
|
||||
*
|
||||
* 推送策略:
|
||||
* - 连接建立时立即推送一次当前状态(未读数 + 最新通知列表)
|
||||
* - 之后每 15 秒推送一次更新(作为 SSE 心跳 + 数据刷新)
|
||||
* - 客户端断开连接时清理定时器
|
||||
*
|
||||
* 安全:
|
||||
* - requirePermission(MESSAGE_READ) 权限校验
|
||||
* - 连接超时 5 分钟自动关闭(防止僵尸连接)
|
||||
*/
|
||||
|
||||
const FORMAT_EVENT = (data: unknown): string => `data: ${JSON.stringify(data)}\n\n`
|
||||
const FORMAT_DONE = "data: [DONE]\n\n"
|
||||
|
||||
/** SSE 推送间隔(毫秒) */
|
||||
const SSE_PUSH_INTERVAL_MS = 15_000
|
||||
|
||||
/** SSE 连接最大存活时间(毫秒) */
|
||||
const SSE_MAX_LIFETIME_MS = 5 * 60_000
|
||||
|
||||
export async function GET(_request: NextRequest): Promise<Response> {
|
||||
const encoder = new TextEncoder()
|
||||
|
||||
try {
|
||||
const ctx = await requirePermission(Permissions.MESSAGE_READ)
|
||||
|
||||
let intervalTimer: ReturnType<typeof setInterval> | null = null
|
||||
let lifetimeTimer: ReturnType<typeof setTimeout> | null = null
|
||||
let closed = false
|
||||
|
||||
const stream = new ReadableStream<Uint8Array>({
|
||||
async start(controller) {
|
||||
const safeEnqueue = (data: string): void => {
|
||||
if (closed) return
|
||||
try {
|
||||
controller.enqueue(encoder.encode(data))
|
||||
} catch {
|
||||
closed = true
|
||||
}
|
||||
}
|
||||
|
||||
const pushUpdate = async (): Promise<void> => {
|
||||
if (closed) return
|
||||
try {
|
||||
const [unreadCount, notificationsResult] = await Promise.all([
|
||||
getUnreadNotificationCount(ctx.userId),
|
||||
getNotifications(ctx.userId, { page: 1, pageSize: 10 }),
|
||||
])
|
||||
safeEnqueue(FORMAT_EVENT({
|
||||
type: "update",
|
||||
unreadCount,
|
||||
notifications: notificationsResult.items,
|
||||
}))
|
||||
} catch {
|
||||
// 查询失败不关闭连接,等待下次重试
|
||||
}
|
||||
}
|
||||
|
||||
// 连接建立时立即推送一次
|
||||
await pushUpdate()
|
||||
|
||||
// 定时推送
|
||||
intervalTimer = setInterval(() => {
|
||||
void pushUpdate()
|
||||
}, SSE_PUSH_INTERVAL_MS)
|
||||
|
||||
// 连接超时清理(5 分钟后自动关闭)
|
||||
lifetimeTimer = setTimeout(() => {
|
||||
safeEnqueue(FORMAT_DONE)
|
||||
if (intervalTimer) clearInterval(intervalTimer)
|
||||
if (!closed) {
|
||||
closed = true
|
||||
try {
|
||||
controller.close()
|
||||
} catch {
|
||||
// 已关闭
|
||||
}
|
||||
}
|
||||
}, SSE_MAX_LIFETIME_MS)
|
||||
},
|
||||
cancel() {
|
||||
if (intervalTimer) clearInterval(intervalTimer)
|
||||
if (lifetimeTimer) clearTimeout(lifetimeTimer)
|
||||
closed = true
|
||||
},
|
||||
})
|
||||
|
||||
return new Response(stream, {
|
||||
headers: {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache, no-transform",
|
||||
Connection: "keep-alive",
|
||||
},
|
||||
})
|
||||
} catch (error) {
|
||||
if (error instanceof PermissionDeniedError) {
|
||||
return new Response(FORMAT_EVENT({ type: "error", message: "Permission denied" }), {
|
||||
status: 403,
|
||||
headers: { "Content-Type": "text/event-stream" },
|
||||
})
|
||||
}
|
||||
return new Response(FORMAT_EVENT({ type: "error", message: "Internal error" }), {
|
||||
status: 500,
|
||||
headers: { "Content-Type": "text/event-stream" },
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user