"""
services/section_extractor.py

Extract structure from Markdown text:
  1) Extract the table of contents (heading hierarchy), used to generate
     template sections.
  2) Split the body by headings, giving the content of each section
     (stored into report_section_references).

The extraction/filtering logic follows eval_report/routers/template.py and
routers/reference.py.
"""

from __future__ import annotations

import hashlib
import re

_MAX_SECTION_TITLE_LEN = 200

# ATX-style Markdown heading: 1-6 '#' characters, whitespace, then the title.
_MD_HEADING_RE = re.compile(r"^(#{1,6})\s+(.+)$")


# ────────────────────────────── Generic filtering / cleaning ──────────────────────────────

def _segment_looks_like_year(segment: str) -> bool:
    """Return True when *segment* is a 4-digit number in 1900..2099."""
    if not segment.isdigit() or len(segment) != 4:
        return False
    year = int(segment)
    return 1900 <= year <= 2099


def _is_valid_section_number(num: str) -> bool:
    """Check that *num* looks like a section number (1 / 1.1 / 2.3.4).

    Numbers whose segments look like years (2017, 2019, ...) are rejected so
    that body text starting with a year is not mistaken for a heading.
    A single-segment number must be in 1..20; every segment of a
    multi-segment number must be in 1..99.
    """
    parts = [p for p in str(num or "").strip().split(".") if p]
    if not parts or not all(p.isdigit() for p in parts):
        return False
    if any(_segment_looks_like_year(p) for p in parts):
        return False
    if len(parts) == 1:
        return 1 <= int(parts[0]) <= 20
    return all(1 <= int(p) <= 99 for p in parts)


def _heading_title_core(rest: str) -> str:
    """Strip a leading dotted section number (and following spaces) from *rest*."""
    return re.sub(r"^\d+(?:\.\d+)*\s*", "", str(rest or "").strip()).strip()


def _rest_looks_like_body_text(rest: str) -> bool:
    """Heuristically detect body text that was mis-recognised as a heading.

    Filters date sentences, long paragraphs, data-description sentences, etc.
    Returns True when the text should NOT be treated as a heading title
    (an empty string also returns True).
    """
    t = _heading_title_core(rest) or str(rest or "").strip()
    if not t:
        return True
    # Date fragments: starts with month/day character, or "month + digit".
    if re.match(r"^[月日]", t):
        return True
    if re.search(r"月\d", t):
        return True
    # Starts with a 4-digit year.
    if re.match(r"^\d{4}\s*年", t) or re.match(r"^\d{4}[、,]", t):
        return True
    # Year range such as 2017-2019.
    if re.search(r"\d{4}\s*[-~~—至]\s*\d{4}", t):
        return True
    # Several sentence/clause terminators: a paragraph, not a title.
    if t.count("。") >= 2 or t.count(";") >= 2:
        return True
    if len(t) > 80 and re.search(r"[,。;:]", t):
        return True
    # Longish text containing typical data-description keywords.
    if len(t) > 45 and any(
        k in t
        for k in (
            "运营数据",
            "预测数据",
            "实际运营",
            "根据公司",
            "发展规划",
            "工况下",
            "万吨",
            "有项目",
            "无项目",
        )
    ):
        return True
    # Longish text is only accepted when it ends with a typical heading suffix.
    if len(t) > 45 and not re.search(
        r"(评价|分析|结论|概况|说明|措施|建议|对比|控制|实现|状况|情况|程序|模式|评价结论)$",
        t.rstrip("。;,"),
    ):
        return True
    return False


def _looks_like_real_heading_title(title: str) -> bool:
    """Return True when *title* is non-blank and does not look like body text."""
    if not str(title or "").strip():
        return False
    return not _rest_looks_like_body_text(title)


def _clean_heading_title(s: str) -> str:
    """Normalise a raw heading title.

    Collapses whitespace runs to a single space, drops a trailing page number
    (as found on table-of-contents rows) and removes a long trailing
    parenthesised note when that note contains sentence punctuation.
    """
    t = str(s or "").strip()
    t = re.sub(r"\s+", " ", t)
    t = re.sub(r"\s+\d+$", "", t).strip()  # drop trailing TOC page number
    m_note = re.search(r"[((]([^))]{20,})[))]", t)
    if m_note and re.search(r"[,。;:]", m_note.group(1)):
        t = re.sub(r"\s*[((][^))]{20,}[))]\s*$", "", t).strip()
    return t


