Python 后端提交

Python 后端(FastAPI + FastMCP + ...)的初始版本号设定为 0.1.0,这是 uv 在 pypriject.toml
里给我自动设置的,我觉得有道理。
This commit is contained in:
2026-04-21 13:38:46 +08:00
parent 14eadaab86
commit b284c3c260
27 changed files with 1691 additions and 0 deletions

0
njupt_api/__init__.py Normal file
View File

10
njupt_api/art.txt Normal file
View File

@@ -0,0 +1,10 @@
________ ___ ___ ___ ________ _________ ________ ___ ___ ________ ________ ________ ________ ___
|\ ___ \ |\ \|\ \|\ \|\ __ \|\___ ___\ |\ ____\|\ \|\ \|\ __ \|\ ___ \ |\ __ \|\ __ \|\ \
\ \ \\ \ \ \ \ \ \ \\\ \ \ \|\ \|___ \ \_| \ \ \___|\ \ \\\ \ \ \|\ \ \ \\ \ \ \ \ \|\ \ \ \|\ \ \ \
\ \ \\ \ \ __ \ \ \ \ \\\ \ \ ____\ \ \ \ \ \_____ \ \ \\\ \ \ __ \ \ \\ \ \ \ \ __ \ \ ____\ \ \
\ \ \\ \ \|\ \\_\ \ \ \\\ \ \ \___| \ \ \ \|____|\ \ \ \\\ \ \ \ \ \ \ \\ \ \ \ \ \ \ \ \ \___|\ \ \
\ \__\\ \__\ \________\ \_______\ \__\ \ \__\ ____\_\ \ \_______\ \__\ \__\ \__\\ \__\ \ \__\ \__\ \__\ \ \__\
\|__| \|__|\|________|\|_______|\|__| \|__| |\_________\|_______|\|__|\|__|\|__| \|__| \|__|\|__|\|__| \|__|
\|_________|
=> NJUPT Suan API (v.{}) | Made with 💗 by MangoFanFanw | Powered by FastAPI, FastMCP, Vue and more ~

View File

@@ -0,0 +1,14 @@
from .config import config
from .logger import LogRecord, log_buffer, log_record_serialize, logger
from .mcploggingmiddleware import LoggingMiddleware
from .playcontextmanager import PlayContextManager
__all__ = [
config,
LogRecord,
log_buffer,
log_record_serialize,
logger,
LoggingMiddleware,
PlayContextManager,
]

115
njupt_api/baselib/config.py Normal file
View File

@@ -0,0 +1,115 @@
from json import dumps, loads
from pathlib import Path
from typing import TypeVar
import aiofiles
from .logger import logger
CONFIG_PATH = Path.cwd() / "data" / "config.json"
T = TypeVar("T")
class Config:
def __init__(self) -> None:
self._doc = {}
async def load_json(self) -> None:
"""
从 Toml 配置文件中读取配置。
"""
logger.debug("异步读取配置文件。")
async with aiofiles.open(file=CONFIG_PATH, mode="r") as f:
self._doc = loads(await f.read())
def sync_load_json(self) -> None:
"""
同步读取配置文件,仅限于 main.py 中启动时。
Raises:
FileNotFoundError: 配置文件不存在。
"""
logger.debug("同步读取配置文件。")
try:
with open(file=CONFIG_PATH, mode="r") as f:
self._doc = loads(f.read())
except FileNotFoundError:
logger.warning("FileNotFoundError - 配置文件不存在。")
raise
def sync_create_json(self) -> None:
"""
同步创建配置文件。
"""
logger.debug("同步创建配置文件。")
with open(file=CONFIG_PATH, mode="w") as f:
f.write(dumps(self._doc))
def init_config(self) -> None:
"""
重新初始化 Toml 配置文件。这会重置所有配置。
"""
logger.warning("初始化配置文件,这会重置所有配置。")
self._doc.clear()
doc_system = {}
doc_schedule = {}
doc_log = {}
doc_system["host"] = "0.0.0.0"
doc_system["port"] = 8000
doc_system["reload"] = True
doc_schedule["jwxt_login_method"] = "sso"
doc_schedule["semester_start_date"] = "2026-03-02"
doc_schedule["schedule_title_template"] = "芒果酸的第 {title} 周课程表"
doc_schedule["schedule_subtitle_template"] = "我也要上吗?"
doc_log["log_api_request_details"] = False
doc_log["log_mcp_request_details"] = False
doc_log["log_assets_request"] = False
self._doc["system"] = doc_system
self._doc["schedule"] = doc_schedule
self._doc["log"] = doc_log
async def save_json(self) -> None:
"""
异步保存 Toml 配置文件。
"""
logger.debug("异步保存配置文件。")
async with aiofiles.open(file=CONFIG_PATH, mode="w") as f:
await f.write(dumps(self._doc, indent=4))
def get(self, group: str, option: str, default: T) -> T:
"""
获取配置项的值。
Args:
group: Table
option: Key
default: 默认值
Returns:
Any与 default 参数类型相同。
"""
try:
return self._doc.get(group).get(option)
except AttributeError:
return default
def to_dict(self) -> dict:
return self._doc
def from_dict(self, data: dict) -> None:
self._doc.clear()
for key, value in data.items():
if isinstance(value, dict):
t_table = {}
for k, v in value.items():
t_table[k] = v
self._doc[key] = t_table
config = Config()

