Files
NextEdu/src/modules/messaging/data-access.ts
SpecialX f75602d14e feat(announcements,messaging,notifications): 实现所有长期问题 — SSE 实时推送 + 通知日志持久化 + 优先级/归档 + 消息星标/草稿 + 公告已读回执/置顶 + 分类筛选/桌面推送 + 测试覆盖
P1-8 通知实时推送(SSE):
- 新增 /api/notifications/stream SSE 端点(15 秒推送,5 分钟超时)
- 新增 useNotificationStream Hook(SSE + 轮询降级)
- NotificationDropdown 改用 SSE 实时推送

P2-12 测试覆盖:
- notifications/dispatcher.test.ts(6 个测试,渠道选择逻辑)
- notifications/channels/in-app-channel.test.ts(9 个测试,类型映射)
- messaging/schema.test.ts(34 个测试,Zod 校验)
- tests/e2e/messages.spec.ts(消息模块 E2E 测试)
- vitest.unit.config.ts 添加 server-only stub

P2-13a 通知发送日志持久化:
- 新增 notification_logs 表(userId/title/channel/status/messageId/error/sentAt)
- logNotificationSend 改为 async 写入 DB(失败降级 console)
- dispatcher 传递 payload 用于持久化

P2-13b 通知优先级和归档:
- messageNotifications 表新增 priority(low/normal/high/urgent)和 isArchived 字段
- getNotifications 支持归档和优先级筛选
- 新增 archiveNotificationAction
- NotificationList 显示优先级 Badge 和归档按钮

P2-13c 消息星标和草稿:
- messages 表新增 isStarred 字段
- 新增 message_drafts 表
- 新增 toggleMessageStar + 草稿 CRUD Server Actions
- 新增 5 个草稿 data-access 函数

P2-13d 公告已读回执和置顶:
- announcements 表新增 isPinned 字段
- 新增 announcement_reads 表(唯一索引保证幂等)
- 新增 toggleAnnouncementPinAction + markAnnouncementAsReadAction
- getAnnouncements 排序置顶优先

P2-13e 通知分类筛选和桌面推送:
- NotificationList 添加按类型筛选按钮组
- 新增 useDesktopNotifications Hook(浏览器 Notification API)
- NotificationDropdown 集成桌面推送(新通知触发)

架构图同步:
- 004 和 005 均已更新(新增表、Action、Hook、组件描述)
2026-06-23 10:13:57 +08:00

405 lines
14 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import "server-only"
/**
 * Direct-message data access layer.
 *
 * Responsibilities:
 * - getMessages / getMessageById / getMessageThread: message queries
 * - createMessage / markMessageAsRead / deleteMessage: message CRUD
 * - getUnreadMessageCount: unread message count
 * - getRecipients: list of selectable recipients (filtered by DataScope)
 *
 * Notification functions (createNotification / getNotifications /
 * markNotificationAsRead / markAllNotificationsAsRead / getUnreadNotificationCount)
 * have moved to notifications/data-access.ts; import them from that module directly.
 */
import { cache } from "react"
import { createId } from "@paralleldrive/cuid2"
import { and, count, desc, eq, inArray, isNull, like, or, sql, type SQL } from "drizzle-orm"
import { db } from "@/shared/db"
import {
  messages,
  messageDrafts,
  users,
} from "@/shared/db/schema"
import {
  getClassesByGradeId,
  getStudentIdsByClassIds,
  getTeacherIdsByClassIds,
  getStudentActiveClassId,
} from "@/modules/classes/data-access"
import { getUserNamesByIds } from "@/modules/users/data-access"
import type { DataScope } from "@/shared/types/permissions"
import type {
  Message,
  GetMessagesParams,
  CreateMessageInput,
  RecipientOption,
  MessageDraft,
  CreateMessageDraftInput,
  UpdateMessageDraftInput,
} from "./types"
import type { PaginatedResult } from "@/modules/notifications/types"
/** Serializes an optional Date to an ISO-8601 string; null and undefined both become null. */
const toIso = (d: Date | null | undefined): string | null => {
  if (d === null || d === undefined) return null
  return d.toISOString()
}
/** Serializes a Date that is always present to its ISO-8601 string form. */
const toIsoRequired = (d: Date): string => {
  return d.toISOString()
}
/**
 * Shape of a message row as consumed by `mapMessage`.
 * Dates are still `Date` objects here; `mapMessage` converts them to ISO strings.
 */