def _section_dict(section_key: str, section_title: str) -> dict:
    """Build the template-section record used by the TOC extraction functions."""
    return {"sectionKey": section_key, "sectionTitle": section_title}


def _canonical_to_section_key(canonical: str, order: int) -> str:
    """Turn a canonical "number|title" string into a dash-separated key.

    Every run of characters other than a-z, 0-9 and CJK ideographs becomes a
    single dash; falls back to ``section-<order>`` when nothing is left.
    """
    return (
        re.sub(r"[^a-z0-9\u4e00-\u9fa5]+", "-", canonical).strip("-")
        or f"section-{order}"
    )


def normalize_section_key(raw_key: str | None, title: str | None) -> str:
    """Produce a stable section key of at most 64 characters.

    Uses *raw_key* (falling back to *title*, then to ``"section"``),
    lower-cased and slugified. Keys longer than 64 characters are shortened
    to a 53-character prefix plus a 10-character MD5 digest of the full slug.
    """
    base = (raw_key or "").strip().lower()
    if not base:
        base = (title or "").strip().lower()
    base = re.sub(r"[^a-z0-9\u4e00-\u9fa5]+", "-", base).strip("-")
    if not base:
        base = "section"
    if len(base) <= 64:
        return base
    digest = hashlib.md5(base.encode("utf-8")).hexdigest()[:10]
    prefix = base[:53].rstrip("-")
    return f"{prefix}-{digest}"


# ────────────────────────────── Table of contents (TOC) extraction ──────────────────────────────

def _scan_markdown_headings(lines: list[str]) -> list[dict]:
    """Scan *lines* for Markdown headings and auto-number the accepted ones.

    Shared by TOC extraction and body splitting so both see exactly the same
    set of headings. A heading is accepted when its cleaned title is
    non-empty, no longer than ``_MAX_SECTION_TITLE_LEN`` and passes
    ``_looks_like_real_heading_title``; rejected ``#`` lines are skipped and
    do not advance the numbering.

    Returns one dict per accepted heading with keys ``number``, ``title``,
    ``full_title``, ``canonical``, ``section_key``, ``level`` and
    ``start_idx`` (index of the heading line within *lines*).
    """
    counters: list[int] = []
    found: list[dict] = []
    seen: set[str] = set()
    for idx, raw in enumerate(lines):
        m = _MD_HEADING_RE.match(str(raw or "").strip())
        if not m:
            continue
        level = len(m.group(1))
        title = _clean_heading_title(m.group(2).strip())
        if not title or len(title) > _MAX_SECTION_TITLE_LEN:
            continue
        if not _looks_like_real_heading_title(title):
            continue

        # Keep exactly `level` counters: pad with zeros when descending,
        # drop deeper counters when coming back up.
        if len(counters) < level:
            counters.extend([0] * (level - len(counters)))
        else:
            del counters[level:]
        counters[level - 1] += 1

        num = ".".join(map(str, counters))
        canonical = f"{num}|{title}".lower()
        if canonical in seen:
            continue
        seen.add(canonical)
        found.append(
            {
                "number": num,
                "title": title,
                "full_title": f"{num} {title}",
                "canonical": canonical,
                "section_key": _canonical_to_section_key(canonical, len(found) + 1),
                "level": level,
                "start_idx": idx,
            }
        )
    return found