View File

@@ -0,0 +1,42 @@
import sys
from collections import deque
from loguru import logger
logger.remove()
logger.add(
sys.stdout,
level="DEBUG",
colorize=True,
)
logger.add("data/app.log", rotation="10 MB", retention="7 days") # 文件日志
log_buffer = deque(maxlen=1000)
class LogRecord:
log_counter = 0
def __init__(self, message: str) -> None:
self.id = LogRecord.log_counter
self.message = message
LogRecord.log_counter += 1
def log_record_serialize(record: LogRecord) -> dict:
return {
"id": record.id,
"message": record.message,
}
def memory_sink(message: str) -> None:
"""向自定义缓冲区写入日志,供 WebUI 获取
:param message: 'loguru._handler.Message'
"""
log_entry = LogRecord(message=message)
log_buffer.append(log_entry)
logger.add(sink=memory_sink, level="DEBUG", colorize=True)

View File

@@ -0,0 +1,36 @@
import time
from fastmcp.server.middleware import CallNext, MiddlewareContext
from fastmcp.server.middleware.middleware import Middleware
from fastmcp.tools import ToolResult
from mcp import types as mt
from . import config
from .logger import logger
class LoggingMiddleware(Middleware):
async def on_call_tool(
self,
context: MiddlewareContext[mt.CallToolRequestParams],
call_next: CallNext[mt.CallToolRequestParams, ToolResult],
) -> ToolResult:
tool_name = context.message.name
args = context.message.arguments
start_time = time.time()
logger.debug(f"MCP → 调用工具: {tool_name}")
if config.get("log", "log_mcp_request_details", False):
logger.debug(f"调用参数 - {args=}")
try:
result = await call_next(context)
elapsed = time.time() - start_time
logger.info(f"MCP ← 工具 {tool_name} 完成, 耗时: {elapsed:.3f}s")
return result
except Exception as e:
elapsed = time.time() - start_time
logger.error(
f"MCP ✗ 工具 {tool_name} 失败, 耗时: {elapsed:.3f}s, 错误: {e}",
)
raise

View File

@@ -0,0 +1,56 @@
from playwright.async_api import (
Browser,
BrowserContext,
Page,
Playwright,
async_playwright,
)
class PlayContextManager:
def __init__(
self,
playwright: Playwright = None,
browser: Browser = None,
context: BrowserContext = None,
page: Page = None,
) -> None:
self.playwright = playwright
self.browser = browser
self.context = context
self.page = page
self.isLogin = False
async def start(self) -> None:
"""手动启动"""
self.playwright = await async_playwright().start() # 不是 __enter__
self.browser = await self.playwright.chromium.launch(
headless=False,
args=[
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
"--disable-setuid-sandbox",
"--disable-dev-shm-usage",
"--disable-gpu",
"--no-proxy-server",
],
)
self.context = await self.browser.new_context()
self.page = await self.context.new_page()
async def __aenter__(self) -> "PlayContextManager":
await self.start()
return self
async def close(self) -> None:
"""手动关闭"""
if self.context:
await self.context.close()
if self.browser:
await self.browser.close()
if self.playwright:
await self.playwright.stop() # 不是 __exit__
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: # noqa: ANN001
await self.close()

View File

@@ -0,0 +1,13 @@
from .createcourse import create_course_schedule
from .sso import SSO
from .types import Course, course_dict_serializer, course_list_serializer
from .zhengfang import ZhengFang
__all__ = [
create_course_schedule,
SSO,
Course,
course_dict_serializer,
course_list_serializer,
ZhengFang,
]

View File