interface MessageRow {
  /** Message id. */
  id: string
  /** User id of the sender. */
  senderId: string
  /** User id of the receiver. */
  receiverId: string
  /** Optional subject line. */
  subject: string | null
  /** Message body. */
  content: string
  /** Read flag; `markMessageAsRead` sets it to true. */
  isRead: boolean
  /** Star flag; flipped by `toggleMessageStar`. */
  isStarred: boolean
  /** Time the message was marked read, or null while unread. */
  readAt: Date | null
  /** Id of the message this one replies to, or null. */
  parentMessageId: string | null
  /** Creation time; used for newest-first ordering in the list queries. */
  createdAt: Date
}
/**
 * Looks up display names for a set of user ids.
 *
 * Duplicate and falsy ids are dropped before querying, and an empty id list
 * short-circuits to an empty map without touching the database. A user whose
 * `name` is null is mapped to their own id.
 *
 * @param userIds - ids to resolve; may contain duplicates
 * @returns map from user id to display name; ids without a matching row are absent
 */
async function resolveUserNames(userIds: string[]): Promise<Map<string, string>> {
  const wantedIds = Array.from(new Set(userIds)).filter((id) => Boolean(id))
  if (wantedIds.length === 0) {
    return new Map()
  }

  const userRows = await db
    .select({ id: users.id, name: users.name })
    .from(users)
    .where(inArray(users.id, wantedIds))

  const namesById = new Map<string, string>()
  for (const user of userRows) {
    namesById.set(user.id, user.name ?? user.id)
  }
  return namesById
}
/**
 * Converts a message row into the `Message` DTO.
 *
 * Sender/receiver names come from `nameMap` (null when the id is not in the
 * map) and Date columns are serialized to ISO strings.
 */
const mapMessage = (row: MessageRow, nameMap: Map<string, string>): Message => {
  const senderName = nameMap.get(row.senderId) ?? null
  const receiverName = nameMap.get(row.receiverId) ?? null

  return {
    id: row.id,
    senderId: row.senderId,
    senderName,
    receiverId: row.receiverId,
    receiverName,
    subject: row.subject,
    content: row.content,
    isRead: row.isRead,
    isStarred: row.isStarred,
    readAt: toIso(row.readAt),
    parentMessageId: row.parentMessageId,
    createdAt: toIsoRequired(row.createdAt),
  }
}
/**
 * Returns one page of a user's messages, newest first, plus paging totals.
 *
 * - `type: "inbox"`: messages received by the user and not deleted by them.
 * - `type: "sent"`: messages sent by the user and not deleted by them.
 * - any other type: both of the above combined.
 *
 * Optional filters: `keyword` (substring match on subject or content after
 * trimming) and `starredOnly`. `page` and `pageSize` default to 1 and 20 and
 * are clamped to a minimum of 1.
 */
