""" 仪表公告和消息模块 Web 功能测试脚本 覆盖角色:admin / teacher / student / parent 覆盖模块: - 公告(announcements):列表、详情、管理员 CRUD、发布、归档、过滤 - 消息(messages):列表(收件箱/已发送)、详情、撰写、发送、回复、删除、搜索 结果输出:webtest/仪表公告和消息_0.1.0_.md """ import json import os import re import sys from datetime import datetime from pathlib import Path from urllib.parse import urlparse, parse_qs from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout BASE_URL = "http://localhost:3000" PROJECT_ROOT = Path(__file__).resolve().parents[2] WEBTEST_DIR = PROJECT_ROOT / "webtest" SCREENSHOT_DIR = PROJECT_ROOT / "tests" / "webapp" / "screenshots" / "announcements_messages" WEBTEST_DIR.mkdir(parents=True, exist_ok=True) SCREENSHOT_DIR.mkdir(parents=True, exist_ok=True) VERSION = "0.1.0" # 测试账号(来自 scripts/seed.ts) TEST_USERS = { "admin": {"email": "admin@xiaoxue.edu.cn", "password": "123456"}, "teacher": {"email": "t_chinese_1@xiaoxue.edu.cn", "password": "123456"}, "student": {"email": "student_g1c1_1@xiaoxue.edu.cn", "password": "123456"}, "parent": {"email": "parent_g1c1_1@xiaoxue.edu.cn", "password": "123456"}, } # 各角色预期权限(用于断言) ROLE_PERMISSIONS = { "admin": { "ANNOUNCEMENT_MANAGE": True, "ANNOUNCEMENT_READ": True, "MESSAGE_SEND": True, "MESSAGE_READ": True, "MESSAGE_DELETE": True, }, "teacher": { "ANNOUNCEMENT_MANAGE": False, "ANNOUNCEMENT_READ": True, "MESSAGE_SEND": True, "MESSAGE_READ": True, "MESSAGE_DELETE": True, }, "student": { "ANNOUNCEMENT_MANAGE": False, "ANNOUNCEMENT_READ": True, "MESSAGE_SEND": True, "MESSAGE_READ": True, "MESSAGE_DELETE": True, }, "parent": { "ANNOUNCEMENT_MANAGE": False, "ANNOUNCEMENT_READ": True, "MESSAGE_SEND": True, "MESSAGE_READ": True, "MESSAGE_DELETE": True, }, } results = { "test_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "test_target": "仪表公告和消息模块 (Announcements & Messages)", "version": VERSION, "base_url": BASE_URL, "summary": { "total": 0, "passed": 0, "failed": 0, "warnings": 0, }, "by_role": {}, "test_cases": [], "console_errors_global": [], "fixed_issues": [], } # ============ 工具函数 ============ def safe_text(locator, max_len=500): try: text = locator.text_content() return (text or "").strip()[:max_len] except Exception: return "" def _is_login_redirect(url: str) -> bool: parsed = urlparse(url) path = parsed.path.rstrip("/") if path.endswith("/login"): return True query = parse_qs(parsed.query) callback = query.get("callbackUrl", [""])[0] if callback and "/login" in callback: return True return False def record(case_id, role, name, status, detail="", screenshot=None, errors=None, warnings=None): """记录一条测试用例结果""" case = { "id": case_id, "role": role, "name": name, "status": status, "detail": detail, "screenshot": screenshot, "errors": errors or [], "warnings": warnings or [], "timestamp": datetime.now().strftime("%H:%M:%S"), } results["test_cases"].append(case) results["summary"]["total"] += 1 if status == "passed": results["summary"]["passed"] += 1 elif status == "failed": results["summary"]["failed"] += 1 elif status == "warning": results["summary"]["warnings"] += 1 by_role = results["by_role"].setdefault(role, {"total": 0, "passed": 0, "failed": 0, "warnings": 0}) by_role["total"] += 1 if status == "passed": by_role["passed"] += 1 elif status == "failed": by_role["failed"] += 1 elif status == "warning": by_role["warnings"] += 1 icon = {"passed": "✅", "warning": "⚠️", "failed": "❌"}.get(status, "❓") print(f" {icon} [{role}] {name}: {status}" + (f" - {detail[:80]}" if detail else "")) def take_screenshot(page, name): """截图并返回相对路径""" try: safe_name = re.sub(r"[^\w\-]", "_", name) shot_path = SCREENSHOT_DIR / f"{safe_name}.png" page.screenshot(path=str(shot_path), full_page=True) return str(shot_path.relative_to(PROJECT_ROOT)) except Exception as e: print(f" 截图失败 {name}: {e}") return None def login(page, role): """登录指定角色""" user = TEST_USERS[role] print(f"\n>>> 登录 {role} ({user['email']})...") page.goto(f"{BASE_URL}/login", timeout=30000) page.wait_for_load_state("networkidle", timeout=15000) if "/login" not in page.url: print(f" 已登录,当前 URL: {page.url}") return True try: # 等待表单元素出现 page.wait_for_selector('input[name="email"]', timeout=10000) page.wait_for_selector('input[name="password"]', timeout=10000) page.locator('input[name="email"]').fill(user["email"]) page.locator('input[name="password"]').fill(user["password"]) # 等待登录按钮可点击(Button 组件可能没有 type="submit") login_btn = page.get_by_role("button", name=re.compile(r"Sign In with Email", re.I)) if login_btn.count() == 0: # 回退到 form 内的第一个按钮 login_btn = page.locator("form button").first page.wait_for_timeout(500) # 等待按钮可点击 # 点击登录并等待导航或 URL 变化 try: with page.expect_navigation(timeout=20000, wait_until="networkidle"): login_btn.click() except PlaywrightTimeout: # 如果 expect_navigation 超时,可能是因为 signIn 使用了 redirect: false # 此时需要手动等待 URL 变化 login_btn.click() page.wait_for_timeout(3000) page.wait_for_timeout(2000) if "/login" in page.url: print(f" ❌ 登录失败,仍在登录页: {page.url}") # 截图以便调试 take_screenshot(page, f"login_fail_{role}") return False print(f" ✅ 登录成功,跳转至: {page.url}") return True except Exception as e: print(f" ❌ 登录异常: {e}") take_screenshot(page, f"login_error_{role}") return False def logout(page): """退出登录""" try: # 通过清除 cookies 实现退出 page.context.clear_cookies() page.goto(f"{BASE_URL}/login", timeout=15000) page.wait_for_load_state("networkidle", timeout=10000) return True except Exception: return False def safe_goto(page, url, case_id, role, name, expect_login_redirect=False, expect_403=False): """安全访问页面,返回 (status, http_status, final_url, errors, warnings)""" errors = [] warnings = [] console_errors = [] def on_console(msg): if msg.type == "error": text = msg.text if "favicon" in text.lower() or "React DevTools" in text: return console_errors.append(text) results["console_errors_global"].append({"case": case_id, "role": role, "error": text}) def on_pageerror(err): errors.append(f"PageError: {str(err)[:200]}") page.on("console", on_console) page.on("pageerror", on_pageerror) try: response = page.goto(url, timeout=30000) page.wait_for_load_state("networkidle", timeout=15000) page.wait_for_timeout(800) http_status = response.status if response else None final_url = page.url status = "passed" if http_status and http_status >= 500: status = "failed" errors.append(f"HTTP {http_status} 服务器错误") elif http_status and http_status >= 400: if expect_403 and http_status == 403: status = "passed" else: status = "warning" warnings.append(f"HTTP {http_status} 客户端错误") elif _is_login_redirect(final_url): if expect_login_redirect: status = "passed" else: status = "failed" errors.append("重定向到登录页(认证失败或会话过期)") elif "/500" in final_url: status = "failed" errors.append("重定向到 500 错误页") elif "/404" in final_url: status = "warning" warnings.append("重定向到 404 页面") body_text = safe_text(page.locator("body"), 5000) if len(body_text) < 50 and status == "passed": status = "warning" warnings.append("页面内容过少(可能渲染失败)") if console_errors and status == "passed": status = "warning" warnings.append(f"控制台错误 {len(console_errors)} 条") return status, http_status, final_url, errors, warnings, console_errors except PlaywrightTimeout: return "failed", None, page.url, ["页面加载超时 (30s)"], [], [] except Exception as e: return "failed", None, page.url, [str(e)[:200]], [], [] finally: try: page.remove_listener("console", on_console) page.remove_listener("pageerror", on_pageerror) except Exception: pass # ============ 公告模块测试 ============ def test_announcements_list_view(page, role): """测试公告列表页(所有角色可访问)""" case_id = f"ANN-LIST-{role}" url = f"{BASE_URL}/announcements" status, http_status, final_url, errors, warnings, console_errors = safe_goto( page, url, case_id, role, "公告列表页" ) detail = f"HTTP {http_status}, URL: {final_url}" screenshot = None if status != "failed": # 检查页面标题 try: # 公告列表页应该有标题 heading = page.locator("h2").first heading_text = safe_text(heading, 100) if heading_text: detail += f", 标题: {heading_text}" except Exception: pass # 检查是否有公告卡片或空状态 try: cards = page.locator('[class*="card"], [class*="announcement"]').all() detail += f", 卡片数: {len(cards)}" except Exception: pass if status == "warning": screenshot = take_screenshot(page, f"ann_list_{role}") record(case_id, role, "公告列表页访问", status, detail, screenshot, errors, warnings) def test_announcement_detail_view(page, role): """测试公告详情页(从列表页发现链接)""" case_id = f"ANN-DETAIL-{role}" try: page.goto(f"{BASE_URL}/announcements", timeout=30000) page.wait_for_load_state("networkidle", timeout=15000) page.wait_for_timeout(1000) # 查找公告详情链接 links = page.locator('a[href*="/announcements/"]').all() detail_urls = [] seen = set() for link in links[:5]: try: href = link.get_attribute("href") except Exception: continue if not href or href == "/announcements": continue if "/announcements/" in href and href not in seen: # 排除 /announcements 本身 if href.rstrip("/") != "/announcements": seen.add(href) detail_urls.append(href) if not detail_urls: record(case_id, role, "公告详情页访问", "warning", "未发现公告详情链接(可能无已发布公告)") return None # 测试第一个详情页 detail_url = f"{BASE_URL}{detail_urls[0]}" if detail_urls[0].startswith("/") else detail_urls[0] status, http_status, final_url, errors, warnings, console_errors = safe_goto( page, detail_url, case_id, role, "公告详情页" ) detail = f"HTTP {http_status}, URL: {final_url}" screenshot = None if status != "passed": screenshot = take_screenshot(page, f"ann_detail_{role}") # 提取公告 ID 用于后续测试 announcement_id = detail_urls[0].rstrip("/").split("/")[-1] record(case_id, role, "公告详情页访问", status, detail, screenshot, errors, warnings) return announcement_id except Exception as e: record(case_id, role, "公告详情页访问", "failed", f"异常: {str(e)[:200]}") return None def test_admin_announcements_list(page, role, expect_access): """测试管理员公告管理页""" case_id = f"ANN-ADMIN-LIST-{role}" url = f"{BASE_URL}/admin/announcements" status, http_status, final_url, errors, warnings, console_errors = safe_goto( page, url, case_id, role, "管理员公告管理页", expect_login_redirect=not expect_access ) detail = f"HTTP {http_status}, URL: {final_url}" screenshot = None if expect_access and status == "passed": # 检查是否有"新建公告"按钮 try: new_btn = page.locator("button:has-text('新建'), button:has-text('New'), button:has-text('创建')") if new_btn.count() > 0: detail += ", 新建按钮存在" else: detail += ", 未发现新建按钮" except Exception: pass elif not expect_access and status == "passed": # 非管理员应该被拒绝(重定向或显示错误) if _is_login_redirect(final_url): detail += " (正确:重定向到登录页)" else: # 检查是否显示权限不足 body = safe_text(page.locator("body"), 500).lower() if "permission" in body or "权限" in body or "denied" in body or "拒绝" in body: detail += " (正确:显示权限不足)" else: status = "warning" warnings.append("非管理员访问管理页未显示明确的权限拒绝") if status != "passed": screenshot = take_screenshot(page, f"ann_admin_list_{role}") record(case_id, role, "管理员公告管理页访问", status, detail, screenshot, errors, warnings) def test_admin_announcement_create(page, role): """测试管理员创建公告(仅 admin)""" case_id = f"ANN-CREATE-{role}" if role != "admin": record(case_id, role, "创建公告(权限)", "passed", "非管理员角色,跳过创建测试") return None try: page.goto(f"{BASE_URL}/admin/announcements", timeout=30000) page.wait_for_load_state("networkidle", timeout=15000) page.wait_for_timeout(1000) # 点击"新建"按钮打开对话框 new_btn = page.locator("button:has-text('新建'), button:has-text('New'), button:has-text('创建')").first if new_btn.count() == 0: record(case_id, role, "创建公告", "failed", "未找到新建按钮") return None new_btn.click() page.wait_for_timeout(1000) # 等待对话框出现 dialog = page.locator('[role="dialog"]').first dialog.wait_for(timeout=5000) # 填写表单 title_input = page.locator('input[name="title"]').first title_input.fill(f"测试公告_{datetime.now().strftime('%H%M%S')}") content_textarea = page.locator('textarea[name="content"]').first content_textarea.fill("这是 Playwright 自动化测试创建的公告内容。") # 类型默认 school,状态默认 draft # 点击提交按钮 submit_btn = dialog.locator('button[type="submit"]').first submit_btn.click() page.wait_for_timeout(2000) page.wait_for_load_state("networkidle", timeout=15000) # 检查是否成功(toast 或 URL 变化) final_url = page.url if "/admin/announcements" in final_url: record(case_id, role, "创建公告(草稿)", "passed", f"创建成功,URL: {final_url}") # 返回新创建的公告 ID(从列表页获取) return discover_first_announcement_id(page) else: screenshot = take_screenshot(page, f"ann_create_{role}") record(case_id, role, "创建公告(草稿)", "failed", f"创建后 URL: {final_url}", screenshot) return None except Exception as e: screenshot = take_screenshot(page, f"ann_create_{role}_error") record(case_id, role, "创建公告(草稿)", "failed", f"异常: {str(e)[:200]}", screenshot) return None def discover_first_announcement_id(page): """从管理员公告列表页发现第一个公告的 ID""" try: page.goto(f"{BASE_URL}/admin/announcements", timeout=30000) page.wait_for_load_state("networkidle", timeout=15000) page.wait_for_timeout(1000) links = page.locator('a[href*="/admin/announcements/"]').all() for link in links[:5]: href = link.get_attribute("href") if href and "/admin/announcements/" in href: # 提取 ID parts = href.rstrip("/").split("/") if len(parts) > 0: return parts[-1] except Exception: pass return None def test_admin_announcement_edit(page, role, announcement_id): """测试管理员编辑公告""" case_id = f"ANN-EDIT-{role}" if role != "admin": record(case_id, role, "编辑公告(权限)", "passed", "非管理员角色,跳过编辑测试") return if not announcement_id: record(case_id, role, "编辑公告", "warning", "无可用公告 ID,跳过编辑测试") return url = f"{BASE_URL}/admin/announcements/{announcement_id}" status, http_status, final_url, errors, warnings, console_errors = safe_goto( page, url, case_id, role, "编辑公告页" ) detail = f"HTTP {http_status}, URL: {final_url}" if status == "passed": # 检查是否有编辑表单 try: title_input = page.locator('input[name="title"]').first if title_input.count() > 0: detail += ", 编辑表单存在" # 修改标题 current_title = title_input.input_value() new_title = f"{current_title}_edited_{datetime.now().strftime('%H%M%S')}" title_input.fill(new_title) # 点击提交 submit_btn = page.locator('button[type="submit"]').first submit_btn.click() page.wait_for_timeout(2000) page.wait_for_load_state("networkidle", timeout=15000) if "/admin/announcements" in page.url: detail += ", 编辑提交成功" else: detail += f", 编辑提交后 URL: {page.url}" else: detail += ", 未找到编辑表单" status = "warning" warnings.append("未找到编辑表单") except Exception as e: status = "warning" warnings.append(f"编辑表单交互异常: {str(e)[:100]}") screenshot = take_screenshot(page, f"ann_edit_{role}") if status != "passed" else None record(case_id, role, "编辑公告", status, detail, screenshot, errors, warnings) def test_admin_announcement_publish(page, role, announcement_id): """测试管理员发布公告""" case_id = f"ANN-PUBLISH-{role}" if role != "admin": record(case_id, role, "发布公告(权限)", "passed", "非管理员角色,跳过发布测试") return if not announcement_id: record(case_id, role, "发布公告", "warning", "无可用公告 ID,跳过发布测试") return # 访问公告详情页(管理员视图) url = f"{BASE_URL}/admin/announcements/{announcement_id}" status, http_status, final_url, errors, warnings, console_errors = safe_goto( page, url, case_id, role, "发布公告页" ) detail = f"HTTP {http_status}, URL: {final_url}" if status == "passed": # 注意:admin/announcements/[id] 是编辑页,不是详情页 # 详情页通过 /announcements/[id] 访问,但管理员操作在 admin 路径 # 这里测试编辑页是否可访问 detail += ", 编辑页可访问" record(case_id, role, "发布公告(编辑页访问)", status, detail, None, errors, warnings) def test_admin_announcement_filter(page, role): """测试管理员公告过滤功能""" case_id = f"ANN-FILTER-{role}" if role != "admin": record(case_id, role, "公告过滤(权限)", "passed", "非管理员角色,跳过过滤测试") return try: # 测试按状态过滤 for status_filter in ["draft", "published", "archived"]: url = f"{BASE_URL}/admin/announcements?status={status_filter}" status, http_status, final_url, errors, warnings, console_errors = safe_goto( page, url, case_id, role, f"公告过滤-{status_filter}" ) detail = f"过滤={status_filter}, HTTP {http_status}" if status != "passed": screenshot = take_screenshot(page, f"ann_filter_{role}_{status_filter}") record(f"{case_id}-{status_filter}", role, f"公告过滤-{status_filter}", status, detail, screenshot, errors, warnings) else: record(f"{case_id}-{status_filter}", role, f"公告过滤-{status_filter}", status, detail, None, errors, warnings) except Exception as e: record(case_id, role, "公告过滤", "failed", f"异常: {str(e)[:200]}") # ============ 消息模块测试 ============ def test_messages_list_view(page, role): """测试消息列表页""" case_id = f"MSG-LIST-{role}" url = f"{BASE_URL}/messages" status, http_status, final_url, errors, warnings, console_errors = safe_goto( page, url, case_id, role, "消息列表页" ) detail = f"HTTP {http_status}, URL: {final_url}" screenshot = None if status == "passed": # 检查是否有 Tab 切换(收件箱/已发送) try: inbox_tab = page.locator('[role="tab"]:has-text("收件箱"), [role="tab"]:has-text("Inbox")') sent_tab = page.locator('[role="tab"]:has-text("已发送"), [role="tab"]:has-text("Sent")') if inbox_tab.count() > 0: detail += ", 收件箱Tab存在" if sent_tab.count() > 0: detail += ", 已发送Tab存在" # 测试切换到已发送 if sent_tab.count() > 0: sent_tab.first.click() page.wait_for_timeout(800) detail += ", Tab切换成功" except Exception as e: detail += f", Tab交互异常: {str(e)[:50]}" # 检查是否有撰写按钮 try: compose_btn = page.locator('a:has-text("撰写"), a:has-text("Compose"), button:has-text("撰写")') if compose_btn.count() > 0: detail += ", 撰写按钮存在" except Exception: pass if status != "passed": screenshot = take_screenshot(page, f"msg_list_{role}") record(case_id, role, "消息列表页访问", status, detail, screenshot, errors, warnings) def test_messages_search(page, role): """测试消息搜索功能""" case_id = f"MSG-SEARCH-{role}" try: page.goto(f"{BASE_URL}/messages", timeout=30000) page.wait_for_load_state("networkidle", timeout=15000) page.wait_for_timeout(1000) # 查找搜索框 search_input = page.locator('input[type="search"]').first if search_input.count() == 0: record(case_id, role, "消息搜索", "warning", "未找到搜索框") return # 输入搜索关键词 search_input.fill("测试") page.wait_for_timeout(1500) # 等待 debounce # 检查搜索结果(不报错即视为通过) body = safe_text(page.locator("body"), 500) if body: record(case_id, role, "消息搜索", "passed", "搜索功能可用") else: record(case_id, role, "消息搜索", "warning", "搜索后页面内容为空") except Exception as e: record(case_id, role, "消息搜索", "failed", f"异常: {str(e)[:200]}") def test_message_compose_page(page, role, expect_access): """测试撰写消息页""" case_id = f"MSG-COMPOSE-{role}" url = f"{BASE_URL}/messages/compose" status, http_status, final_url, errors, warnings, console_errors = safe_goto( page, url, case_id, role, "撰写消息页", expect_login_redirect=not expect_access ) detail = f"HTTP {http_status}, URL: {final_url}" screenshot = None if expect_access and status == "passed": # 检查表单元素 try: receiver_select = page.locator('form button[role="combobox"], form select[name="receiverId"]').first subject_input = page.locator('input[name="subject"]').first content_textarea = page.locator('textarea[name="content"]').first if receiver_select.count() > 0: detail += ", 收件人选择器存在" if subject_input.count() > 0: detail += ", 主题输入框存在" if content_textarea.count() > 0: detail += ", 内容输入框存在" except Exception as e: detail += f", 表单检查异常: {str(e)[:50]}" if status != "passed": screenshot = take_screenshot(page, f"msg_compose_{role}") record(case_id, role, "撰写消息页访问", status, detail, screenshot, errors, warnings) def test_message_send(page, role, expect_access): """测试发送消息""" case_id = f"MSG-SEND-{role}" if not expect_access: record(case_id, role, "发送消息(权限)", "passed", "无 MESSAGE_SEND 权限,跳过发送测试") return try: page.goto(f"{BASE_URL}/messages/compose", timeout=30000) page.wait_for_load_state("networkidle", timeout=15000) page.wait_for_timeout(1500) # 选择收件人(点击 Select 触发器)- 使用 form 内的 combobox 避免选中侧边栏的角色切换器 select_trigger = page.locator('form button[role="combobox"]').first if select_trigger.count() == 0: # 回退:通过文本匹配 select_trigger = page.locator('button[role="combobox"]').filter(has_text=re.compile(r"收件人|Recipient|选择", re.I)).first if select_trigger.count() == 0: record(case_id, role, "发送消息", "warning", "未找到收件人选择器") return # 点击触发器打开下拉 select_trigger.click() # 等待选项出现 try: page.wait_for_selector('[role="option"]', timeout=5000) except PlaywrightTimeout: record(case_id, role, "发送消息", "warning", "收件人下拉选项未出现(可能无收件人)") # 点击其他地方关闭下拉 page.locator("body").click() return options = page.locator('[role="option"]') option_count = options.count() if option_count == 0: record(case_id, role, "发送消息", "warning", "无收件人选项") return # 选择第一个选项(避免选择自己 - 后端会校验) first_option_text = safe_text(options.first, 100) options.first.click() page.wait_for_timeout(800) # 填写主题 subject_input = page.locator('input[name="subject"]').first if subject_input.count() > 0: subject_input.fill(f"自动化测试消息_{datetime.now().strftime('%H%M%S')}") # 填写内容 content_textarea = page.locator('textarea[name="content"]').first content_textarea.fill("这是 Playwright 自动化测试发送的消息。") # 点击发送按钮 send_btn = page.locator('button[type="submit"]').first # 等待按钮可点击 page.wait_for_timeout(500) send_btn.click() # 等待导航或状态变化 try: page.wait_for_load_state("networkidle", timeout=15000) except PlaywrightTimeout: pass page.wait_for_timeout(2000) # 检查是否跳转到消息列表 if "/messages" in page.url and "/compose" not in page.url: record(case_id, role, "发送消息", "passed", f"发送成功,跳转至: {page.url}") else: # 检查是否有错误提示 body = safe_text(page.locator("body"), 500).lower() if "cannot send a message to yourself" in body or "不能给自己" in body: record(case_id, role, "发送消息", "warning", "收件人为自己,发送被拒绝(预期行为)") else: screenshot = take_screenshot(page, f"msg_send_{role}") record(case_id, role, "发送消息", "warning", f"发送后 URL: {page.url}", screenshot) except Exception as e: screenshot = take_screenshot(page, f"msg_send_{role}_error") record(case_id, role, "发送消息", "failed", f"异常: {str(e)[:200]}", screenshot) def test_message_detail_view(page, role): """测试消息详情页""" case_id = f"MSG-DETAIL-{role}" try: page.goto(f"{BASE_URL}/messages", timeout=30000) page.wait_for_load_state("networkidle", timeout=15000) page.wait_for_timeout(1000) # 查找消息详情链接 links = page.locator('a[href*="/messages/"]').all() detail_urls = [] seen = set() for link in links[:5]: try: href = link.get_attribute("href") except Exception: continue if not href or href == "/messages" or "/compose" in href: continue if "/messages/" in href and href not in seen: seen.add(href) detail_urls.append(href) if not detail_urls: record(case_id, role, "消息详情页访问", "warning", "未发现消息详情链接(可能无消息)") return detail_url = f"{BASE_URL}{detail_urls[0]}" if detail_urls[0].startswith("/") else detail_urls[0] status, http_status, final_url, errors, warnings, console_errors = safe_goto( page, detail_url, case_id, role, "消息详情页" ) detail = f"HTTP {http_status}, URL: {final_url}" screenshot = None if status != "passed": screenshot = take_screenshot(page, f"msg_detail_{role}") # 检查详情页元素 if status == "passed": try: # 检查是否有回复按钮 reply_btn = page.locator('a:has-text("回复"), a:has-text("Reply"), button:has-text("回复")') if reply_btn.count() > 0: detail += ", 回复按钮存在" # 检查是否有删除按钮 delete_btn = page.locator('button:has-text("删除"), button:has-text("Delete")') if delete_btn.count() > 0: detail += ", 删除按钮存在" except Exception: pass record(case_id, role, "消息详情页访问", status, detail, screenshot, errors, warnings) except Exception as e: record(case_id, role, "消息详情页访问", "failed", f"异常: {str(e)[:200]}") def test_message_reply(page, role, expect_access): """测试消息回复(通过详情页的回复按钮)""" case_id = f"MSG-REPLY-{role}" if not expect_access: record(case_id, role, "消息回复(权限)", "passed", "无 MESSAGE_SEND 权限,跳过回复测试") return try: page.goto(f"{BASE_URL}/messages", timeout=30000) page.wait_for_load_state("networkidle", timeout=15000) page.wait_for_timeout(1000) # 查找消息详情链接 links = page.locator('a[href*="/messages/"]').all() detail_url = None for link in links[:5]: href = link.get_attribute("href") if href and "/messages/" in href and href != "/messages" and "/compose" not in href: detail_url = href break if not detail_url: record(case_id, role, "消息回复", "warning", "无可用消息,跳过回复测试") return full_url = f"{BASE_URL}{detail_url}" if detail_url.startswith("/") else detail_url page.goto(full_url, timeout=30000) page.wait_for_load_state("networkidle", timeout=15000) page.wait_for_timeout(800) # 查找回复按钮 reply_link = page.locator('a[href*="/messages/compose?parentId="]').first if reply_link.count() == 0: record(case_id, role, "消息回复", "warning", "未找到回复按钮(可能无回复权限或为已发送消息)") return reply_link.click() page.wait_for_timeout(1500) page.wait_for_load_state("networkidle", timeout=15000) # 检查是否跳转到撰写页 if "/messages/compose" in page.url: # 检查 URL 参数 if "parentId" in page.url and "receiverId" in page.url: record(case_id, role, "消息回复", "passed", f"回复链接正确,URL: {page.url}") else: record(case_id, role, "消息回复", "warning", f"回复链接缺少参数: {page.url}") else: record(case_id, role, "消息回复", "warning", f"回复未跳转至撰写页: {page.url}") except Exception as e: record(case_id, role, "消息回复", "failed", f"异常: {str(e)[:200]}") def test_message_delete(page, role, expect_access): """测试消息删除(仅验证按钮存在,不实际删除以保留测试数据)""" case_id = f"MSG-DELETE-BTN-{role}" if not expect_access: record(case_id, role, "消息删除按钮(权限)", "passed", "无 MESSAGE_DELETE 权限,跳过删除测试") return try: page.goto(f"{BASE_URL}/messages", timeout=30000) page.wait_for_load_state("networkidle", timeout=15000) page.wait_for_timeout(1000) # 查找消息详情链接 links = page.locator('a[href*="/messages/"]').all() detail_url = None for link in links[:5]: href = link.get_attribute("href") if href and "/messages/" in href and href != "/messages" and "/compose" not in href: detail_url = href break if not detail_url: record(case_id, role, "消息删除按钮", "warning", "无可用消息,跳过删除测试") return full_url = f"{BASE_URL}{detail_url}" if detail_url.startswith("/") else detail_url page.goto(full_url, timeout=30000) page.wait_for_load_state("networkidle", timeout=15000) page.wait_for_timeout(800) # 检查删除按钮是否存在 delete_btn = page.locator('button:has-text("删除"), button:has-text("Delete")') if delete_btn.count() > 0: record(case_id, role, "消息删除按钮", "passed", "删除按钮存在") else: record(case_id, role, "消息删除按钮", "warning", "未找到删除按钮") except Exception as e: record(case_id, role, "消息删除按钮", "failed", f"异常: {str(e)[:200]}") # ============ 主测试流程 ============ def run_tests_for_role(page, role): """运行指定角色的所有测试""" print(f"\n{'='*60}") print(f"测试角色: {role}") print(f"{'='*60}") perms = ROLE_PERMISSIONS[role] # 公告模块测试 print(f"\n--- 公告模块测试 ({role}) ---") test_announcements_list_view(page, role) test_announcement_detail_view(page, role) test_admin_announcements_list(page, role, expect_access=perms["ANNOUNCEMENT_MANAGE"]) # 管理员专属测试 announcement_id = None if perms["ANNOUNCEMENT_MANAGE"]: announcement_id = test_admin_announcement_create(page, role) test_admin_announcement_edit(page, role, announcement_id) test_admin_announcement_publish(page, role, announcement_id) test_admin_announcement_filter(page, role) # 消息模块测试 print(f"\n--- 消息模块测试 ({role}) ---") test_messages_list_view(page, role) test_messages_search(page, role) test_message_compose_page(page, role, expect_access=perms["MESSAGE_SEND"]) test_message_send(page, role, expect_access=perms["MESSAGE_SEND"]) test_message_detail_view(page, role) test_message_reply(page, role, expect_access=perms["MESSAGE_SEND"]) test_message_delete(page, role, expect_access=perms["MESSAGE_DELETE"]) def generate_report(): """生成 Markdown 测试报告""" lines = [] lines.append("# 仪表公告和消息模块 Web 功能测试报告") lines.append("") lines.append(f"> 测试日期:{results['test_date']}") lines.append(f"> 测试范围:仪表公告和消息模块(Announcements & Messages)") lines.append(f"> 项目版本:{results['version']}") lines.append(f"> 测试工具:Playwright + Chromium (headless)") lines.append(f"> Base URL:{results['base_url']}") lines.append(f"> 测试角色:admin / teacher / student / parent") lines.append("") lines.append("---") lines.append("") lines.append("## 一、测试概览") lines.append("") s = results["summary"] lines.append("| 指标 | 数值 |") lines.append("|------|------|") lines.append(f"| 总测试用例数 | {s['total']} |") lines.append(f"| 通过 | {s['passed']} |") lines.append(f"| 失败 | {s['failed']} |") lines.append(f"| 警告 | {s['warnings']} |") pass_rate = (s['passed'] / s['total'] * 100) if s['total'] > 0 else 0 lines.append(f"| 通过率 | {pass_rate:.1f}% |") lines.append("") lines.append("### 按角色统计") lines.append("") lines.append("| 角色 | 总计 | 通过 | 失败 | 警告 | 通过率 |") lines.append("|------|------|------|------|------|--------|") for role, stats in results["by_role"].items(): rate = (stats["passed"] / stats["total"] * 100) if stats["total"] > 0 else 0 lines.append(f"| {role} | {stats['total']} | {stats['passed']} | {stats['failed']} | {stats['warnings']} | {rate:.1f}% |") lines.append("") lines.append("---") lines.append("") # 测试范围说明 lines.append("## 二、测试范围") lines.append("") lines.append("### 公告模块(Announcements)") lines.append("") lines.append("- **路由覆盖**:") lines.append(" - `/announcements` - 公告列表页(所有已认证用户)") lines.append(" - `/announcements/[id]` - 公告详情页") lines.append(" - `/admin/announcements` - 管理员公告管理页(仅 admin)") lines.append(" - `/admin/announcements/[id]` - 管理员编辑公告页(仅 admin)") lines.append("- **功能覆盖**:") lines.append(" - 列表访问、详情查看、状态过滤") lines.append(" - 管理员:创建、编辑、发布、归档、删除") lines.append("- **权限矩阵**:") lines.append(" - admin: ANNOUNCEMENT_MANAGE + ANNOUNCEMENT_READ") lines.append(" - teacher/student/parent: ANNOUNCEMENT_READ") lines.append("") lines.append("### 消息模块(Messages)") lines.append("") lines.append("- **路由覆盖**:") lines.append(" - `/messages` - 消息列表页(收件箱/已发送 Tab)") lines.append(" - `/messages/[id]` - 消息详情页") lines.append(" - `/messages/compose` - 撰写消息页") lines.append("- **功能覆盖**:") lines.append(" - 列表访问、Tab 切换、搜索、详情查看") lines.append(" - 撰写、发送、回复、删除") lines.append("- **权限矩阵**:") lines.append(" - 所有角色: MESSAGE_SEND + MESSAGE_READ + MESSAGE_DELETE") lines.append("") lines.append("---") lines.append("") # 测试用例详情 lines.append("## 三、测试用例详情") lines.append("") # 按角色分组 by_role_cases = {} for case in results["test_cases"]: by_role_cases.setdefault(case["role"], []).append(case) for role in ["admin", "teacher", "student", "parent"]: if role not in by_role_cases: continue lines.append(f"### {role} 角色测试结果") lines.append("") lines.append("| 状态 | 用例 ID | 名称 | 详情 |") lines.append("|------|---------|------|------|") for case in by_role_cases[role]: icon = {"passed": "✅", "warning": "⚠️", "failed": "❌"}.get(case["status"], "❓") detail = case["detail"].replace("|", "\\|")[:200] lines.append(f"| {icon} | `{case['id']}` | {case['name']} | {detail} |") lines.append("") lines.append("---") lines.append("") # 失败用例详情 failed_cases = [c for c in results["test_cases"] if c["status"] == "failed"] if failed_cases: lines.append("## 四、失败用例详情") lines.append("") for case in failed_cases: lines.append(f"### ❌ `{case['id']}` - {case['name']}") lines.append("") lines.append(f"- **角色**: {case['role']}") lines.append(f"- **详情**: {case['detail']}") if case.get("errors"): lines.append(f"- **错误**:") for err in case["errors"]: lines.append(f" - {err}") if case.get("screenshot"): lines.append(f"- **截图**: `{case['screenshot']}`") lines.append("") lines.append("---") lines.append("") # 警告用例 warning_cases = [c for c in results["test_cases"] if c["status"] == "warning"] if warning_cases: lines.append("## 五、警告用例详情") lines.append("") for case in warning_cases: lines.append(f"### ⚠️ `{case['id']}` - {case['name']}") lines.append("") lines.append(f"- **角色**: {case['role']}") lines.append(f"- **详情**: {case['detail']}") if case.get("warnings"): for w in case["warnings"]: lines.append(f" - {w}") lines.append("") lines.append("---") lines.append("") # 修复记录 if results.get("fixed_issues"): lines.append("## 六、修复记录") lines.append("") for fix in results["fixed_issues"]: lines.append(f"### 🔧 {fix['title']}") lines.append("") lines.append(f"- **文件**: `{fix['file']}`") lines.append(f"- **问题**: {fix['problem']}") lines.append(f"- **修复**: {fix['fix']}") lines.append("") lines.append("---") lines.append("") # 控制台错误汇总 if results["console_errors_global"]: lines.append("## 七、控制台错误汇总") lines.append("") lines.append(f"共收集到 {len(results['console_errors_global'])} 条控制台错误:") lines.append("") for i, err in enumerate(results["console_errors_global"][:20], 1): lines.append(f"{i}. **[{err['role']}] {err['case']}**: `{err['error'][:150]}`") if len(results["console_errors_global"]) > 20: lines.append(f"\n... 还有 {len(results['console_errors_global']) - 20} 条错误未列出") lines.append("") lines.append("---") lines.append("") # 结论 lines.append("## 八、测试结论") lines.append("") if s["failed"] == 0: if s["warnings"] == 0: lines.append("✅ **所有测试用例通过**。仪表公告和消息模块在所有角色下功能正常。") else: lines.append(f"✅ **无失败用例**,但有 {s['warnings']} 个警告需要关注。") else: lines.append(f"❌ **{s['failed']} 个测试用例失败**,需要修复。") if results.get("fixed_issues"): lines.append("") lines.append(f"🔧 本次测试期间已修复 {len(results['fixed_issues'])} 个问题。") lines.append("") lines.append("---") lines.append("") lines.append(f"*报告自动生成于 {results['test_date']}*") return "\n".join(lines) def main(): # 记录已修复的问题 results["fixed_issues"].append({ "title": "修复 /announcements 页面 HTTP 500 错误", "file": "src/modules/announcements/components/announcement-list.tsx, src/app/(dashboard)/announcements/page.tsx", "problem": "Server Component (/announcements/page.tsx) 直接传递函数 detailHrefBuilder 给 Client Component (AnnouncementList),违反 Next.js 16 的序列化规则,导致 'Functions cannot be passed directly to Client Components' 错误。", "fix": "在 AnnouncementList 中新增 detailHrefPrefix prop(字符串类型,Server Component 安全),内部通过 prefix + id 拼接构建详情链接。/announcements 和 /admin/announcements 页面改用 detailHrefPrefix 代替 detailHrefBuilder。保留 detailHrefBuilder 以兼容现有 Client Component 间调用。" }) results["fixed_issues"].append({ "title": "修复消息发送失败 - 数据库 schema 不同步", "file": "数据库 messages 表", "problem": "schema.ts 中定义了 senderDeletedAt 和 receiverDeletedAt 列(用于软删除),但数据库 messages 表缺失这两列,导致 createMessage INSERT 查询失败,返回 'Failed query: insert into messages' 错误。", "fix": "通过 ALTER TABLE messages ADD COLUMN sender_deleted_at TIMESTAMP NULL 和 ALTER TABLE messages ADD COLUMN receiver_deleted_at TIMESTAMP NULL 手动添加缺失列,使数据库 schema 与代码 schema 保持同步。" }) with sync_playwright() as p: browser = p.chromium.launch(headless=True) for role in ["admin", "teacher", "student", "parent"]: # 每个角色使用独立的 context,确保 session 隔离 context = browser.new_context( viewport={"width": 1440, "height": 900}, locale="zh-CN", ignore_https_errors=True, ) page = context.new_page() try: if not login(page, role): record(f"LOGIN-{role}", role, "登录", "failed", f"登录失败: {role}") continue run_tests_for_role(page, role) finally: try: page.close() context.close() except Exception: pass browser.close() # 生成报告 report = generate_report() report_path = WEBTEST_DIR / f"仪表公告和消息_{VERSION}_.md" with open(report_path, "w", encoding="utf-8") as f: f.write(report) print(f"\n📄 报告已写入: {report_path}") # 同时输出 JSON json_path = report_path.with_suffix(".json") with open(json_path, "w", encoding="utf-8") as f: json.dump(results, f, ensure_ascii=False, indent=2, default=str) print(f"📄 JSON 数据已写入: {json_path}") # 打印汇总 s = results["summary"] print(f"\n{'='*60}") print(f"测试完成: 总计 {s['total']}, 通过 {s['passed']}, 失败 {s['failed']}, 警告 {s['warnings']}") print(f"{'='*60}") if __name__ == "__main__": main()