Files
NextEdu/tests/webapp/announcements_messages_test.py
SpecialX d884c6d513
Some checks failed
CI / scheduled-backup (push) Failing after 36s
CI / backup-verify (push) Has been skipped
CI / weekly-dr-drill (push) Failing after 0s
CI / build-deploy (push) Has been cancelled
CI / security-scan (push) Has been cancelled
test: update and add E2E, integration, visual, and webapp tests
- Update E2E tests: announcements, auth, auth-business-flow, full-route-regression, grades, navigation, smoke-auth, teacher-web-test

- Update integration tests: api-ai-chat, api-onboarding-complete, api-onboarding-status, proxy-guard, integration setup

- Update visual regression tests: admin-dashboard, homepage, student-dashboard, teacher-dashboard, visual config, helpers

- Update webapp tests: admin, parent, student full tests and debug scripts

- Add new webapp tests: announcements_messages, settings_profile, debug scripts

- Add webtest directory with test plans, screenshots, and diagnostic scripts
2026-06-23 17:39:40 +08:00

1233 lines
47 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
仪表公告和消息模块 Web 功能测试脚本
覆盖角色admin / teacher / student / parent
覆盖模块:
- 公告announcements列表、详情、管理员 CRUD、发布、归档、过滤
- 消息messages列表收件箱/已发送)、详情、撰写、发送、回复、删除、搜索
结果输出webtest/仪表公告和消息_0.1.0_.md
"""
import json
import os
import re
import sys
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse, parse_qs
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
BASE_URL = "http://localhost:3000"
PROJECT_ROOT = Path(__file__).resolve().parents[2]
WEBTEST_DIR = PROJECT_ROOT / "webtest"
SCREENSHOT_DIR = PROJECT_ROOT / "tests" / "webapp" / "screenshots" / "announcements_messages"
WEBTEST_DIR.mkdir(parents=True, exist_ok=True)
SCREENSHOT_DIR.mkdir(parents=True, exist_ok=True)
VERSION = "0.1.0"
# 测试账号(来自 scripts/seed.ts
TEST_USERS = {
"admin": {"email": "admin@xiaoxue.edu.cn", "password": "123456"},
"teacher": {"email": "t_chinese_1@xiaoxue.edu.cn", "password": "123456"},
"student": {"email": "student_g1c1_1@xiaoxue.edu.cn", "password": "123456"},
"parent": {"email": "parent_g1c1_1@xiaoxue.edu.cn", "password": "123456"},
}
# 各角色预期权限(用于断言)
ROLE_PERMISSIONS = {
"admin": {
"ANNOUNCEMENT_MANAGE": True,
"ANNOUNCEMENT_READ": True,
"MESSAGE_SEND": True,
"MESSAGE_READ": True,
"MESSAGE_DELETE": True,
},
"teacher": {
"ANNOUNCEMENT_MANAGE": False,
"ANNOUNCEMENT_READ": True,
"MESSAGE_SEND": True,
"MESSAGE_READ": True,
"MESSAGE_DELETE": True,
},
"student": {
"ANNOUNCEMENT_MANAGE": False,
"ANNOUNCEMENT_READ": True,
"MESSAGE_SEND": True,
"MESSAGE_READ": True,
"MESSAGE_DELETE": True,
},
"parent": {
"ANNOUNCEMENT_MANAGE": False,
"ANNOUNCEMENT_READ": True,
"MESSAGE_SEND": True,
"MESSAGE_READ": True,
"MESSAGE_DELETE": True,
},
}
results = {
"test_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"test_target": "仪表公告和消息模块 (Announcements & Messages)",
"version": VERSION,
"base_url": BASE_URL,
"summary": {
"total": 0,
"passed": 0,
"failed": 0,
"warnings": 0,
},
"by_role": {},
"test_cases": [],
"console_errors_global": [],
"fixed_issues": [],
}
# ============ 工具函数 ============
def safe_text(locator, max_len=500):
try:
text = locator.text_content()
return (text or "").strip()[:max_len]
except Exception:
return ""
def _is_login_redirect(url: str) -> bool:
parsed = urlparse(url)
path = parsed.path.rstrip("/")
if path.endswith("/login"):
return True
query = parse_qs(parsed.query)
callback = query.get("callbackUrl", [""])[0]
if callback and "/login" in callback:
return True
return False
def record(case_id, role, name, status, detail="", screenshot=None, errors=None, warnings=None):
"""记录一条测试用例结果"""
case = {
"id": case_id,
"role": role,
"name": name,
"status": status,
"detail": detail,
"screenshot": screenshot,
"errors": errors or [],
"warnings": warnings or [],
"timestamp": datetime.now().strftime("%H:%M:%S"),
}
results["test_cases"].append(case)
results["summary"]["total"] += 1
if status == "passed":
results["summary"]["passed"] += 1
elif status == "failed":
results["summary"]["failed"] += 1
elif status == "warning":
results["summary"]["warnings"] += 1
by_role = results["by_role"].setdefault(role, {"total": 0, "passed": 0, "failed": 0, "warnings": 0})
by_role["total"] += 1
if status == "passed":
by_role["passed"] += 1
elif status == "failed":
by_role["failed"] += 1
elif status == "warning":
by_role["warnings"] += 1
icon = {"passed": "", "warning": "⚠️", "failed": ""}.get(status, "")
print(f" {icon} [{role}] {name}: {status}" + (f" - {detail[:80]}" if detail else ""))
def take_screenshot(page, name):
"""截图并返回相对路径"""
try:
safe_name = re.sub(r"[^\w\-]", "_", name)
shot_path = SCREENSHOT_DIR / f"{safe_name}.png"
page.screenshot(path=str(shot_path), full_page=True)
return str(shot_path.relative_to(PROJECT_ROOT))
except Exception as e:
print(f" 截图失败 {name}: {e}")
return None
def login(page, role):
"""登录指定角色"""
user = TEST_USERS[role]
print(f"\n>>> 登录 {role} ({user['email']})...")
page.goto(f"{BASE_URL}/login", timeout=30000)
page.wait_for_load_state("networkidle", timeout=15000)
if "/login" not in page.url:
print(f" 已登录,当前 URL: {page.url}")
return True
try:
# 等待表单元素出现
page.wait_for_selector('input[name="email"]', timeout=10000)
page.wait_for_selector('input[name="password"]', timeout=10000)
page.locator('input[name="email"]').fill(user["email"])
page.locator('input[name="password"]').fill(user["password"])
# 等待登录按钮可点击Button 组件可能没有 type="submit"
login_btn = page.get_by_role("button", name=re.compile(r"Sign In with Email", re.I))
if login_btn.count() == 0:
# 回退到 form 内的第一个按钮
login_btn = page.locator("form button").first
page.wait_for_timeout(500) # 等待按钮可点击
# 点击登录并等待导航或 URL 变化
try:
with page.expect_navigation(timeout=20000, wait_until="networkidle"):
login_btn.click()
except PlaywrightTimeout:
# 如果 expect_navigation 超时,可能是因为 signIn 使用了 redirect: false
# 此时需要手动等待 URL 变化
login_btn.click()
page.wait_for_timeout(3000)
page.wait_for_timeout(2000)
if "/login" in page.url:
print(f" ❌ 登录失败,仍在登录页: {page.url}")
# 截图以便调试
take_screenshot(page, f"login_fail_{role}")
return False
print(f" ✅ 登录成功,跳转至: {page.url}")
return True
except Exception as e:
print(f" ❌ 登录异常: {e}")
take_screenshot(page, f"login_error_{role}")
return False
def logout(page):
"""退出登录"""
try:
# 通过清除 cookies 实现退出
page.context.clear_cookies()
page.goto(f"{BASE_URL}/login", timeout=15000)
page.wait_for_load_state("networkidle", timeout=10000)
return True
except Exception:
return False
def safe_goto(page, url, case_id, role, name, expect_login_redirect=False, expect_403=False):
"""安全访问页面,返回 (status, http_status, final_url, errors, warnings)"""
errors = []
warnings = []
console_errors = []
def on_console(msg):
if msg.type == "error":
text = msg.text
if "favicon" in text.lower() or "React DevTools" in text:
return
console_errors.append(text)
results["console_errors_global"].append({"case": case_id, "role": role, "error": text})
def on_pageerror(err):
errors.append(f"PageError: {str(err)[:200]}")
page.on("console", on_console)
page.on("pageerror", on_pageerror)
try:
response = page.goto(url, timeout=30000)
page.wait_for_load_state("networkidle", timeout=15000)
page.wait_for_timeout(800)
http_status = response.status if response else None
final_url = page.url
status = "passed"
if http_status and http_status >= 500:
status = "failed"
errors.append(f"HTTP {http_status} 服务器错误")
elif http_status and http_status >= 400:
if expect_403 and http_status == 403:
status = "passed"
else:
status = "warning"
warnings.append(f"HTTP {http_status} 客户端错误")
elif _is_login_redirect(final_url):
if expect_login_redirect:
status = "passed"
else:
status = "failed"
errors.append("重定向到登录页(认证失败或会话过期)")
elif "/500" in final_url:
status = "failed"
errors.append("重定向到 500 错误页")
elif "/404" in final_url:
status = "warning"
warnings.append("重定向到 404 页面")
body_text = safe_text(page.locator("body"), 5000)
if len(body_text) < 50 and status == "passed":
status = "warning"
warnings.append("页面内容过少(可能渲染失败)")
if console_errors and status == "passed":
status = "warning"
warnings.append(f"控制台错误 {len(console_errors)}")
return status, http_status, final_url, errors, warnings, console_errors
except PlaywrightTimeout:
return "failed", None, page.url, ["页面加载超时 (30s)"], [], []
except Exception as e:
return "failed", None, page.url, [str(e)[:200]], [], []
finally:
try:
page.remove_listener("console", on_console)
page.remove_listener("pageerror", on_pageerror)
except Exception:
pass
# ============ 公告模块测试 ============
def test_announcements_list_view(page, role):
"""测试公告列表页(所有角色可访问)"""
case_id = f"ANN-LIST-{role}"
url = f"{BASE_URL}/announcements"
status, http_status, final_url, errors, warnings, console_errors = safe_goto(
page, url, case_id, role, "公告列表页"
)
detail = f"HTTP {http_status}, URL: {final_url}"
screenshot = None
if status != "failed":
# 检查页面标题
try:
# 公告列表页应该有标题
heading = page.locator("h2").first
heading_text = safe_text(heading, 100)
if heading_text:
detail += f", 标题: {heading_text}"
except Exception:
pass
# 检查是否有公告卡片或空状态
try:
cards = page.locator('[class*="card"], [class*="announcement"]').all()
detail += f", 卡片数: {len(cards)}"
except Exception:
pass
if status == "warning":
screenshot = take_screenshot(page, f"ann_list_{role}")
record(case_id, role, "公告列表页访问", status, detail, screenshot, errors, warnings)
def test_announcement_detail_view(page, role):
"""测试公告详情页(从列表页发现链接)"""
case_id = f"ANN-DETAIL-{role}"
try:
page.goto(f"{BASE_URL}/announcements", timeout=30000)
page.wait_for_load_state("networkidle", timeout=15000)
page.wait_for_timeout(1000)
# 查找公告详情链接
links = page.locator('a[href*="/announcements/"]').all()
detail_urls = []
seen = set()
for link in links[:5]:
try:
href = link.get_attribute("href")
except Exception:
continue
if not href or href == "/announcements":
continue
if "/announcements/" in href and href not in seen:
# 排除 /announcements 本身
if href.rstrip("/") != "/announcements":
seen.add(href)
detail_urls.append(href)
if not detail_urls:
record(case_id, role, "公告详情页访问", "warning", "未发现公告详情链接(可能无已发布公告)")
return None
# 测试第一个详情页
detail_url = f"{BASE_URL}{detail_urls[0]}" if detail_urls[0].startswith("/") else detail_urls[0]
status, http_status, final_url, errors, warnings, console_errors = safe_goto(
page, detail_url, case_id, role, "公告详情页"
)
detail = f"HTTP {http_status}, URL: {final_url}"
screenshot = None
if status != "passed":
screenshot = take_screenshot(page, f"ann_detail_{role}")
# 提取公告 ID 用于后续测试
announcement_id = detail_urls[0].rstrip("/").split("/")[-1]
record(case_id, role, "公告详情页访问", status, detail, screenshot, errors, warnings)
return announcement_id
except Exception as e:
record(case_id, role, "公告详情页访问", "failed", f"异常: {str(e)[:200]}")
return None
def test_admin_announcements_list(page, role, expect_access):
"""测试管理员公告管理页"""
case_id = f"ANN-ADMIN-LIST-{role}"
url = f"{BASE_URL}/admin/announcements"
status, http_status, final_url, errors, warnings, console_errors = safe_goto(
page, url, case_id, role, "管理员公告管理页",
expect_login_redirect=not expect_access
)
detail = f"HTTP {http_status}, URL: {final_url}"
screenshot = None
if expect_access and status == "passed":
# 检查是否有"新建公告"按钮
try:
new_btn = page.locator("button:has-text('新建'), button:has-text('New'), button:has-text('创建')")
if new_btn.count() > 0:
detail += ", 新建按钮存在"
else:
detail += ", 未发现新建按钮"
except Exception:
pass
elif not expect_access and status == "passed":
# 非管理员应该被拒绝(重定向或显示错误)
if _is_login_redirect(final_url):
detail += " (正确:重定向到登录页)"
else:
# 检查是否显示权限不足
body = safe_text(page.locator("body"), 500).lower()
if "permission" in body or "权限" in body or "denied" in body or "拒绝" in body:
detail += " (正确:显示权限不足)"
else:
status = "warning"
warnings.append("非管理员访问管理页未显示明确的权限拒绝")
if status != "passed":
screenshot = take_screenshot(page, f"ann_admin_list_{role}")
record(case_id, role, "管理员公告管理页访问", status, detail, screenshot, errors, warnings)
def test_admin_announcement_create(page, role):
"""测试管理员创建公告(仅 admin"""
case_id = f"ANN-CREATE-{role}"
if role != "admin":
record(case_id, role, "创建公告(权限)", "passed", "非管理员角色,跳过创建测试")
return None
try:
page.goto(f"{BASE_URL}/admin/announcements", timeout=30000)
page.wait_for_load_state("networkidle", timeout=15000)
page.wait_for_timeout(1000)
# 点击"新建"按钮打开对话框
new_btn = page.locator("button:has-text('新建'), button:has-text('New'), button:has-text('创建')").first
if new_btn.count() == 0:
record(case_id, role, "创建公告", "failed", "未找到新建按钮")
return None
new_btn.click()
page.wait_for_timeout(1000)
# 等待对话框出现
dialog = page.locator('[role="dialog"]').first
dialog.wait_for(timeout=5000)
# 填写表单
title_input = page.locator('input[name="title"]').first
title_input.fill(f"测试公告_{datetime.now().strftime('%H%M%S')}")
content_textarea = page.locator('textarea[name="content"]').first
content_textarea.fill("这是 Playwright 自动化测试创建的公告内容。")
# 类型默认 school状态默认 draft
# 点击提交按钮
submit_btn = dialog.locator('button[type="submit"]').first
submit_btn.click()
page.wait_for_timeout(2000)
page.wait_for_load_state("networkidle", timeout=15000)
# 检查是否成功toast 或 URL 变化)
final_url = page.url
if "/admin/announcements" in final_url:
record(case_id, role, "创建公告(草稿)", "passed", f"创建成功URL: {final_url}")
# 返回新创建的公告 ID从列表页获取
return discover_first_announcement_id(page)
else:
screenshot = take_screenshot(page, f"ann_create_{role}")
record(case_id, role, "创建公告(草稿)", "failed", f"创建后 URL: {final_url}", screenshot)
return None
except Exception as e:
screenshot = take_screenshot(page, f"ann_create_{role}_error")
record(case_id, role, "创建公告(草稿)", "failed", f"异常: {str(e)[:200]}", screenshot)
return None
def discover_first_announcement_id(page):
"""从管理员公告列表页发现第一个公告的 ID"""
try:
page.goto(f"{BASE_URL}/admin/announcements", timeout=30000)
page.wait_for_load_state("networkidle", timeout=15000)
page.wait_for_timeout(1000)
links = page.locator('a[href*="/admin/announcements/"]').all()
for link in links[:5]:
href = link.get_attribute("href")
if href and "/admin/announcements/" in href:
# 提取 ID
parts = href.rstrip("/").split("/")
if len(parts) > 0:
return parts[-1]
except Exception:
pass
return None
def test_admin_announcement_edit(page, role, announcement_id):
"""测试管理员编辑公告"""
case_id = f"ANN-EDIT-{role}"
if role != "admin":
record(case_id, role, "编辑公告(权限)", "passed", "非管理员角色,跳过编辑测试")
return
if not announcement_id:
record(case_id, role, "编辑公告", "warning", "无可用公告 ID跳过编辑测试")
return
url = f"{BASE_URL}/admin/announcements/{announcement_id}"
status, http_status, final_url, errors, warnings, console_errors = safe_goto(
page, url, case_id, role, "编辑公告页"
)
detail = f"HTTP {http_status}, URL: {final_url}"
if status == "passed":
# 检查是否有编辑表单
try:
title_input = page.locator('input[name="title"]').first
if title_input.count() > 0:
detail += ", 编辑表单存在"
# 修改标题
current_title = title_input.input_value()
new_title = f"{current_title}_edited_{datetime.now().strftime('%H%M%S')}"
title_input.fill(new_title)
# 点击提交
submit_btn = page.locator('button[type="submit"]').first
submit_btn.click()
page.wait_for_timeout(2000)
page.wait_for_load_state("networkidle", timeout=15000)
if "/admin/announcements" in page.url:
detail += ", 编辑提交成功"
else:
detail += f", 编辑提交后 URL: {page.url}"
else:
detail += ", 未找到编辑表单"
status = "warning"
warnings.append("未找到编辑表单")
except Exception as e:
status = "warning"
warnings.append(f"编辑表单交互异常: {str(e)[:100]}")
screenshot = take_screenshot(page, f"ann_edit_{role}") if status != "passed" else None
record(case_id, role, "编辑公告", status, detail, screenshot, errors, warnings)
def test_admin_announcement_publish(page, role, announcement_id):
"""测试管理员发布公告"""
case_id = f"ANN-PUBLISH-{role}"
if role != "admin":
record(case_id, role, "发布公告(权限)", "passed", "非管理员角色,跳过发布测试")
return
if not announcement_id:
record(case_id, role, "发布公告", "warning", "无可用公告 ID跳过发布测试")
return
# 访问公告详情页(管理员视图)
url = f"{BASE_URL}/admin/announcements/{announcement_id}"
status, http_status, final_url, errors, warnings, console_errors = safe_goto(
page, url, case_id, role, "发布公告页"
)
detail = f"HTTP {http_status}, URL: {final_url}"
if status == "passed":
# 注意admin/announcements/[id] 是编辑页,不是详情页
# 详情页通过 /announcements/[id] 访问,但管理员操作在 admin 路径
# 这里测试编辑页是否可访问
detail += ", 编辑页可访问"
record(case_id, role, "发布公告(编辑页访问)", status, detail, None, errors, warnings)
def test_admin_announcement_filter(page, role):
"""测试管理员公告过滤功能"""
case_id = f"ANN-FILTER-{role}"
if role != "admin":
record(case_id, role, "公告过滤(权限)", "passed", "非管理员角色,跳过过滤测试")
return
try:
# 测试按状态过滤
for status_filter in ["draft", "published", "archived"]:
url = f"{BASE_URL}/admin/announcements?status={status_filter}"
status, http_status, final_url, errors, warnings, console_errors = safe_goto(
page, url, case_id, role, f"公告过滤-{status_filter}"
)
detail = f"过滤={status_filter}, HTTP {http_status}"
if status != "passed":
screenshot = take_screenshot(page, f"ann_filter_{role}_{status_filter}")
record(f"{case_id}-{status_filter}", role, f"公告过滤-{status_filter}", status, detail, screenshot, errors, warnings)
else:
record(f"{case_id}-{status_filter}", role, f"公告过滤-{status_filter}", status, detail, None, errors, warnings)
except Exception as e:
record(case_id, role, "公告过滤", "failed", f"异常: {str(e)[:200]}")
# ============ 消息模块测试 ============
def test_messages_list_view(page, role):
"""测试消息列表页"""
case_id = f"MSG-LIST-{role}"
url = f"{BASE_URL}/messages"
status, http_status, final_url, errors, warnings, console_errors = safe_goto(
page, url, case_id, role, "消息列表页"
)
detail = f"HTTP {http_status}, URL: {final_url}"
screenshot = None
if status == "passed":
# 检查是否有 Tab 切换(收件箱/已发送)
try:
inbox_tab = page.locator('[role="tab"]:has-text("收件箱"), [role="tab"]:has-text("Inbox")')
sent_tab = page.locator('[role="tab"]:has-text("已发送"), [role="tab"]:has-text("Sent")')
if inbox_tab.count() > 0:
detail += ", 收件箱Tab存在"
if sent_tab.count() > 0:
detail += ", 已发送Tab存在"
# 测试切换到已发送
if sent_tab.count() > 0:
sent_tab.first.click()
page.wait_for_timeout(800)
detail += ", Tab切换成功"
except Exception as e:
detail += f", Tab交互异常: {str(e)[:50]}"
# 检查是否有撰写按钮
try:
compose_btn = page.locator('a:has-text("撰写"), a:has-text("Compose"), button:has-text("撰写")')
if compose_btn.count() > 0:
detail += ", 撰写按钮存在"
except Exception:
pass
if status != "passed":
screenshot = take_screenshot(page, f"msg_list_{role}")
record(case_id, role, "消息列表页访问", status, detail, screenshot, errors, warnings)
def test_messages_search(page, role):
"""测试消息搜索功能"""
case_id = f"MSG-SEARCH-{role}"
try:
page.goto(f"{BASE_URL}/messages", timeout=30000)
page.wait_for_load_state("networkidle", timeout=15000)
page.wait_for_timeout(1000)
# 查找搜索框
search_input = page.locator('input[type="search"]').first
if search_input.count() == 0:
record(case_id, role, "消息搜索", "warning", "未找到搜索框")
return
# 输入搜索关键词
search_input.fill("测试")
page.wait_for_timeout(1500) # 等待 debounce
# 检查搜索结果(不报错即视为通过)
body = safe_text(page.locator("body"), 500)
if body:
record(case_id, role, "消息搜索", "passed", "搜索功能可用")
else:
record(case_id, role, "消息搜索", "warning", "搜索后页面内容为空")
except Exception as e:
record(case_id, role, "消息搜索", "failed", f"异常: {str(e)[:200]}")
def test_message_compose_page(page, role, expect_access):
"""测试撰写消息页"""
case_id = f"MSG-COMPOSE-{role}"
url = f"{BASE_URL}/messages/compose"
status, http_status, final_url, errors, warnings, console_errors = safe_goto(
page, url, case_id, role, "撰写消息页",
expect_login_redirect=not expect_access
)
detail = f"HTTP {http_status}, URL: {final_url}"
screenshot = None
if expect_access and status == "passed":
# 检查表单元素
try:
receiver_select = page.locator('form button[role="combobox"], form select[name="receiverId"]').first
subject_input = page.locator('input[name="subject"]').first
content_textarea = page.locator('textarea[name="content"]').first
if receiver_select.count() > 0:
detail += ", 收件人选择器存在"
if subject_input.count() > 0:
detail += ", 主题输入框存在"
if content_textarea.count() > 0:
detail += ", 内容输入框存在"
except Exception as e:
detail += f", 表单检查异常: {str(e)[:50]}"
if status != "passed":
screenshot = take_screenshot(page, f"msg_compose_{role}")
record(case_id, role, "撰写消息页访问", status, detail, screenshot, errors, warnings)
def test_message_send(page, role, expect_access):
"""测试发送消息"""
case_id = f"MSG-SEND-{role}"
if not expect_access:
record(case_id, role, "发送消息(权限)", "passed", "无 MESSAGE_SEND 权限,跳过发送测试")
return
try:
page.goto(f"{BASE_URL}/messages/compose", timeout=30000)
page.wait_for_load_state("networkidle", timeout=15000)
page.wait_for_timeout(1500)
# 选择收件人(点击 Select 触发器)- 使用 form 内的 combobox 避免选中侧边栏的角色切换器
select_trigger = page.locator('form button[role="combobox"]').first
if select_trigger.count() == 0:
# 回退:通过文本匹配
select_trigger = page.locator('button[role="combobox"]').filter(has_text=re.compile(r"收件人|Recipient|选择", re.I)).first
if select_trigger.count() == 0:
record(case_id, role, "发送消息", "warning", "未找到收件人选择器")
return
# 点击触发器打开下拉
select_trigger.click()
# 等待选项出现
try:
page.wait_for_selector('[role="option"]', timeout=5000)
except PlaywrightTimeout:
record(case_id, role, "发送消息", "warning", "收件人下拉选项未出现(可能无收件人)")
# 点击其他地方关闭下拉
page.locator("body").click()
return
options = page.locator('[role="option"]')
option_count = options.count()
if option_count == 0:
record(case_id, role, "发送消息", "warning", "无收件人选项")
return
# 选择第一个选项(避免选择自己 - 后端会校验)
first_option_text = safe_text(options.first, 100)
options.first.click()
page.wait_for_timeout(800)
# 填写主题
subject_input = page.locator('input[name="subject"]').first
if subject_input.count() > 0:
subject_input.fill(f"自动化测试消息_{datetime.now().strftime('%H%M%S')}")
# 填写内容
content_textarea = page.locator('textarea[name="content"]').first
content_textarea.fill("这是 Playwright 自动化测试发送的消息。")
# 点击发送按钮
send_btn = page.locator('button[type="submit"]').first
# 等待按钮可点击
page.wait_for_timeout(500)
send_btn.click()
# 等待导航或状态变化
try:
page.wait_for_load_state("networkidle", timeout=15000)
except PlaywrightTimeout:
pass
page.wait_for_timeout(2000)
# 检查是否跳转到消息列表
if "/messages" in page.url and "/compose" not in page.url:
record(case_id, role, "发送消息", "passed", f"发送成功,跳转至: {page.url}")
else:
# 检查是否有错误提示
body = safe_text(page.locator("body"), 500).lower()
if "cannot send a message to yourself" in body or "不能给自己" in body:
record(case_id, role, "发送消息", "warning", "收件人为自己,发送被拒绝(预期行为)")
else:
screenshot = take_screenshot(page, f"msg_send_{role}")
record(case_id, role, "发送消息", "warning", f"发送后 URL: {page.url}", screenshot)
except Exception as e:
screenshot = take_screenshot(page, f"msg_send_{role}_error")
record(case_id, role, "发送消息", "failed", f"异常: {str(e)[:200]}", screenshot)
def test_message_detail_view(page, role):
"""测试消息详情页"""
case_id = f"MSG-DETAIL-{role}"
try:
page.goto(f"{BASE_URL}/messages", timeout=30000)
page.wait_for_load_state("networkidle", timeout=15000)
page.wait_for_timeout(1000)
# 查找消息详情链接
links = page.locator('a[href*="/messages/"]').all()
detail_urls = []
seen = set()
for link in links[:5]:
try:
href = link.get_attribute("href")
except Exception:
continue
if not href or href == "/messages" or "/compose" in href:
continue
if "/messages/" in href and href not in seen:
seen.add(href)
detail_urls.append(href)
if not detail_urls:
record(case_id, role, "消息详情页访问", "warning", "未发现消息详情链接(可能无消息)")
return
detail_url = f"{BASE_URL}{detail_urls[0]}" if detail_urls[0].startswith("/") else detail_urls[0]
status, http_status, final_url, errors, warnings, console_errors = safe_goto(
page, detail_url, case_id, role, "消息详情页"
)
detail = f"HTTP {http_status}, URL: {final_url}"
screenshot = None
if status != "passed":
screenshot = take_screenshot(page, f"msg_detail_{role}")
# 检查详情页元素
if status == "passed":
try:
# 检查是否有回复按钮
reply_btn = page.locator('a:has-text("回复"), a:has-text("Reply"), button:has-text("回复")')
if reply_btn.count() > 0:
detail += ", 回复按钮存在"
# 检查是否有删除按钮
delete_btn = page.locator('button:has-text("删除"), button:has-text("Delete")')
if delete_btn.count() > 0:
detail += ", 删除按钮存在"
except Exception:
pass
record(case_id, role, "消息详情页访问", status, detail, screenshot, errors, warnings)
except Exception as e:
record(case_id, role, "消息详情页访问", "failed", f"异常: {str(e)[:200]}")
def test_message_reply(page, role, expect_access):
"""测试消息回复(通过详情页的回复按钮)"""
case_id = f"MSG-REPLY-{role}"
if not expect_access:
record(case_id, role, "消息回复(权限)", "passed", "无 MESSAGE_SEND 权限,跳过回复测试")
return
try:
page.goto(f"{BASE_URL}/messages", timeout=30000)
page.wait_for_load_state("networkidle", timeout=15000)
page.wait_for_timeout(1000)
# 查找消息详情链接
links = page.locator('a[href*="/messages/"]').all()
detail_url = None
for link in links[:5]:
href = link.get_attribute("href")
if href and "/messages/" in href and href != "/messages" and "/compose" not in href:
detail_url = href
break
if not detail_url:
record(case_id, role, "消息回复", "warning", "无可用消息,跳过回复测试")
return
full_url = f"{BASE_URL}{detail_url}" if detail_url.startswith("/") else detail_url
page.goto(full_url, timeout=30000)
page.wait_for_load_state("networkidle", timeout=15000)
page.wait_for_timeout(800)
# 查找回复按钮
reply_link = page.locator('a[href*="/messages/compose?parentId="]').first
if reply_link.count() == 0:
record(case_id, role, "消息回复", "warning", "未找到回复按钮(可能无回复权限或为已发送消息)")
return
reply_link.click()
page.wait_for_timeout(1500)
page.wait_for_load_state("networkidle", timeout=15000)
# 检查是否跳转到撰写页
if "/messages/compose" in page.url:
# 检查 URL 参数
if "parentId" in page.url and "receiverId" in page.url:
record(case_id, role, "消息回复", "passed", f"回复链接正确URL: {page.url}")
else:
record(case_id, role, "消息回复", "warning", f"回复链接缺少参数: {page.url}")
else:
record(case_id, role, "消息回复", "warning", f"回复未跳转至撰写页: {page.url}")
except Exception as e:
record(case_id, role, "消息回复", "failed", f"异常: {str(e)[:200]}")
def test_message_delete(page, role, expect_access):
"""测试消息删除(仅验证按钮存在,不实际删除以保留测试数据)"""
case_id = f"MSG-DELETE-BTN-{role}"
if not expect_access:
record(case_id, role, "消息删除按钮(权限)", "passed", "无 MESSAGE_DELETE 权限,跳过删除测试")
return
try:
page.goto(f"{BASE_URL}/messages", timeout=30000)
page.wait_for_load_state("networkidle", timeout=15000)
page.wait_for_timeout(1000)
# 查找消息详情链接
links = page.locator('a[href*="/messages/"]').all()
detail_url = None
for link in links[:5]:
href = link.get_attribute("href")
if href and "/messages/" in href and href != "/messages" and "/compose" not in href:
detail_url = href
break
if not detail_url:
record(case_id, role, "消息删除按钮", "warning", "无可用消息,跳过删除测试")
return
full_url = f"{BASE_URL}{detail_url}" if detail_url.startswith("/") else detail_url
page.goto(full_url, timeout=30000)
page.wait_for_load_state("networkidle", timeout=15000)
page.wait_for_timeout(800)
# 检查删除按钮是否存在
delete_btn = page.locator('button:has-text("删除"), button:has-text("Delete")')
if delete_btn.count() > 0:
record(case_id, role, "消息删除按钮", "passed", "删除按钮存在")
else:
record(case_id, role, "消息删除按钮", "warning", "未找到删除按钮")
except Exception as e:
record(case_id, role, "消息删除按钮", "failed", f"异常: {str(e)[:200]}")
# ============ 主测试流程 ============
def run_tests_for_role(page, role):
"""运行指定角色的所有测试"""
print(f"\n{'='*60}")
print(f"测试角色: {role}")
print(f"{'='*60}")
perms = ROLE_PERMISSIONS[role]
# 公告模块测试
print(f"\n--- 公告模块测试 ({role}) ---")
test_announcements_list_view(page, role)
test_announcement_detail_view(page, role)
test_admin_announcements_list(page, role, expect_access=perms["ANNOUNCEMENT_MANAGE"])
# 管理员专属测试
announcement_id = None
if perms["ANNOUNCEMENT_MANAGE"]:
announcement_id = test_admin_announcement_create(page, role)
test_admin_announcement_edit(page, role, announcement_id)
test_admin_announcement_publish(page, role, announcement_id)
test_admin_announcement_filter(page, role)
# 消息模块测试
print(f"\n--- 消息模块测试 ({role}) ---")
test_messages_list_view(page, role)
test_messages_search(page, role)
test_message_compose_page(page, role, expect_access=perms["MESSAGE_SEND"])
test_message_send(page, role, expect_access=perms["MESSAGE_SEND"])
test_message_detail_view(page, role)
test_message_reply(page, role, expect_access=perms["MESSAGE_SEND"])
test_message_delete(page, role, expect_access=perms["MESSAGE_DELETE"])
def generate_report():
"""生成 Markdown 测试报告"""
lines = []
lines.append("# 仪表公告和消息模块 Web 功能测试报告")
lines.append("")
lines.append(f"> 测试日期:{results['test_date']}")
lines.append(f"> 测试范围仪表公告和消息模块Announcements & Messages")
lines.append(f"> 项目版本:{results['version']}")
lines.append(f"> 测试工具Playwright + Chromium (headless)")
lines.append(f"> Base URL{results['base_url']}")
lines.append(f"> 测试角色admin / teacher / student / parent")
lines.append("")
lines.append("---")
lines.append("")
lines.append("## 一、测试概览")
lines.append("")
s = results["summary"]
lines.append("| 指标 | 数值 |")
lines.append("|------|------|")
lines.append(f"| 总测试用例数 | {s['total']} |")
lines.append(f"| 通过 | {s['passed']} |")
lines.append(f"| 失败 | {s['failed']} |")
lines.append(f"| 警告 | {s['warnings']} |")
pass_rate = (s['passed'] / s['total'] * 100) if s['total'] > 0 else 0
lines.append(f"| 通过率 | {pass_rate:.1f}% |")
lines.append("")
lines.append("### 按角色统计")
lines.append("")
lines.append("| 角色 | 总计 | 通过 | 失败 | 警告 | 通过率 |")
lines.append("|------|------|------|------|------|--------|")
for role, stats in results["by_role"].items():
rate = (stats["passed"] / stats["total"] * 100) if stats["total"] > 0 else 0
lines.append(f"| {role} | {stats['total']} | {stats['passed']} | {stats['failed']} | {stats['warnings']} | {rate:.1f}% |")
lines.append("")
lines.append("---")
lines.append("")
# 测试范围说明
lines.append("## 二、测试范围")
lines.append("")
lines.append("### 公告模块Announcements")
lines.append("")
lines.append("- **路由覆盖**")
lines.append(" - `/announcements` - 公告列表页(所有已认证用户)")
lines.append(" - `/announcements/[id]` - 公告详情页")
lines.append(" - `/admin/announcements` - 管理员公告管理页(仅 admin")
lines.append(" - `/admin/announcements/[id]` - 管理员编辑公告页(仅 admin")
lines.append("- **功能覆盖**")
lines.append(" - 列表访问、详情查看、状态过滤")
lines.append(" - 管理员:创建、编辑、发布、归档、删除")
lines.append("- **权限矩阵**")
lines.append(" - admin: ANNOUNCEMENT_MANAGE + ANNOUNCEMENT_READ")
lines.append(" - teacher/student/parent: ANNOUNCEMENT_READ")
lines.append("")
lines.append("### 消息模块Messages")
lines.append("")
lines.append("- **路由覆盖**")
lines.append(" - `/messages` - 消息列表页(收件箱/已发送 Tab")
lines.append(" - `/messages/[id]` - 消息详情页")
lines.append(" - `/messages/compose` - 撰写消息页")
lines.append("- **功能覆盖**")
lines.append(" - 列表访问、Tab 切换、搜索、详情查看")
lines.append(" - 撰写、发送、回复、删除")
lines.append("- **权限矩阵**")
lines.append(" - 所有角色: MESSAGE_SEND + MESSAGE_READ + MESSAGE_DELETE")
lines.append("")
lines.append("---")
lines.append("")
# 测试用例详情
lines.append("## 三、测试用例详情")
lines.append("")
# 按角色分组
by_role_cases = {}
for case in results["test_cases"]:
by_role_cases.setdefault(case["role"], []).append(case)
for role in ["admin", "teacher", "student", "parent"]:
if role not in by_role_cases:
continue
lines.append(f"### {role} 角色测试结果")
lines.append("")
lines.append("| 状态 | 用例 ID | 名称 | 详情 |")
lines.append("|------|---------|------|------|")
for case in by_role_cases[role]:
icon = {"passed": "", "warning": "⚠️", "failed": ""}.get(case["status"], "")
detail = case["detail"].replace("|", "\\|")[:200]
lines.append(f"| {icon} | `{case['id']}` | {case['name']} | {detail} |")
lines.append("")
lines.append("---")
lines.append("")
# 失败用例详情
failed_cases = [c for c in results["test_cases"] if c["status"] == "failed"]
if failed_cases:
lines.append("## 四、失败用例详情")
lines.append("")
for case in failed_cases:
lines.append(f"### ❌ `{case['id']}` - {case['name']}")
lines.append("")
lines.append(f"- **角色**: {case['role']}")
lines.append(f"- **详情**: {case['detail']}")
if case.get("errors"):
lines.append(f"- **错误**:")
for err in case["errors"]:
lines.append(f" - {err}")
if case.get("screenshot"):
lines.append(f"- **截图**: `{case['screenshot']}`")
lines.append("")
lines.append("---")
lines.append("")
# 警告用例
warning_cases = [c for c in results["test_cases"] if c["status"] == "warning"]
if warning_cases:
lines.append("## 五、警告用例详情")
lines.append("")
for case in warning_cases:
lines.append(f"### ⚠️ `{case['id']}` - {case['name']}")
lines.append("")
lines.append(f"- **角色**: {case['role']}")
lines.append(f"- **详情**: {case['detail']}")
if case.get("warnings"):
for w in case["warnings"]:
lines.append(f" - {w}")
lines.append("")
lines.append("---")
lines.append("")
# 修复记录
if results.get("fixed_issues"):
lines.append("## 六、修复记录")
lines.append("")
for fix in results["fixed_issues"]:
lines.append(f"### 🔧 {fix['title']}")
lines.append("")
lines.append(f"- **文件**: `{fix['file']}`")
lines.append(f"- **问题**: {fix['problem']}")
lines.append(f"- **修复**: {fix['fix']}")
lines.append("")
lines.append("---")
lines.append("")
# 控制台错误汇总
if results["console_errors_global"]:
lines.append("## 七、控制台错误汇总")
lines.append("")
lines.append(f"共收集到 {len(results['console_errors_global'])} 条控制台错误:")
lines.append("")
for i, err in enumerate(results["console_errors_global"][:20], 1):
lines.append(f"{i}. **[{err['role']}] {err['case']}**: `{err['error'][:150]}`")
if len(results["console_errors_global"]) > 20:
lines.append(f"\n... 还有 {len(results['console_errors_global']) - 20} 条错误未列出")
lines.append("")
lines.append("---")
lines.append("")
# 结论
lines.append("## 八、测试结论")
lines.append("")
if s["failed"] == 0:
if s["warnings"] == 0:
lines.append("✅ **所有测试用例通过**。仪表公告和消息模块在所有角色下功能正常。")
else:
lines.append(f"✅ **无失败用例**,但有 {s['warnings']} 个警告需要关注。")
else:
lines.append(f"❌ **{s['failed']} 个测试用例失败**,需要修复。")
if results.get("fixed_issues"):
lines.append("")
lines.append(f"🔧 本次测试期间已修复 {len(results['fixed_issues'])} 个问题。")
lines.append("")
lines.append("---")
lines.append("")
lines.append(f"*报告自动生成于 {results['test_date']}*")
return "\n".join(lines)
def main():
# 记录已修复的问题
results["fixed_issues"].append({
"title": "修复 /announcements 页面 HTTP 500 错误",
"file": "src/modules/announcements/components/announcement-list.tsx, src/app/(dashboard)/announcements/page.tsx",
"problem": "Server Component (/announcements/page.tsx) 直接传递函数 detailHrefBuilder 给 Client Component (AnnouncementList),违反 Next.js 16 的序列化规则,导致 'Functions cannot be passed directly to Client Components' 错误。",
"fix": "在 AnnouncementList 中新增 detailHrefPrefix prop字符串类型Server Component 安全),内部通过 prefix + id 拼接构建详情链接。/announcements 和 /admin/announcements 页面改用 detailHrefPrefix 代替 detailHrefBuilder。保留 detailHrefBuilder 以兼容现有 Client Component 间调用。"
})
results["fixed_issues"].append({
"title": "修复消息发送失败 - 数据库 schema 不同步",
"file": "数据库 messages 表",
"problem": "schema.ts 中定义了 senderDeletedAt 和 receiverDeletedAt 列(用于软删除),但数据库 messages 表缺失这两列,导致 createMessage INSERT 查询失败,返回 'Failed query: insert into messages' 错误。",
"fix": "通过 ALTER TABLE messages ADD COLUMN sender_deleted_at TIMESTAMP NULL 和 ALTER TABLE messages ADD COLUMN receiver_deleted_at TIMESTAMP NULL 手动添加缺失列,使数据库 schema 与代码 schema 保持同步。"
})
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
for role in ["admin", "teacher", "student", "parent"]:
# 每个角色使用独立的 context确保 session 隔离
context = browser.new_context(
viewport={"width": 1440, "height": 900},
locale="zh-CN",
ignore_https_errors=True,
)
page = context.new_page()
try:
if not login(page, role):
record(f"LOGIN-{role}", role, "登录", "failed", f"登录失败: {role}")
continue
run_tests_for_role(page, role)
finally:
try:
page.close()
context.close()
except Exception:
pass
browser.close()
# 生成报告
report = generate_report()
report_path = WEBTEST_DIR / f"仪表公告和消息_{VERSION}_.md"
with open(report_path, "w", encoding="utf-8") as f:
f.write(report)
print(f"\n📄 报告已写入: {report_path}")
# 同时输出 JSON
json_path = report_path.with_suffix(".json")
with open(json_path, "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False, indent=2, default=str)
print(f"📄 JSON 数据已写入: {json_path}")
# 打印汇总
s = results["summary"]
print(f"\n{'='*60}")
print(f"测试完成: 总计 {s['total']}, 通过 {s['passed']}, 失败 {s['failed']}, 警告 {s['warnings']}")
print(f"{'='*60}")
if __name__ == "__main__":
main()