export const getMessages = cache(
  async (params: GetMessagesParams): Promise<PaginatedResult<Message>> => {
    const page = Math.max(1, params.page ?? 1)
    const pageSize = Math.max(1, params.pageSize ?? 20)

    const filters: SQL[] = []
    switch (params.type) {
      case "inbox":
        filters.push(eq(messages.receiverId, params.userId))
        filters.push(isNull(messages.receiverDeletedAt))
        break
      case "sent":
        filters.push(eq(messages.senderId, params.userId))
        filters.push(isNull(messages.senderDeletedAt))
        break
      default: {
        // "all": every message the user still sees, on either side of the conversation.
        const visibleOnEitherSide = or(
          and(eq(messages.receiverId, params.userId), isNull(messages.receiverDeletedAt)),
          and(eq(messages.senderId, params.userId), isNull(messages.senderDeletedAt))
        )
        if (visibleOnEitherSide) filters.push(visibleOnEitherSide)
      }
    }

    // Keyword search: matches subject or content.
    const keyword = params.keyword?.trim()
    if (keyword) {
      const pattern = `%${keyword}%`
      const keywordFilter = or(like(messages.subject, pattern), like(messages.content, pattern))
      if (keywordFilter) filters.push(keywordFilter)
    }

    // V2-P2-13c: starred messages only.
    if (params.starredOnly) {
      filters.push(eq(messages.isStarred, true))
    }

    const where = and(...filters)
    const pageQuery = db
      .select()
      .from(messages)
      .where(where)
      .orderBy(desc(messages.createdAt))
      .limit(pageSize)
      .offset((page - 1) * pageSize)
    const countQuery = db.select({ value: count() }).from(messages).where(where)

    // The page and the total are independent, so fetch them concurrently.
    const [rows, [totalRow]] = await Promise.all([pageQuery, countQuery])

    const nameMap = await resolveUserNames(rows.flatMap((row) => [row.senderId, row.receiverId]))
    const total = Number(totalRow?.value ?? 0)

    return {
      items: rows.map((row) => mapMessage(row, nameMap)),
      total,
      page,
      pageSize,
      totalPages: Math.ceil(total / pageSize),
    }
  }
)
/**
 * Fetches a single message on behalf of `userId`.
 *
 * The message is returned only if the user is its sender or receiver and has
 * not soft-deleted it on their side; otherwise the result is null.
 */
export const getMessageById = cache(
  async (id: string, userId: string): Promise<Message | null> => {
    const visibleAsSender = and(eq(messages.senderId, userId), isNull(messages.senderDeletedAt))
    const visibleAsReceiver = and(eq(messages.receiverId, userId), isNull(messages.receiverDeletedAt))

    const matches = await db
      .select()
      .from(messages)
      .where(and(eq(messages.id, id), or(visibleAsSender, visibleAsReceiver)))
      .limit(1)

    const found = matches[0]
    if (!found) return null

    const names = await resolveUserNames([found.senderId, found.receiverId])
    return mapMessage(found, names)
  }
)
/**
 * Loads a message together with its direct replies.
 *
 * The result starts with the root message, followed by every message whose
 * `parentMessageId` equals `messageId`, newest reply first. An unknown (or,
 * when `userId` is given, inaccessible) root yields an empty array.
 *
 * @param messageId - id of the root message
 * @param userId - optional viewer. When provided, the root and each reply are
 *   returned only if the viewer is the sender or receiver and has not
 *   soft-deleted the message on their side (same rule as `getMessageById`).
 *   When omitted, no access check is performed and the caller is responsible
 *   for authorization.
 * @returns the thread as `Message` DTOs
 */
export const getMessageThread = cache(
  async (messageId: string, userId?: string): Promise<Message[]> => {
    // `and()` skips undefined operands, so omitting `userId` keeps the legacy unfiltered query.
    const visibleToViewer =
      userId === undefined
        ? undefined
        : or(
            and(eq(messages.senderId, userId), isNull(messages.senderDeletedAt)),
            and(eq(messages.receiverId, userId), isNull(messages.receiverDeletedAt))
          )

    const [root] = await db
      .select()
      .from(messages)
      .where(and(eq(messages.id, messageId), visibleToViewer))
      .limit(1)
    if (!root) return []

    const replies = await db
      .select()
      .from(messages)
      .where(and(eq(messages.parentMessageId, messageId), visibleToViewer))
      .orderBy(desc(messages.createdAt))

    const allRows = [root, ...replies]
    const nameMap = await resolveUserNames(allRows.flatMap((r) => [r.senderId, r.receiverId]))
    return allRows.map((r) => mapMessage(r, nameMap))
  }
)
/**
 * Inserts a new message and returns its generated id.
 *
 * `subject` and `parentMessageId` are stored as null when not supplied.
 */
export async function createMessage(data: CreateMessageInput): Promise<string> {
  const newId = createId()
  const { senderId, receiverId, content } = data

  await db.insert(messages).values({
    id: newId,
    senderId,
    receiverId,
    subject: data.subject ?? null,
    content,
    parentMessageId: data.parentMessageId ?? null,
  })

  return newId
}
/**
 * Marks a message as read and stamps `readAt` with the current time.
 *
 * Only rows where `userId` is the receiver and the message is still unread are
 * touched, so calling it again (or as a non-receiver) changes nothing.
 */