@@ -0,0 +1,315 @@
import re
from bs4 import BeautifulSoup
from .types import Course
def normalize_course_str(course_str: str) -> str:
"""
规范化课程字符串,确保 create_course 能正确解析。
Returns:
字符串。
"""
parts = course_str.split("<br>")
while parts and parts[0] == "":
parts.pop(0)
while len(parts) < 4:
parts.append(" ")
for i in range(2, 4):
if parts[i] == "":
parts[i] = " "
return "<br>".join(parts)
def create_course_schedule(html: str) -> list[Course]:
"""解析给定 HTML 字符串,返回包含数个 Course 对象的列表。
Args:
html: HTML 字符串。应该有且只有一个 <table> 标签,其中是课程表数据。
Returns:
list[Course]
"""
soup = BeautifulSoup(html, "html.parser")
table = soup.find("table")
rows = table.find_all("tr")
courses: list[Course] = []
rowspan_map: dict[int, int] = {}
# 解析第一行表头,建立列索引到星期几的映射
# 表头格式第1列是"时间"(colspan=2),然后是 星期一 到 星期日
day_map: dict[int, int] = {} # col_idx -> day (1-7)
if rows:
header_cells = rows[0].find_all(["td", "th"])
col_idx = 0
for cell in header_cells:
text = cell.get_text(strip=True)
colspan = int(cell.get("colspan", 1))
# 跳过"时间"单元格
if text != "时间":
# 映射星期几到数字
day_mapping = {
"星期一": 1,
"星期二": 2,
"星期三": 3,
"星期四": 4,
"星期五": 5,
"星期六": 6,
"星期日": 7,
"星期天": 7,
}
day = day_mapping.get(text)
if day is not None:
for c in range(col_idx, col_idx + colspan):
day_map[c] = day
col_idx += colspan
for row_idx, row in enumerate(rows):
if row_idx == 0:
continue
cells = row.find_all(["td", "th"])
col_idx = 0
class_start: int | None = None
for cell in cells:
while col_idx in rowspan_map and rowspan_map[col_idx] > 0:
rowspan_map[col_idx] -= 1
if rowspan_map[col_idx] == 0:
del rowspan_map[col_idx]
col_idx += 1
text = cell.get_text(strip=True)
colspan = int(cell.get("colspan", 1))
rowspan = int(cell.get("rowspan", 1))
if text.startswith("") and text.endswith(""):
class_start = int(text[1:-1])
if rowspan > 1:
for c in range(col_idx, col_idx + colspan):
rowspan_map[c] = rowspan - 1
col_idx += colspan
continue
if text in ("早晨", "上午", "下午", "晚上"):
if rowspan > 1:
for c in range(col_idx, col_idx + colspan):
rowspan_map[c] = rowspan - 1
col_idx += colspan
continue
td_str = str(cell)
start = td_str.find(">") + 1
end = td_str.rfind("</td>")
inner_html = td_str[start:end]
if "&nbsp;" not in inner_html and inner_html.strip():
inner_html = re.sub(r"<br\s*/?>", "<br>", inner_html)
course_strs = [
s.strip() for s in re.split(r"(?:<br>){2,}", inner_html) if s.strip() and "&nbsp;" not in s
]
# 获取当前列对应的星期几
day = day_map.get(col_idx, 1) # 默认为1星期一
for course_str in course_strs:
course_str = normalize_course_str(course_str)
courses.append(
create_course(
course_str,
day,
default_classes_start=class_start,
),
)
if rowspan > 1:
for c in range(col_idx, col_idx + colspan):
rowspan_map[c] = rowspan - 1
col_idx += colspan
return courses
def create_course(
raw: str,
day: int,
default_classes_start: int | None = None,
) -> Course:
"""根据从 HTML 中提取出的原字符串解析课程信息
Args:
raw: 原字符串,以 <br> 作为换行符
day: 周内的星期几
default_classes_start: 如果没有解析出课程的 classes则使用此参数。
此参数应当从表格的行标题解析。
Returns:
Course
"""
# 0 1 2 3 4
# ['概率论与数理统计', '1-17单(1,2)', '王雪红', '教3-520', '']
raw_list = raw.split("<br>")
# 首先去除列表头部的所有空字符串
while True:
if raw_list[0] == "":
raw_list.pop(0)
else:
break
# 对于大部分课程raw_list[1] 都是形如以下格式
# 1-17(3,4)
# 1-17单(1,2) *(也可能是双)
# 2节/周
# 2节/单周 *(也可能是双)
# 周三第3,4节{第1-17周}
# 周五第3,4节{第2-16周|双周}
raw_time = raw_list[1]
weeks = []
classes = []
single = False # 内部变量
double = False # 内部变量
# 处理前两种形式
if "-" in raw_time and "" not in raw_time:
# 也可能是 '1-17单'
t = raw_time.split("(") # ['1-17', '3-4)']
# 也可能是 '17单'
start, end = t[0].split("-") # ['1', '17']
if end.endswith(""):
end = end[:-1]
single = True
elif end.endswith(""):
end = end[:-1]
double = True
for i in range(int(start), int(end) + 1):
if single and i % 2 == 0:
continue
if double and i % 2 == 1:
continue
weeks.append(i)
raw_classes = t[1].removesuffix(")")
classes = [int(i) for i in raw_classes.split(",")]
# 处理中两种形式
elif "/" in raw_time:
# 默认学期 1-16 周
if "/单周" in raw_time:
single = True
elif "/双周" in raw_time:
double = True
for i in range(1, 17):
if single and i % 2 == 0:
continue
if double and i % 2 == 1:
continue
weeks.append(i)
# 获取多少节课
t_num = int(raw_time.split("")[0])
for i in range(0, t_num):
classes.append(default_classes_start + i)
# 处理后两种形式
elif "" in raw_time:
# '周三', '3,4节{', '1-17周}'
# '周五', '3,4节{', '2-16周|双周}'
u = raw_time.split("")
classes = [int(u_c) for u_c in u[1].split("")[0].split(",")]
# '1-17', '}'
# '2-16', '|双', '}'
u_w = u[2].split("")
if "" in u_w[1]:
single = True
elif "" in u_w[1]:
double = True
u_start, u_end = u_w[0].split("-")
for i in range(int(u_start), int(u_end) + 1):
if single and i % 2 == 0:
continue
if double and i % 2 == 1:
continue
weeks.append(i)
teacher = raw_list[2] if raw_list[2] != " " else None
classroom = raw_list[3] if raw_list[3] != " " else None
return Course(raw_list[0], weeks, day, classes, teacher, classroom)
def convert_dict_schedule_to_tuple(schedule: list[dict]) -> list[tuple]:
"""将字典格式的课表转换为压缩的元组格式。
Args:
schedule: list[dict],标准格式的课程数据
Returns:
list[tuple]: 压缩后的元组格式 (name, teacher, classroom, weeks_str, day, classes)
其中 weeks 尽量压缩为字符串格式(如 "1-17"
"""
result = []
for course in schedule:
name = course.get("name", "")
teacher = course.get("teacher")
classroom = course.get("classroom")
weeks = course.get("weeks", [])
day = course.get("day", 1)
classes = course.get("classes", [])
# 压缩 weeks 为字符串
weeks_str = compress_weeks_to_string(weeks) if weeks else ""
result.append((name, teacher, classroom, weeks_str, day, classes))
return result
def compress_weeks_to_string(weeks: list[int]) -> str:
"""将周数列表压缩为最短的字符串表示。
例如:
[1,2,3,4,5] -> "1-5"
[1,3,5,7] -> "1,3,5,7"
[1,2,3,5,6,7,8] -> "1-3,5-8"
[1] -> "1"
Args:
weeks: 周数列表
Returns:
str: 压缩后的周数字符串
"""
if not weeks:
return ""
# 去重并排序
weeks = sorted({int(w) for w in weeks})
ranges = []
start = end = weeks[0]
for w in weeks[1:]:
if w == end + 1:
# 连续,扩展当前范围
end = w
else:
# 不连续,保存当前范围,开始新范围
ranges.append((start, end))
start = end = w
# 保存最后一个范围
ranges.append((start, end))
# 格式化为字符串
parts = []
for start, end in ranges:
if start == end:
parts.append(str(start))
else:
parts.append(f"{start}-{end}")
return ",".join(parts)