def _walk_markdown_heading_sections(text: str) -> list[dict]:
    """Split Markdown into sections by heading (# ~ ######) and capture bodies.

    Headings are auto-numbered from their level (``## Overview`` ->
    ``1.1 Overview``), so explicit numbers are not required. ``#`` lines that
    are judged not to be real headings stay part of the surrounding body.

    Body range (the section's own heading line is always excluded):
      - ``settings.SECTION_CONTENT_INCLUDE_SUBSECTIONS`` truthy (default when
        the setting is absent): everything up to the next heading whose
        level is <= this section's level, i.e. including sub-sections.
      - otherwise: only up to the next accepted heading of any level.

    Returns dicts with ``number``, ``title``, ``full_title``, ``canonical``,
    ``section_key``, ``level`` and ``content``.
    """
    from config import settings

    include_sub = bool(getattr(settings, "SECTION_CONTENT_INCLUDE_SUBSECTIONS", True))

    lines = str(text or "").splitlines()
    accepted = _scan_markdown_headings(lines)

    total = len(lines)
    for i, sec in enumerate(accepted):
        body_start = sec["start_idx"] + 1  # skip this section's heading line
        end = total
        for nxt in accepted[i + 1:]:
            if not include_sub or nxt["level"] <= sec["level"]:
                end = nxt["start_idx"]
                break
        sec["content"] = "\n".join(lines[body_start:end]).strip()
        # Safe to drop here: only later sections' start_idx is still needed.
        sec.pop("start_idx", None)
    return accepted


def _extract_sections_from_markdown_headings(text: str) -> list[dict]:
    """Build the template TOC from Markdown headings (# / ## / ###, ...).

    Headings are auto-numbered from their level and non-heading ``#`` lines
    are filtered out. Returns ``{"sectionKey", "sectionTitle"}`` dicts where
    the title carries the generated number (e.g. ``"1.1 Overview"``).
    """
    return [
        _section_dict(h["section_key"], h["full_title"])
        for h in _scan_markdown_headings(str(text or "").splitlines())
    ]


def extract_sections_from_text(text: str) -> list[dict]:
    """Extract the template TOC (stored into report_template_sections).

    Markdown headings are tried first and used directly when at least 8 are
    found. Otherwise falls back to recognising numbered lines
    (``1.2 Title``) and Chinese-numeral lines. When the fallback finds at
    least 20 rows ending in a page number whose leading numbers cover 1..7,
    only those TOC rows are used.
    """
    md_sections = _extract_sections_from_markdown_headings(text)
    if len(md_sections) >= 8:
        return md_sections

    candidates: list[dict] = []
    for raw in str(text or "").splitlines():
        line = str(raw or "").strip()
        if not line:
            continue
        line = re.sub(r"^#{1,6}\s*", "", line).strip()
        line = line.replace("\u3000", " ")  # ideographic space -> ASCII space
        line = re.sub(r"\s+", " ", line).strip()

        # Skip cover dates, the TOC caption itself and "1) ..." list items.
        if re.match(r"^20\d{2}\s*年\s*\d{1,2}\s*月$", line):
            continue
        if line in {"目次", "目录"}:
            continue
        if re.match(r"^\d+\s*[)\)]\s*.+$", line):
            continue

        has_page_no = bool(re.search(r"\s+\d+\s*$", line))

        m = re.match(r"^((?:\d+(?:\.\d+){0,5}))\s*([^\s].*)$", line)
        if m:
            num = m.group(1).strip()
            if not _is_valid_section_number(num):
                continue
            rest = _clean_heading_title(m.group(2).strip())
            if not rest or rest.startswith((")", ")")):
                continue
            if _rest_looks_like_body_text(rest):
                continue
            if len(rest) > _MAX_SECTION_TITLE_LEN:
                continue
            full_title = f"{num} {rest}"[:_MAX_SECTION_TITLE_LEN].rstrip()
            canonical = f"{num}|{rest}".lower()
        else:
            m2 = re.match(r"^([一二三四五六七八九十]+[、..])\s*([^\s].*)$", line)
            if not m2:
                continue
            label = m2.group(1)
            rest2 = _clean_heading_title(m2.group(2).strip())
            if (
                not rest2
                or _rest_looks_like_body_text(rest2)
                or len(rest2) > _MAX_SECTION_TITLE_LEN
            ):
                continue
            full_title = f"{label} {rest2}"[:_MAX_SECTION_TITLE_LEN].rstrip()
            canonical = f"{label}|{rest2}".lower()

        candidates.append(
            {"canonical": canonical, "title": full_title, "has_page_no": has_page_no}
        )

    # Rows ending with a page number are treated as genuine TOC rows.
    toc_rows = [c for c in candidates if c["has_page_no"]]
    toc_nums: set[str] = set()
    for c in toc_rows:
        m_num = re.match(r"^(\d+)", c["title"])
        if m_num:
            toc_nums.add(m_num.group(1))
    use_toc_only = len(toc_rows) >= 20 and {
        "1", "2", "3", "4", "5", "6", "7"
    }.issubset(toc_nums)

    out: list[dict] = []
    seen: set[str] = set()
    for c in toc_rows if use_toc_only else candidates:
        canonical = c["canonical"]
        if canonical in seen:
            continue
        if not _looks_like_real_heading_title(c["title"]):
            continue
        seen.add(canonical)
        out.append(
            _section_dict(_canonical_to_section_key(canonical, len(out) + 1), c["title"])
        )
    return out