export async function markMessageAsRead(id: string, userId: string): Promise<void> {
  const readState = { isRead: true, readAt: new Date() }
  const unreadForReceiver = and(
    eq(messages.id, id),
    eq(messages.receiverId, userId),
    eq(messages.isRead, false)
  )

  await db.update(messages).set(readState).where(unreadForReceiver)
}
/**
 * Soft-deletes a message for one participant.
 *
 * If `userId` is the sender, `senderDeletedAt` is set; if `userId` is the
 * receiver, `receiverDeletedAt` is set. The two sides are independent, so the
 * other participant's view is unaffected. Both updates run in one transaction
 * so they succeed or fail together.
 */
export async function deleteMessage(id: string, userId: string): Promise<void> {
  const deletedAt = new Date()
  const ownedAsSender = and(eq(messages.id, id), eq(messages.senderId, userId))
  const ownedAsReceiver = and(eq(messages.id, id), eq(messages.receiverId, userId))

  await db.transaction(async (tx) => {
    await tx.update(messages).set({ senderDeletedAt: deletedAt }).where(ownedAsSender)
    await tx.update(messages).set({ receiverDeletedAt: deletedAt }).where(ownedAsReceiver)
  })
}
/**
 * Flips the star flag of a message received by `userId`.
 *
 * The toggle is a single `UPDATE ... SET is_starred = NOT is_starred`, so it is
 * atomic: concurrent toggles cannot both read the same old value and overwrite
 * each other, which a separate SELECT followed by UPDATE would allow. If the
 * message does not exist or `userId` is not its receiver, no row matches and
 * nothing happens.
 *
 * @param id - message id
 * @param userId - receiver performing the toggle
 */
export async function toggleMessageStar(id: string, userId: string): Promise<void> {
  await db
    .update(messages)
    .set({ isStarred: sql`NOT ${messages.isStarred}` })
    .where(and(eq(messages.id, id), eq(messages.receiverId, userId)))
}
/**
 * Counts the messages received by `userId` that are still unread and that the
 * user has not soft-deleted. Returns 0 when the count query yields no row.
 */
export const getUnreadMessageCount = cache(async (userId: string): Promise<number> => {
  const unreadInInbox = and(
    eq(messages.receiverId, userId),
    eq(messages.isRead, false),
    isNull(messages.receiverDeletedAt)
  )

  const result = await db.select({ value: count() }).from(messages).where(unreadInInbox)
  const countRow = result[0]

  return Number(countRow?.value ?? 0)
})
/**
 * Turns a list of candidate user ids into recipient options with a fixed role.
 *
 * Ids are de-duplicated first and an empty list short-circuits to `[]`, so the
 * user lookup is never called with nothing to look up. The current user is
 * removed from the result, and a missing name falls back to the email address.
 */
async function recipientsFromUserIds(
  candidateIds: string[],
  selfId: string,
  role: "student" | "teacher"
): Promise<RecipientOption[]> {
  const uniqueIds = [...new Set(candidateIds)]
  if (uniqueIds.length === 0) return []

  const userMap = await getUserNamesByIds(uniqueIds)
  return Array.from(userMap.values())
    .filter((u) => u.id !== selfId)
    .map((u) => ({ id: u.id, name: u.name ?? u.email, email: u.email, role }))
}

/**
 * Lists the users that `userId` may send a message to, based on their data scope.
 *
 * - `all`: every user except the caller.
 * - `class_taught`: students of the classes in `scope.classIds`.
 * - `grade_managed`: students of every class under `scope.gradeIds`.
 * - `class_members`: teachers of the classes in `scope.classIds`.
 * - `children`: teachers of each child's active class.
 * - anything else, or a scope whose id list is empty: no recipients.
 *
 * Class and enrollment data is read through the classes data-access module
 * rather than by joining those tables here.
 *
 * @param userId - the sender; never included in the result
 * @param scope - the sender's data scope
 */