View File

@@ -0,0 +1,38 @@
from njupt_api.baselib import PlayContextManager, logger
class SSO(PlayContextManager):
def __init__(self) -> None:
super().__init__()
async def login(self, username: str, password: str) -> bool:
"""使用用户名和密码实现登录南邮统一身份验证。
Parameters:
username: 用户名,学号,一般为一位大写字母+八位数字
password: 密码
Returns:
bool表明判登录是否成功。
"""
await self.page.goto("http://i.njupt.edu.cn/")
await self.page.fill('input[name="username"]', username)
await self.page.fill('input[type="password"]', password)
await self.page.click('button[type="button"]')
await self.page.wait_for_load_state("networkidle")
if "user-login" in self.page.url:
logger.error(f"{username} | 登录失败,请检查学号和密码是否正确。")
return False
logger.info(f"{username} | 登录南邮统一身份认证成功。")
self.isLogin = True
return True
async def goto_zf(self) -> None:
sub_frame = self.page.frame_locator('iframe[name="iframe0"]')
async with self.context.expect_event("page") as new_page_event:
await sub_frame.locator('a[title="教务系统"]').click()
self.page = await new_page_event.value
return

View File

@@ -0,0 +1,42 @@
from dataclasses import dataclass
@dataclass
class Course:
"""Course 是对课程表中的 **某一节课** 的抽象。
Examples:
1-17周星期一1-2节数据结构是一个 Course 对象;
1-17周星期三3-4节数据结构是另一个 Course 对象;
1-17周中的单周星期四3-4节英语是一个 Course 对象;
1-17周中的双周星期四3-4节物理是另一个 Course 对象。
"""
name: str
weeks: list[int]
day: int
classes: list[int]
teacher: str | None
classroom: str | None
def course_dict_serializer(course: Course) -> dict[str, str | list[int] | int | None]:
return {
"name": course.name,
"weeks": course.weeks,
"day": course.day,
"classes": course.classes,
"teacher": course.teacher,
"classroom": course.classroom,
}
def course_list_serializer(course_list: list[Course]) -> list[dict]:
final_list = []
for course in course_list:
final_list.append(course_dict_serializer(course))
return final_list