# ────────────────────────────── Splitting the body by headings ──────────────────────────────

def split_markdown_into_sections(text: str) -> list[dict[str, str]]:
    """Split the body by Markdown headings.

    Uses the same heading recognition and auto-numbering as the Markdown TOC
    extraction; ``section_key`` is the generated number (e.g. ``1.1``).
    When no heading is accepted, falls back to
    ``split_markdown_by_headings`` (headings carrying explicit numbers).

    Returns ``[{section_key, section_title, content}, ...]``.
    """
    walk = _walk_markdown_heading_sections(text)
    if walk:
        return [
            {
                "section_key": s["number"],
                "section_title": s["full_title"],
                "content": s["content"],
            }
            for s in walk
        ]
    return split_markdown_by_headings(text)


def split_markdown_by_headings(text: str) -> list[dict[str, str]]:
    """Split the body by numbered Markdown headings (``## 1.1 Title``, # ~ ####).

    Lines before the first matching heading are discarded. Each section's
    ``content`` starts with its own heading line. ``section_key`` is the
    explicit number from the heading (e.g. ``1.1``).

    Returns ``[{section_key, section_title, content}, ...]``.
    """
    heading_pattern = re.compile(r"^#{1,4}\s+(\d+(?:\.\d+)*)\s+(.+)")

    sections: list[dict[str, str]] = []
    current_key: str | None = None
    current_title: str | None = None
    current_lines: list[str] = []

    def flush() -> None:
        # Emit the section collected so far, if any.
        if current_key and current_lines:
            sections.append(
                {
                    "section_key": current_key,
                    "section_title": current_title or "",
                    "content": "\n".join(current_lines).strip(),
                }
            )

    for line in str(text or "").splitlines():
        m = heading_pattern.match(line)
        if m:
            flush()
            current_key = m.group(1)
            current_title = m.group(2).strip()
            current_lines = [line]
        elif current_key:
            current_lines.append(line)
    flush()
    return sections


def parse_section_order(section_key: str) -> int:
    """Convert a dotted key to an int by concatenating its digits ('1.2.1' -> 121).

    Returns 0 when the key, with dots removed, is not purely digits.

    NOTE(review): digit concatenation does not preserve document order across
    depths ('2' -> 2 sorts before '1.1' -> 11) and distinct keys can collide
    ('1.21' and '12.1' both give 121). Behaviour is kept as-is because callers
    may persist this value; confirm before relying on it for mixed-depth sorts.
    """
    digits = str(section_key or "").replace(".", "")
    return int(digits) if digits.isdigit() else 0


def clamp_text_bytes(text: str, max_bytes: int, *, suffix: str = "\n…(内容过长,已截断)") -> str:
    """Truncate *text* so its UTF-8 encoding fits within *max_bytes*.

    Never cuts a character in half. Intended for size-limited columns such as
    MySQL TEXT (65535 bytes). When truncation happens, *suffix* is appended
    and counted against the limit; if *suffix* alone is larger than
    *max_bytes*, the text is cut without it so the limit still holds.

    Empty text, or ``max_bytes <= 0``, returns *text* unchanged.
    """
    if not text or max_bytes <= 0:
        return text
    data = text.encode("utf-8")
    if len(data) <= max_bytes:
        return text

    suffix_bytes = len(suffix.encode("utf-8"))
    if suffix_bytes > max_bytes:
        # The marker itself would break the limit: hard-cut without it.
        return data[:max_bytes].decode("utf-8", errors="ignore").rstrip()

    budget = max_bytes - suffix_bytes
    # errors="ignore" drops a trailing partial character, keeping valid UTF-8.
    truncated = data[:budget].decode("utf-8", errors="ignore").rstrip()
    return truncated + suffix