export const getRecipients = cache(
  async (userId: string, scope: DataScope): Promise<RecipientOption[]> => {
    if (scope.type === "all") {
      const all = await db.select({ id: users.id, name: users.name, email: users.email }).from(users)
      return all.filter((r) => r.id !== userId).map((r) => ({ ...r, name: r.name ?? r.email }))
    }

    if (scope.type === "class_taught" && scope.classIds.length > 0) {
      const studentIds = await getStudentIdsByClassIds(scope.classIds)
      return recipientsFromUserIds(studentIds, userId, "student")
    }

    if (scope.type === "grade_managed" && scope.gradeIds.length > 0) {
      const classLists = await Promise.all(scope.gradeIds.map((g) => getClassesByGradeId(g)))
      const classIds = [...new Set(classLists.flat().map((c) => c.id))]
      // A grade without classes has no students to offer.
      if (classIds.length === 0) return []
      const studentIds = await getStudentIdsByClassIds(classIds)
      return recipientsFromUserIds(studentIds, userId, "student")
    }

    if (scope.type === "class_members" && scope.classIds.length > 0) {
      // Students may message the teachers of their own classes.
      const teacherIds = await getTeacherIdsByClassIds(scope.classIds)
      return recipientsFromUserIds(teacherIds, userId, "teacher")
    }

    if (scope.type === "children" && scope.childrenIds.length > 0) {
      // Parents may message the teachers of their children's classes.
      const classIds = await Promise.all(scope.childrenIds.map((id) => getStudentActiveClassId(id)))
      // Children without an active class contribute nothing; siblings may share a class.
      const validClassIds = [...new Set(classIds.filter((id): id is string => id !== null))]
      if (validClassIds.length === 0) return []
      const teacherIds = await getTeacherIdsByClassIds(validClassIds)
      return recipientsFromUserIds(teacherIds, userId, "teacher")
    }

    return []
  }
)
/**
 * Orchestrates the data for the messages landing page.
 *
 * Fetches, concurrently, the first 50 messages of the user (inbox and sent
 * combined) and the first 20 notifications. Keeping this cross-module
 * orchestration here lets the page call a single function.
 *
 * The notifications data-access module is loaded with a dynamic import inside
 * the function rather than at the top of the file.
 */
export async function getMessagesPageData(userId: string): Promise<{
  messages: { items: Message[]; total: number; page: number; pageSize: number; totalPages: number }
  notifications: { items: import("@/modules/notifications/types").Notification[]; total: number; page: number; pageSize: number; totalPages: number }
}> {
  const { getNotifications } = await import("@/modules/notifications/data-access")

  const [messagePage, notificationPage] = await Promise.all([
    getMessages({ userId, type: "all", page: 1, pageSize: 50 }),
    getNotifications(userId, { page: 1, pageSize: 20 }),
  ])

  return { messages: messagePage, notifications: notificationPage }
}
/**
 * Orchestrates the data for the message detail page.
 *
 * Loads the message on behalf of `userId` (null if it does not exist or is not
 * visible to them). When the user is the receiver and the message is unread,
 * it is marked as read before returning; this write is awaited, so it
 * completes before the function resolves.
 *
 * The returned object is the snapshot read before marking, so its `isRead`
 * still reflects the state prior to this call.
 */
export async function getMessageDetailPageData(
  id: string,
  userId: string
): Promise<Message | null> {
  const detail = await getMessageById(id, userId)
  if (detail === null) return null

  const shouldMarkRead = !detail.isRead && detail.receiverId === userId
  if (shouldMarkRead) {
    await markMessageAsRead(id, userId)
  }

  return detail
}
// ---------------------------------------------------------------------------
// V2-P2-13c: message draft CRUD (message_drafts table)
// ---------------------------------------------------------------------------
/**
 * Converts a message_drafts row into the `MessageDraft` DTO.
 *
 * `receiverName` is looked up in `nameMap` only when the draft has a receiver;
 * it is null when there is no receiver or the id is not in the map. Date
 * columns are serialized to ISO strings.
 */