View File

@@ -0,0 +1,79 @@
from ddddocr import DdddOcr
from playwright.async_api import Browser, BrowserContext, Page, Playwright
from njupt_api.baselib import PlayContextManager, logger
from njupt_api.zhengfang import Course
from njupt_api.zhengfang.createcourse import create_course_schedule
from njupt_api.zhengfang.sso import SSO
class ZhengFang(PlayContextManager):
def __init__(
self,
playwright: Playwright = None,
browser: Browser = None,
context: BrowserContext = None,
page: Page = None,
) -> None:
super().__init__(playwright, browser, context, page)
@classmethod
async def init_from_sso(cls, sso: SSO) -> "ZhengFang":
await sso.goto_zf()
logger.info("从 SSO 进入正方教务系统。")
return cls(sso.playwright, sso.browser, sso.context, sso.page)
async def login(self, username: str, password: str) -> bool:
"""
使用用户名和密码实现教务系统登录。
Returns:
bool表明登录是否成功。
"""
await self.page.goto("http://jwxt.njupt.edu.cn")
# 填充用户名和密码
await self.page.fill("input#txtUserName", username)
await self.page.fill("input#TextBox2", password)
# 处理验证码
captcha_img = self.page.locator("img#icode")
captcha_bytes = await captcha_img.screenshot()
ocr = DdddOcr(show_ad=False)
captcha_code = str(ocr.classification(captcha_bytes))
logger.debug(f"识别到的验证码为: {captcha_code}")
await self.page.fill("input#txtSecretCode", captcha_code)
async with self.page.expect_event("dialog", timeout=3000) as dialog_info:
await self.page.click("input#Button1")
dialog = await dialog_info.value
if dialog.message == "请到信息维护中完善个人联系方式":
await dialog.accept()
logger.info(f"{username} | 登录正方教务系统成功。")
self.isLogin = True
return True
if "验证码" in dialog.message:
await dialog.accept()
logger.warning(f"{username} | 验证码错误,自动重试...")
return await self.login(username, password)
await dialog.accept()
logger.error(f"{username} | 登录失败,教务系统提示信息为: {dialog.message}")
return False
async def get_class_schedule(self) -> list[Course]:
await self.page.locator("a.top_link:has-text('公用信息')").click()
await self.page.locator("a:has-text('班级课表查询')").click()
sub_frame = self.page.frame_locator("iframe[name='zhuti']")
logger.debug("获取班级课表。")
return create_course_schedule(
f"<table>{await sub_frame.locator('table#Table6').inner_html()}</table>",
)
async def get_student_schedule(self) -> list[Course]:
await self.page.locator("a.top_link:has-text('信息查询')").click()
await self.page.locator("a:has-text('学生个人课表')").click()
sub_frame = self.page.frame_locator("iframe[name='zhuti']")
logger.debug("获取个人课表。")
return create_course_schedule(
f"<table>{await sub_frame.locator('table#Table1').inner_html()}</table>",
)