"""
services/section_extractor.py

Extract from Markdown:
1) the table of contents (heading hierarchy) -> used to generate the template sections (TOC);
2) the body split by heading -> the content of each section
   (stored in report_section_references).

The extraction/filtering logic follows eval_report/routers/template.py and
routers/reference.py.
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import hashlib
|
||
import re
|
||
|
||
# Upper bound (in characters) on a section title: longer heading candidates
# are skipped, and numbered titles are clipped to this length.
_MAX_SECTION_TITLE_LEN = 200
|
||
|
||
|
||
# ────────────────────────────── 通用过滤/清洗 ──────────────────────────────
|
||
|
||
|
||
def _segment_looks_like_year(segment: str) -> bool:
|
||
if not segment.isdigit() or len(segment) != 4:
|
||
return False
|
||
year = int(segment)
|
||
return 1900 <= year <= 2099
|
||
|
||
|
||
def _is_valid_section_number(num: str) -> bool:
|
||
"""章节编号形如 1 / 1.1 / 2.3.4,排除正文年份(2017、2019 等)。"""
|
||
parts = [p for p in str(num or "").strip().split(".") if p]
|
||
if not parts or not all(p.isdigit() for p in parts):
|
||
return False
|
||
if any(_segment_looks_like_year(p) for p in parts):
|
||
return False
|
||
if len(parts) == 1:
|
||
return 1 <= int(parts[0]) <= 20
|
||
return all(1 <= int(p) <= 99 for p in parts)
|
||
|
||
|
||
def _heading_title_core(rest: str) -> str:
|
||
return re.sub(r"^\d+(?:\.\d+)*\s*", "", str(rest or "").strip()).strip()
|
||
|
||
|
||
def _rest_looks_like_body_text(rest: str) -> bool:
    """Heuristically decide whether a heading candidate is really body text.

    Filters out date sentences, long paragraphs and data-description
    sentences that were mistaken for headings. A leading dotted section
    number is ignored for most checks. Punctuation is recognised in both its
    ASCII and its fullwidth form.

    Args:
        rest: Heading text, with or without a leading section number.

    Returns:
        True when the text should be rejected as a heading (empty text is
        rejected as well), False when it still looks like a real title.
    """
    raw = str(rest or "").strip()
    t = _heading_title_core(rest) or raw
    if not t:
        return True

    # Starts with 月/日, or has 月 directly followed by a digit: date fragment.
    if re.match(r"^[月日]", t) or re.search(r"月\d", t):
        return True

    # The year checks also run on the raw text: _heading_title_core() strips
    # any leading digits as if they were a section number, so a title such as
    # "2019年..." would otherwise reach this point as "年..." and slip through.
    for candidate in (t, raw):
        if re.match(r"^\d{4}\s*年", candidate) or re.match(r"^\d{4}[、,\uff0c]", candidate):
            return True
        # Year range such as "2017-2019" or "2017至2019".
        if re.search(r"\d{4}\s*[-~\uff5e—至]\s*\d{4}", candidate):
            return True

    # Two or more sentence terminators: a paragraph, not a title.
    semicolons = t.count(";") + t.count("\uff1b")
    if t.count("。") >= 2 or semicolons >= 2:
        return True

    if len(t) > 80 and re.search(r"[,。;:\uff0c\uff1b\uff1a]", t):
        return True

    if len(t) > 45:
        # Long text containing typical data-description wording.
        body_keywords = (
            "运营数据", "预测数据", "实际运营", "根据公司",
            "发展规划", "工况下", "万吨", "有项目", "无项目",
        )
        if any(k in t for k in body_keywords):
            return True
        # Long text is only kept when it ends with a typical heading word.
        if not re.search(
            r"(评价|分析|结论|概况|说明|措施|建议|对比|控制|实现|状况|情况|程序|模式|评价结论)$",
            t.rstrip("。;,\uff1b\uff0c"),
        ):
            return True

    return False
|
||
|
||
|
||
def _looks_like_real_heading_title(title: str) -> bool:
|
||
if not str(title or "").strip():
|
||
return False
|
||
return not _rest_looks_like_body_text(title)
|
||
|
||
|
||
def _clean_heading_title(s: str) -> str:
|
||
t = str(s or "").strip()
|
||
t = re.sub(r"\s+", " ", t)
|
||
t = re.sub(r"\s+\d+$", "", t).strip() # 去掉目录行尾页码
|
||
m_note = re.search(r"[((]([^))]{20,})[))]", t)
|
||
if m_note and re.search(r"[,。;:]", m_note.group(1)):
|
||
t = re.sub(r"\s*[((][^))]{20,}[))]\s*$", "", t).strip()
|
||
return t
|
||
|
||
|
||
def _section_dict(section_key: str, section_title: str) -> dict:
|
||
return {"sectionKey": section_key, "sectionTitle": section_title}
|
||
|
||
|
||
def _canonical_to_section_key(canonical: str, order: int) -> str:
|
||
return (
|
||
re.sub(r"[^a-z0-9\u4e00-\u9fa5]+", "-", canonical).strip("-")
|
||
or f"section-{order}"
|
||
)
|
||
|
||
|
||
def normalize_section_key(raw_key: str | None, title: str | None) -> str:
    """Produce a stable section key of at most 64 characters.

    The key is derived from *raw_key*, or from *title* when *raw_key* is
    blank. It is lower-cased, and every run of characters other than
    ``a-z``, ``0-9`` and CJK ideographs (U+4E00..U+9FA5) is replaced by a
    single ``-``. An empty result becomes ``"section"``. Keys longer than 64
    characters are cut to a 53-character prefix followed by ``-`` and the
    first 10 hex digits of the MD5 of the full key.
    """
    candidate = (raw_key or "").strip().lower() or (title or "").strip().lower()
    slug = re.sub(r"[^a-z0-9\u4e00-\u9fa5]+", "-", candidate).strip("-") or "section"
    if len(slug) > 64:
        short_hash = hashlib.md5(slug.encode("utf-8")).hexdigest()[:10]
        head = slug[:53].rstrip("-")
        return f"{head}-{short_hash}"
    return slug
|
||
|
||
|
||
# ────────────────────────────── 目录(TOC)抽取 ──────────────────────────────
|
||
|
||
|
||
def _walk_markdown_heading_sections(text: str) -> list[dict]:
    """Split Markdown into sections at its headings and capture each body.

    Every line of the form ``#`` .. ``######`` followed by text is a heading
    candidate. Candidates whose cleaned title is empty, too long, or judged
    to be body text are not treated as headings and stay part of the
    surrounding content. Accepted headings are numbered automatically from
    their level (``## 项目概况`` -> ``1.1 项目概况``), so explicit numbers in
    the document are not required.

    Body range of a section (the heading line itself is excluded):

    * ``SECTION_CONTENT_INCLUDE_SUBSECTIONS`` true (the default when the
      setting is missing): everything up to the next accepted heading whose
      level is <= this section's level, i.e. the whole subtree including
      sub-headings and their text.
    * otherwise: everything up to the next accepted heading of any level.

    Notes:
        * A skipped level is numbered 0 (``#`` followed directly by ``###``
          yields ``1.0.1``); a document starting at ``##`` yields ``0.1``.
        * Fenced code blocks are not tracked, so a ``# ...`` line inside one
          is a heading candidate like any other line.

    Args:
        text: Markdown source; ``None`` is treated as empty.

    Returns:
        One dict per accepted heading, in document order, with the keys
        ``number``, ``title``, ``full_title``, ``canonical``,
        ``section_key``, ``level`` and ``content``.
    """
    from config import settings

    include_sub = bool(getattr(settings, "SECTION_CONTENT_INCLUDE_SUBSECTIONS", True))

    lines = str(text or "").splitlines()
    counters: list[int] = []
    accepted: list[dict] = []
    seen: set[str] = set()

    for idx, raw in enumerate(lines):
        m = re.match(r"^(#{1,6})\s+(.+)$", str(raw or "").strip())
        if not m:
            continue
        level = len(m.group(1))
        title = _clean_heading_title(m.group(2).strip())
        if (
            not title
            or len(title) > _MAX_SECTION_TITLE_LEN
            or not _looks_like_real_heading_title(title)
        ):
            continue

        # Resize the counter stack to exactly `level` entries: pad with zeros
        # when descending, truncate (dropping deeper counters) when ascending.
        # No further reset is needed because nothing remains past `level`.
        if len(counters) < level:
            counters.extend([0] * (level - len(counters)))
        else:
            counters = counters[:level]
        counters[level - 1] += 1

        num = ".".join(str(c) for c in counters)
        canonical = f"{num}|{title}".lower()
        # Defensive guard against emitting the same canonical key twice.
        if canonical in seen:
            continue
        seen.add(canonical)
        accepted.append(
            {
                "number": num,
                "title": title,
                "full_title": f"{num} {title}",
                "canonical": canonical,
                "section_key": _canonical_to_section_key(canonical, len(accepted) + 1),
                "level": level,
                "start_idx": idx,
            }
        )

    total = len(lines)
    for i, sec in enumerate(accepted):
        end = total
        for j in range(i + 1, len(accepted)):
            nxt = accepted[j]
            # Without subtree aggregation the very next heading ends the body;
            # with it, only a heading at the same or a shallower level does.
            if not include_sub or nxt["level"] <= sec["level"]:
                end = nxt["start_idx"]
                break
        body_start = sec["start_idx"] + 1  # skip the heading line itself
        sec["content"] = "\n".join(lines[body_start:end]).strip()
        # Only later sections' start_idx is still needed, so it can go now.
        sec.pop("start_idx", None)

    return accepted
|
||
|
||
|
||
def _extract_sections_from_markdown_headings(text: str) -> list[dict]:
|
||
"""
|
||
从 Markdown 标题(# / ## / ###)构建模板章节目录。
|
||
复刻 eval_report 报告模板管理模块 services/template_service.py 的同名逻辑:
|
||
标题层级自动编号(## 项目概况 -> 1.1 项目概况),并过滤非真实标题行。
|
||
"""
|
||
lines = str(text or "").splitlines()
|
||
counters: list[int] = []
|
||
out: list[dict] = []
|
||
seen: set[str] = set()
|
||
|
||
for raw in lines:
|
||
m = re.match(r"^(#{1,6})\s+(.+)$", str(raw or "").strip())
|
||
if not m:
|
||
continue
|
||
level = len(m.group(1))
|
||
title = _clean_heading_title(m.group(2).strip())
|
||
if not title or len(title) > _MAX_SECTION_TITLE_LEN:
|
||
continue
|
||
if not _looks_like_real_heading_title(title):
|
||
continue
|
||
if len(counters) < level:
|
||
counters.extend([0] * (level - len(counters)))
|
||
else:
|
||
counters = counters[:level]
|
||
counters[level - 1] += 1
|
||
for i in range(level, len(counters)):
|
||
counters[i] = 0
|
||
num = ".".join(str(counters[i]) for i in range(level))
|
||
full_title = f"{num} {title}"
|
||
canonical = f"{num}|{title}".lower()
|
||
if canonical in seen:
|
||
continue
|
||
seen.add(canonical)
|
||
out.append(
|
||
_section_dict(
|
||
_canonical_to_section_key(canonical, len(out) + 1),
|
||
full_title,
|
||
)
|
||
)
|
||
return out
|
||
|
||
|
||
def extract_sections_from_text(text: str) -> list[dict]:
    """Extract the template section list (stored in report_template_sections).

    Strategy:

    1. Recognise sections from Markdown heading levels; when at least 8 are
       found, that result is returned as is.
    2. Otherwise scan every line for a numbered title (``1.2 标题``) or a
       Chinese-numeral title (``一、标题``), filtering out dates, list items
       such as ``1) ...`` and lines judged to be body text.
    3. If at least 20 candidates end with a page number and their leading
       numbers cover 1 through 7, only those table-of-contents rows are used;
       otherwise all candidates are used.

    ASCII and fullwidth forms of ``)`` and ``.`` are both recognised.

    Args:
        text: Document text; ``None`` is treated as empty.

    Returns:
        De-duplicated ``{"sectionKey", "sectionTitle"}`` dicts in document
        order.
    """
    md_sections = _extract_sections_from_markdown_headings(text)
    if len(md_sections) >= 8:
        return md_sections

    candidates: list[dict] = []
    for raw in str(text or "").splitlines():
        line = str(raw or "").strip()
        if not line:
            continue
        line = re.sub(r"^#{1,6}\s*", "", line).strip()
        line = line.replace("\u3000", " ")
        line = re.sub(r"\s+", " ", line).strip()

        # Standalone date line such as "2021年 3月".
        if re.match(r"^20\d{2}\s*年\s*\d{1,2}\s*月$", line):
            continue
        # The table-of-contents caption itself.
        if line in {"目次", "目录"}:
            continue
        # Enumerated list item "1) ..." rather than a section title.
        if re.match(r"^\d+\s*[)\uff09]\s*.+$", line):
            continue

        # Must be evaluated before cleaning, which strips the page number.
        has_page_no = bool(re.search(r"\s+\d+\s*$", line))

        m = re.match(r"^((?:\d+(?:\.\d+){0,5}))\s*([^\s].*)$", line)
        if m:
            num = m.group(1).strip()
            if not _is_valid_section_number(num):
                continue
            rest = _clean_heading_title(m.group(2).strip())
            if not rest or rest.startswith((")", "\uff09")):
                continue
            if _rest_looks_like_body_text(rest):
                continue
            if len(rest) > _MAX_SECTION_TITLE_LEN:
                continue
            prefix = num
        else:
            m2 = re.match(r"^([一二三四五六七八九十]+[、.\uff0e])\s*([^\s].*)$", line)
            if not m2:
                continue
            rest = _clean_heading_title(m2.group(2).strip())
            if not rest or _rest_looks_like_body_text(rest) or len(rest) > _MAX_SECTION_TITLE_LEN:
                continue
            prefix = m2.group(1)

        candidates.append(
            {
                "canonical": f"{prefix}|{rest}".lower(),
                "title": f"{prefix} {rest}"[:_MAX_SECTION_TITLE_LEN].rstrip(),
                "has_page_no": has_page_no,
            }
        )

    # Rows ending in a page number are taken to be table-of-contents rows.
    toc_rows = [c for c in candidates if c["has_page_no"]]
    toc_nums = set()
    for c in toc_rows:
        m_num = re.match(r"^(\d+)", c["title"])
        if m_num:
            toc_nums.add(m_num.group(1))
    use_toc_only = len(toc_rows) >= 20 and {"1", "2", "3", "4", "5", "6", "7"}.issubset(toc_nums)

    out: list[dict] = []
    seen: set[str] = set()
    for c in toc_rows if use_toc_only else candidates:
        canonical = c["canonical"]
        if canonical in seen:
            continue
        if not _looks_like_real_heading_title(c["title"]):
            continue
        seen.add(canonical)
        out.append(_section_dict(_canonical_to_section_key(canonical, len(out) + 1), c["title"]))
    return out
|
||
|
||
|
||
# ────────────────────────────── 正文按标题拆分 ──────────────────────────────
|
||
|
||
|
||
def split_markdown_into_sections(text: str) -> list[dict[str, str]]:
    """Split Markdown body text into sections by heading.

    Uses the auto-numbering heading walker first, so the ``section_key`` of
    each result is the generated number (e.g. ``1.1``) and the title carries
    that number as prefix. When the walker yields no section at all, the
    text is split on headings that carry an explicit number instead.

    Returns:
        ``[{"section_key", "section_title", "content"}, ...]``.
    """
    walked = _walk_markdown_heading_sections(text)
    if not walked:
        return split_markdown_by_headings(text)

    result = []
    for sec in walked:
        result.append(
            {
                "section_key": sec["number"],
                "section_title": sec["full_title"],
                "content": sec["content"],
            }
        )
    return result
|
||
|
||
|
||
def split_markdown_by_headings(text: str) -> list[dict[str, str]]:
    """Split Markdown on numbered headings of level 1-4 (e.g. ``## 1.1 Title``).

    Text before the first numbered heading is discarded. Each section's
    content starts with its own heading line and runs up to the next
    numbered heading; it is stripped of surrounding whitespace.

    Returns:
        ``[{"section_key", "section_title", "content"}, ...]`` where
        ``section_key`` is the number found in the heading (e.g. ``1.1``).
    """
    lines = str(text or "").splitlines()
    heading_pattern = re.compile(r"^#{1,4}\s+(\d+(?:\.\d+)*)\s+(.+)")

    # First pass: locate every numbered heading.
    marks = []
    for pos, line in enumerate(lines):
        found = heading_pattern.match(line)
        if found:
            marks.append((pos, found.group(1), found.group(2).strip()))

    # Second pass: each section spans from its heading to the next one.
    sections: list[dict[str, str]] = []
    for n, (pos, key, title) in enumerate(marks):
        stop = marks[n + 1][0] if n + 1 < len(marks) else len(lines)
        sections.append(
            {
                "section_key": key,
                "section_title": title,
                "content": "\n".join(lines[pos:stop]).strip(),
            }
        )
    return sections
|
||
|
||
|
||
def parse_section_order(section_key: str) -> int:
    """Convert a dotted section number into an integer used for sorting.

    The dots are removed and the remaining digits are read as one integer,
    so ``'1.2.1'`` becomes ``121``. Anything that is not purely decimal
    digits after removing the dots (including an empty key) yields ``0``.

    Args:
        section_key: Dotted section number; ``None`` is treated as empty.

    Returns:
        The concatenated-digit integer, or 0 when the key is not numeric.
    """
    # NOTE(review): concatenation is ambiguous once a segment reaches 10,
    # e.g. "1.10" -> 110 sorts after "2.1" -> 21 and "12.1" collides with
    # "1.2.1". Kept as is because callers may rely on the existing values.
    digits = str(section_key or "").replace(".", "")
    # isdecimal() (not isdigit()) so that int() can never raise on input
    # such as "²".
    return int(digits) if digits.isdecimal() else 0
|
||
|
||
|
||
def clamp_text_bytes(text: str, max_bytes: int, *, suffix: str = "\n…(内容过长,已截断)") -> str:
    """Truncate *text* so that its UTF-8 encoding fits into *max_bytes*.

    The cut never lands inside a multi-byte character. Intended for
    byte-limited storage such as a MySQL TEXT column (65,535 bytes).

    Args:
        text: Text to clamp. Empty or ``None`` is returned unchanged.
        max_bytes: Byte limit. A non-positive value disables clamping and
            returns *text* unchanged.
        suffix: Marker appended when truncation happens. It counts towards
            the limit; when it does not fit into *max_bytes* at all, the text
            is hard-cut without a marker so the limit is still respected.

    Returns:
        *text* itself when it already fits, otherwise the truncated text.
    """
    if not text or max_bytes <= 0:
        return text
    data = text.encode("utf-8")
    if len(data) <= max_bytes:
        return text

    suffix_bytes = len(suffix.encode("utf-8"))
    if suffix_bytes > max_bytes:
        # Appending the suffix would by itself exceed the limit.
        return data[:max_bytes].decode("utf-8", errors="ignore")

    # errors="ignore" drops a trailing, partially cut character so the result
    # is always valid UTF-8.
    budget = max_bytes - suffix_bytes
    truncated = data[:budget].decode("utf-8", errors="ignore").rstrip()
    return truncated + suffix
|