section_reference_block/services/section_extractor.py
xxy 43f3e0b746 Initial commit
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 18:41:06 +08:00

407 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
services/section_extractor.py
从 Markdown 中:
1) 抽取目录(章节标题层级)-> 用于生成模板章节(目录)
2) 按标题拆分正文 -> 每个章节的内容(用于入库 report_section_references
抽取/过滤逻辑参考 eval_report/routers/template.py 与 routers/reference.py。
"""
from __future__ import annotations
import hashlib
import re
# Maximum accepted length of a section title, in characters. Headings whose
# cleaned title is longer are skipped, and numbered full titles are cut to it.
_MAX_SECTION_TITLE_LEN = 200
# ────────────────────────────── 通用过滤/清洗 ──────────────────────────────
def _segment_looks_like_year(segment: str) -> bool:
    """Return True when *segment* is a four-digit number between 1900 and 2099."""
    # isdecimal() (not isdigit()): isdigit() also accepts characters such as
    # superscript digits that int() cannot parse and would raise ValueError on.
    if len(segment) != 4 or not segment.isdecimal():
        return False
    return 1900 <= int(segment) <= 2099


def _is_valid_section_number(num: str) -> bool:
    """Return True when *num* looks like a section number (1 / 1.1 / 2.3.4).

    Years that appear in body text (2017, 2019, ...) are rejected. A
    single-level number must be within 1..20; every level of a multi-level
    number must be within 1..99. Empty segments produced by stray dots are
    ignored. Any non-numeric input yields False rather than an exception.
    """
    parts = [p for p in str(num or "").strip().split(".") if p]
    # isdecimal() guarantees that the int() conversions below cannot fail.
    if not parts or not all(p.isdecimal() for p in parts):
        return False
    if any(_segment_looks_like_year(p) for p in parts):
        return False
    values = [int(p) for p in parts]
    if len(values) == 1:
        return 1 <= values[0] <= 20
    return all(1 <= v <= 99 for v in values)
def _heading_title_core(rest: str) -> str:
    """Return *rest* with a leading section number (``1`` / ``1.2.3``) removed."""
    return re.sub(r"^\d+(?:\.\d+)*\s*", "", str(rest or "").strip()).strip()


def _rest_looks_like_body_text(rest: str) -> bool:
    """Return True when *rest* reads like body text rather than a heading.

    Filters date sentences, long paragraphs and data-description sentences
    that were mistaken for titles. The checks run on the title with its
    leading section number removed; if nothing is left after removing it,
    the stripped input itself is checked. Empty input counts as body text.
    """
    t = _heading_title_core(rest) or str(rest or "").strip()
    if not t:
        return True
    if re.match(r"^[月日]", t):
        return True
    if re.search(r"\d", t):
        return True
    # NOTE(review): the two digit-based checks below can never fire because
    # the any-digit check above already returned. They are kept so the
    # intent survives if that check is ever relaxed.
    if re.match(r"^\d{4}\s*年", t) or re.match(r"^\d{4}[、,]", t):
        return True
    if re.search(r"\d{4}\s*[-~—至]\s*\d{4}", t):
        return True
    # Two or more clause separators mean a sentence, not a title. The
    # previous literals were empty strings, and str.count("") is len(t) + 1,
    # so every non-empty title was rejected here.
    # NOTE(review): the original characters were lost; comma and semicolon
    # (ASCII and full-width) are a reconstruction - confirm against the
    # upstream eval_report code.
    commas = t.count(",") + t.count("\uff0c")
    semicolons = t.count(";") + t.count("\uff1b")
    if commas >= 2 or semicolons >= 2:
        return True
    # Chinese text normally uses full-width punctuation, so both widths are
    # accepted wherever punctuation is inspected.
    if len(t) > 80 and re.search(r"[,\uff0c。;\uff1b:\uff1a]", t):
        return True
    if len(t) > 45 and any(
        k in t
        for k in (
            "运营数据", "预测数据", "实际运营", "根据公司",
            "发展规划", "工况下", "万吨", "有项目", "无项目",
        )
    ):
        return True
    # A long line is only kept when it ends with a typical heading noun.
    if len(t) > 45 and not re.search(
        r"(评价|分析|结论|概况|说明|措施|建议|对比|控制|实现|状况|情况|程序|模式|评价结论)$",
        t.rstrip("。;,\uff1b\uff0c"),
    ):
        return True
    return False
def _looks_like_real_heading_title(title: str) -> bool:
    """Return True when *title* is non-blank and is not classified as body text."""
    if str(title or "").strip():
        return not _rest_looks_like_body_text(title)
    return False
def _clean_heading_title(s: str) -> str:
    """Normalize a raw heading string into a clean title.

    Steps: collapse whitespace runs into single spaces, drop a trailing
    page number (as found on table-of-contents lines), and remove a trailing
    parenthesized note of 20+ characters when a long note containing
    sentence punctuation is present. ``None`` or empty input yields "".
    """
    t = re.sub(r"\s+", " ", str(s or "").strip())
    t = re.sub(r"\s+\d+$", "", t).strip()  # trailing page number of a TOC line
    # Both ASCII and full-width parentheses/punctuation are recognized:
    # Chinese documents normally use the full-width forms, which the
    # ASCII-only patterns never matched.
    m_note = re.search(r"[(\uff08]([^)\uff09]{20,})[)\uff09]", t)
    if m_note and re.search(r"[,\uff0c。;\uff1b:\uff1a]", m_note.group(1)):
        t = re.sub(r"\s*[(\uff08][^)\uff09]{20,}[)\uff09]\s*$", "", t).strip()
    return t
def _section_dict(section_key: str, section_title: str) -> dict:
    """Build one template-section record with camelCase keys."""
    return dict(sectionKey=section_key, sectionTitle=section_title)
def _canonical_to_section_key(canonical: str, order: int) -> str:
    """Turn a canonical ``number|title`` string into a hyphenated key.

    Every run of characters outside ``a-z``, ``0-9`` and the CJK range
    U+4E00..U+9FA5 becomes a single ``-``; leading and trailing hyphens are
    dropped. When nothing remains, ``section-<order>`` is returned instead.
    """
    slug = re.sub(r"[^a-z0-9\u4e00-\u9fa5]+", "-", canonical).strip("-")
    if slug:
        return slug
    return f"section-{order}"
def normalize_section_key(raw_key: str | None, title: str | None) -> str:
"""生成稳定且可入库的 section_key<=64超长追加短哈希。"""
base = (raw_key or "").strip().lower()
if not base:
base = (title or "").strip().lower()
base = re.sub(r"[^a-z0-9\u4e00-\u9fa5]+", "-", base).strip("-")
if not base:
base = "section"
if len(base) <= 64:
return base
digest = hashlib.md5(base.encode("utf-8")).hexdigest()[:10]
prefix = base[:53].rstrip("-")
return f"{prefix}-{digest}"
# ────────────────────────────── 目录TOC抽取 ──────────────────────────────
def _walk_markdown_heading_sections(text: str) -> list[dict]:
    """Split Markdown into sections at its ``#`` .. ``######`` headings.

    Headings are numbered automatically from their nesting depth
    (``## Overview`` -> ``1.1 Overview``), so no explicit numbering is
    needed. A ``#`` line that is rejected as "not a real heading" does not
    open a section and therefore stays part of the surrounding body text.

    Body range of a section (the heading line itself is excluded):
      - ``settings.SECTION_CONTENT_INCLUDE_SUBSECTIONS`` truthy (also the
        default when the setting is absent): everything up to the next
        accepted heading whose level is <= this section's level, i.e. the
        whole subtree including sub-headings and their text.
      - otherwise: everything up to the next accepted heading of any level.

    Returns one dict per section with the keys ``number``, ``title``,
    ``full_title``, ``canonical``, ``section_key``, ``level`` and ``content``.
    """
    from config import settings

    aggregate_subtree = bool(
        getattr(settings, "SECTION_CONTENT_INCLUDE_SUBSECTIONS", True)
    )
    all_lines = str(text or "").splitlines()
    heading_re = re.compile(r"^(#{1,6})\s+(.+)$")
    counters: list[int] = []
    sections: list[dict] = []
    known: set[str] = set()

    # Pass 1: collect accepted headings together with their line index.
    for line_no, raw_line in enumerate(all_lines):
        found = heading_re.match(str(raw_line or "").strip())
        if found is None:
            continue
        depth = len(found.group(1))
        title = _clean_heading_title(found.group(2).strip())
        if not title or len(title) > _MAX_SECTION_TITLE_LEN:
            continue
        if not _looks_like_real_heading_title(title):
            continue
        # Keep exactly one counter per level down to the current depth.
        if depth > len(counters):
            counters.extend([0] * (depth - len(counters)))
        else:
            del counters[depth:]
        counters[depth - 1] += 1
        number = ".".join(str(c) for c in counters)
        canonical = f"{number}|{title}".lower()
        if canonical in known:
            continue
        known.add(canonical)
        sections.append(
            {
                "number": number,
                "title": title,
                "full_title": f"{number} {title}",
                "canonical": canonical,
                "section_key": _canonical_to_section_key(canonical, len(sections) + 1),
                "level": depth,
                "start_idx": line_no,
            }
        )

    # Pass 2: attach the body text that follows each heading.
    line_count = len(all_lines)
    for pos, section in enumerate(sections):
        heading_line = section.pop("start_idx")
        own_depth = section["level"]
        stop = next(
            (
                later["start_idx"]
                for later in sections[pos + 1:]
                if not aggregate_subtree or later["level"] <= own_depth
            ),
            line_count,
        )
        section["content"] = "\n".join(all_lines[heading_line + 1:stop]).strip()
    return sections
def _extract_sections_from_markdown_headings(text: str) -> list[dict]:
    """Build the template table of contents from Markdown ``#`` headings.

    Headings are numbered automatically from their nesting depth
    (``## Overview`` -> ``1.1 Overview``). Lines whose cleaned title is
    empty, longer than ``_MAX_SECTION_TITLE_LEN`` or judged to be body text
    are skipped. Each entry is a ``{"sectionKey", "sectionTitle"}`` dict.
    """
    heading_re = re.compile(r"^(#{1,6})\s+(.+)$")
    counters: list[int] = []
    toc: list[dict] = []
    known: set[str] = set()
    for raw_line in str(text or "").splitlines():
        found = heading_re.match(str(raw_line or "").strip())
        if found is None:
            continue
        depth = len(found.group(1))
        title = _clean_heading_title(found.group(2).strip())
        acceptable = (
            bool(title)
            and len(title) <= _MAX_SECTION_TITLE_LEN
            and _looks_like_real_heading_title(title)
        )
        if not acceptable:
            continue
        # Keep exactly one counter per level down to the current depth.
        if depth > len(counters):
            counters.extend([0] * (depth - len(counters)))
        else:
            del counters[depth:]
        counters[depth - 1] += 1
        number = ".".join(str(c) for c in counters)
        canonical = f"{number}|{title}".lower()
        if canonical in known:
            continue
        known.add(canonical)
        key = _canonical_to_section_key(canonical, len(toc) + 1)
        toc.append(_section_dict(key, f"{number} {title}"))
    return toc
def extract_sections_from_text(text: str) -> list[dict]:
    """Extract the template table of contents (stored in report_template_sections).

    Markdown heading levels are tried first; when they yield at least 8
    sections that result is returned as is. Otherwise every line is scanned
    for a numbered title (``1.2 Title``) or a Chinese-numeral title
    (``一、Title``). If at least 20 candidates end with a page number and
    their leading numbers cover 1..7, only those table-of-contents rows are
    used. Returns a list of ``{"sectionKey", "sectionTitle"}`` dicts.
    """
    md_sections = _extract_sections_from_markdown_headings(text)
    if len(md_sections) >= 8:
        return md_sections

    candidates: list[dict] = []
    for raw in str(text or "").splitlines():
        line = str(raw or "").strip()
        if not line:
            continue
        line = re.sub(r"^#{1,6}\s*", "", line).strip()
        line = line.replace("\u3000", " ")
        line = re.sub(r"\s+", " ", line).strip()
        if re.match(r"^20\d{2}\s*年\s*\d{1,2}\s*月$", line):
            continue
        if line in {"目次", "目录"}:
            continue
        # Enumerated list items such as "1) ..." are not headings. The
        # full-width closing parenthesis is accepted as well.
        if re.match(r"^\d+\s*[\)\uff09]\s*.+$", line):
            continue
        has_page_no = bool(re.search(r"\s+\d+\s*$", line))
        m = re.match(r"^((?:\d+(?:\.\d+){0,5}))\s*([^\s].*)$", line)
        if m:
            num = m.group(1).strip()
            if not _is_valid_section_number(num):
                continue
            rest = _clean_heading_title(m.group(2).strip())
            # The first prefix used to be an empty string, and
            # str.startswith("") is always True, so every numbered line was
            # discarded. Restored as "starts with a closing parenthesis",
            # ASCII or full-width.
            if not rest or rest.startswith((")", "\uff09")):
                continue
            if _rest_looks_like_body_text(rest):
                continue
            if len(rest) > _MAX_SECTION_TITLE_LEN:
                continue
            full_title = f"{num} {rest}"[:_MAX_SECTION_TITLE_LEN].rstrip()
            canonical = f"{num}|{rest}".lower()
        else:
            m2 = re.match(r"^([一二三四五六七八九十]+[、.])\s*([^\s].*)$", line)
            if not m2:
                continue
            rest2 = _clean_heading_title(m2.group(2).strip())
            if not rest2 or _rest_looks_like_body_text(rest2) or len(rest2) > _MAX_SECTION_TITLE_LEN:
                continue
            full_title = f"{m2.group(1)} {rest2}"[:_MAX_SECTION_TITLE_LEN].rstrip()
            canonical = f"{m2.group(1)}|{rest2}".lower()
        candidates.append({"canonical": canonical, "title": full_title, "has_page_no": has_page_no})

    # Prefer the document's own table of contents when it looks complete.
    toc_rows = [c for c in candidates if c["has_page_no"]]
    toc_nums = set()
    for c in toc_rows:
        m_num = re.match(r"^(\d+)", c["title"])
        if m_num:
            toc_nums.add(m_num.group(1))
    use_toc_only = len(toc_rows) >= 20 and {"1", "2", "3", "4", "5", "6", "7"}.issubset(toc_nums)
    picked = toc_rows if use_toc_only else candidates

    out: list[dict] = []
    seen: set[str] = set()
    for c in picked:
        canonical = c["canonical"]
        if canonical in seen:
            continue
        if not _looks_like_real_heading_title(c["title"]):
            continue
        seen.add(canonical)
        out.append(_section_dict(_canonical_to_section_key(canonical, len(out) + 1), c["title"]))
    return out
# ────────────────────────────── 正文按标题拆分 ──────────────────────────────
def split_markdown_into_sections(text: str) -> list[dict[str, str]]:
    """Split the document body into sections by Markdown heading.

    Uses the heading walker with automatic numbering, so ``section_key`` is
    the generated number (e.g. ``1.1``) and ``section_title`` the numbered
    title. When the walker finds no section, the numbered-heading splitter
    is used instead.

    Returns ``[{section_key, section_title, content}, ...]``.
    """
    walked = _walk_markdown_heading_sections(text)
    if not walked:
        return split_markdown_by_headings(text)
    result: list[dict[str, str]] = []
    for entry in walked:
        result.append(
            {
                "section_key": entry["number"],
                "section_title": entry["full_title"],
                "content": entry["content"],
            }
        )
    return result
def split_markdown_by_headings(text: str) -> list[dict[str, str]]:
    """Split the body at numbered Markdown headings (``#`` .. ``####``).

    Only headings that carry a section number, such as ``## 1.1 Title``,
    open a section. Text before the first such heading is dropped. Each
    section's ``content`` starts with its own heading line and runs up to
    the next numbered heading.

    Returns ``[{section_key, section_title, content}, ...]`` where
    ``section_key`` is the number taken from the heading (e.g. ``1.1``).
    """
    lines = str(text or "").splitlines()
    numbered_heading = re.compile(r"^#{1,4}\s+(\d+(?:\.\d+)*)\s+(.+)")

    # Locate every numbered heading first, then slice between neighbours.
    hits = []
    for index, line in enumerate(lines):
        found = numbered_heading.match(line)
        if found:
            hits.append((index, found))
    stops = [index for index, _ in hits][1:] + [len(lines)]

    return [
        {
            "section_key": found.group(1),
            "section_title": found.group(2).strip(),
            "content": "\n".join(lines[start:stop]).strip(),
        }
        for (start, found), stop in zip(hits, stops)
    ]
def parse_section_order(section_key: str) -> int:
    """Convert a dotted section key into an integer used for sorting.

    The dots are removed and the remaining digits are read as one number
    ('1.2.1' -> 121). Keys that are empty or contain anything other than
    decimal digits and dots yield 0.

    NOTE: the digits are concatenated, not weighted per level, so distinct
    keys can collide ('1.10' and '11.0' both give 110) and a deeper key can
    sort after a later chapter ('1.2.1' -> 121 is greater than '2.1' -> 21).
    """
    digits = str(section_key or "").replace(".", "")
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as superscript digits, which int() rejects with ValueError.
    return int(digits) if digits.isdecimal() else 0
def clamp_text_bytes(text: str, max_bytes: int, *, suffix: str = "\n…(内容过长,已截断)") -> str:
    """Truncate *text* so that its UTF-8 encoding fits within *max_bytes*.

    The cut never lands inside a multi-byte character. Intended for fitting
    text into a MySQL TEXT column (at most 65535 bytes).

    Args:
        text: The text to clamp. Empty or ``None`` is returned unchanged.
        max_bytes: Byte limit. A value <= 0 disables clamping and returns
            *text* unchanged.
        suffix: Marker appended to truncated text. Its bytes are counted
            against *max_bytes*.

    Returns:
        *text* itself when it already fits; otherwise a truncated copy
        followed by *suffix*. When *suffix* alone is larger than
        *max_bytes*, the text is cut to *max_bytes* without the marker so
        the limit is still respected.
    """
    if not text or max_bytes <= 0:
        return text
    data = text.encode("utf-8")
    if len(data) <= max_bytes:
        return text
    suffix_bytes = len(suffix.encode("utf-8"))
    if suffix_bytes > max_bytes:
        # The marker would overflow the limit on its own; a plain cut is the
        # only way to honour max_bytes.
        return data[:max_bytes].decode("utf-8", errors="ignore").rstrip()
    budget = max_bytes - suffix_bytes
    # errors="ignore" drops a trailing partial character left by the byte
    # slice, so the result is always valid UTF-8.
    truncated = data[:budget].decode("utf-8", errors="ignore").rstrip()
    return truncated + suffix