const mapDraft = (
  row: {
    id: string
    userId: string
    receiverId: string | null
    subject: string | null
    content: string | null
    parentMessageId: string | null
    createdAt: Date
    updatedAt: Date
  },
  nameMap: Map<string, string>
): MessageDraft => {
  let receiverName: string | null = null
  if (row.receiverId) {
    receiverName = nameMap.get(row.receiverId) ?? null
  }

  return {
    id: row.id,
    userId: row.userId,
    receiverId: row.receiverId,
    receiverName,
    subject: row.subject,
    content: row.content,
    parentMessageId: row.parentMessageId,
    createdAt: toIsoRequired(row.createdAt),
    updatedAt: toIsoRequired(row.updatedAt),
  }
}
/**
 * Lists all drafts owned by `userId`, most recently updated first, with the
 * receiver's display name resolved where a receiver is set.
 */
export const getMessageDrafts = cache(
  async (userId: string): Promise<MessageDraft[]> => {
    const draftRows = await db
      .select()
      .from(messageDrafts)
      .where(eq(messageDrafts.userId, userId))
      .orderBy(desc(messageDrafts.updatedAt))

    const receiverIds: string[] = []
    for (const draft of draftRows) {
      if (draft.receiverId !== null) receiverIds.push(draft.receiverId)
    }

    const names = await resolveUserNames(receiverIds)
    return draftRows.map((draft) => mapDraft(draft, names))
  }
)
/**
 * Inserts a new draft for `data.userId` and returns its generated id.
 *
 * Every optional field (receiver, subject, content, parent message) is stored
 * as null when not supplied.
 */
export async function createMessageDraft(data: CreateMessageDraftInput): Promise<string> {
  const draftId = createId()
  const { userId, receiverId, subject, content, parentMessageId } = data

  await db.insert(messageDrafts).values({
    id: draftId,
    userId,
    receiverId: receiverId ?? null,
    subject: subject ?? null,
    content: content ?? null,
    parentMessageId: parentMessageId ?? null,
  })

  return draftId
}
/**
 * Applies a partial update to a draft owned by `userId`.
 *
 * Only fields that are not `undefined` in `data` are written; an explicit
 * `null` clears the column. When `data` carries no defined field the call is a
 * no-op, because an UPDATE with an empty SET list is rejected by the query
 * builder. A draft that does not exist or belongs to another user is left
 * untouched.
 *
 * @param id - draft id
 * @param userId - owner of the draft
 * @param data - fields to change
 */
export async function updateMessageDraft(id: string, userId: string, data: UpdateMessageDraftInput): Promise<void> {
  const patch = {
    ...(data.receiverId !== undefined && { receiverId: data.receiverId }),
    ...(data.subject !== undefined && { subject: data.subject }),
    ...(data.content !== undefined && { content: data.content }),
    ...(data.parentMessageId !== undefined && { parentMessageId: data.parentMessageId }),
  }

  // Nothing to change: skip the query instead of issuing an UPDATE with no columns.
  if (Object.keys(patch).length === 0) return

  await db
    .update(messageDrafts)
    .set(patch)
    .where(and(eq(messageDrafts.id, id), eq(messageDrafts.userId, userId)))
}
/**
 * Permanently deletes a draft, provided it belongs to `userId`.
 * Nothing happens when no such draft exists.
 */
export async function deleteMessageDraft(id: string, userId: string): Promise<void> {
  const ownedDraft = and(eq(messageDrafts.id, id), eq(messageDrafts.userId, userId))
  await db.delete(messageDrafts).where(ownedDraft)
}
/**
 * Fetches a single draft owned by `userId`, or null when it does not exist or
 * belongs to someone else. The receiver's display name is resolved only when
 * the draft has a receiver.
 */
export async function getMessageDraftById(id: string, userId: string): Promise<MessageDraft | null> {
  const ownedDraft = and(eq(messageDrafts.id, id), eq(messageDrafts.userId, userId))
  const matches = await db.select().from(messageDrafts).where(ownedDraft).limit(1)

  const draft = matches[0]
  if (!draft) return null

  let names = new Map<string, string>()
  if (draft.receiverId) {
    names = await resolveUserNames([draft.receiverId])
  }
  return mapDraft(draft, names)
}