report_generation/services/report_generation_service.py
xxy aa98ea2623 @
Initial commit

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@
2026-06-05 18:45:29 +08:00

7772 lines
292 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import logging
import re
import threading
from difflib import SequenceMatcher
import unicodedata
import uuid
from datetime import datetime
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Optional
from fastapi import HTTPException
logger = logging.getLogger(__name__)
# ── Runtime prompt dumps ──────────────────────────────────────────────────────
# Root directory under which _dump_runtime_prompt writes one folder per job.
_PROMPT_DUMP_ROOT = Path(__file__).resolve().parent.parent / "comp" / "runtime"
# Root directory under which _dump_report_chapter_json_markdown writes one folder per job.
_REPORT_OUTPUT_DUMP_ROOT = Path(__file__).resolve().parent.parent / "comp" / "report_outputs"
def _safe_markdown_filename(name: str, fallback: str = "section") -> str:
safe = re.sub(r'[\\/:*?"<>|]', "_", str(name or "").strip())
safe = re.sub(r"\s+", " ", safe).strip(" ._")
return safe[:120] or fallback
def _dump_runtime_prompt(
    job_id: str,
    section_key: str,
    section_title: str,
    system_prompt: str,
    user_prompt: str,
) -> None:
    """Write the system and user prompt of one chapter call to a markdown file.

    The file is ``<_PROMPT_DUMP_ROOT>/<job_id>/<section_key>.md`` (reserved
    file-name characters in ``section_key`` are replaced by underscores) and
    is overwritten if it already exists.

    This is best-effort: any exception is logged as a warning and swallowed,
    so a failed dump does not propagate to the caller.
    """
    try:
        target_dir = _PROMPT_DUMP_ROOT / job_id
        target_dir.mkdir(parents=True, exist_ok=True)
        file_stem = re.sub(r'[\\/:*?"<>|]', "_", section_key)
        target_file = target_dir / f"{file_stem}.md"
        stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        pieces = [
            f"# {section_title}\n\n",
            f"> job_id: `{job_id}` \n",
            f"> section_key: `{section_key}` \n",
            f"> 生成时间: {stamp}\n\n",
            "---\n\n",
            "## System Prompt\n\n",
            f"```\n{system_prompt}\n```\n\n",
            "---\n\n",
            "## User Prompt\n\n",
            f"```\n{user_prompt}\n```\n",
        ]
        target_file.write_text("".join(pieces), encoding="utf-8")
    except Exception as exc:
        logger.warning("dump runtime prompt failed: %s", exc)
def _dump_report_chapter_json_markdown(
    *,
    job_id: str,
    section_key: str,
    section_title: str,
    output_json: dict[str, Any],
) -> Optional[str]:
    """Write a chapter's final JSON output into a markdown file.

    The file is ``<_REPORT_OUTPUT_DUMP_ROOT>/<job_id>/<title>.md`` where
    ``<title>`` is the sanitised section title, falling back to the sanitised
    section key.  An existing file is overwritten.

    Returns:
        The path of the written file as a string, or ``None`` when writing
        failed (the failure is logged as a warning, not raised).
    """
    try:
        target_dir = _REPORT_OUTPUT_DUMP_ROOT / job_id
        target_dir.mkdir(parents=True, exist_ok=True)
        key_based_name = _safe_markdown_filename(section_key)
        file_stem = _safe_markdown_filename(section_title, fallback=key_based_name)
        target_file = target_dir / f"{file_stem}.md"
        stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # default=str keeps the dump from failing on values json cannot encode natively.
        payload_text = json.dumps(output_json or {}, ensure_ascii=False, indent=2, default=str)
        pieces = [
            f"# {section_title}\n\n",
            f"> job_id: `{job_id}` \n",
            f"> section_key: `{section_key}` \n",
            f"> 写入时间: {stamp}\n\n",
            "```json\n",
            f"{payload_text}\n",
            "```\n",
        ]
        target_file.write_text("".join(pieces), encoding="utf-8")
        return str(target_file)
    except Exception as exc:
        logger.warning("dump report chapter json markdown failed: %s", exc)
        return None
from sqlalchemy.orm import Session
from database import SessionLocal
from database.models import (
ElementCell,
ElementTable,
Project,
KbDocument,
ReportGenerationChapter,
ReportGenerationJob,
ReportTemplate,
ReportTemplateSection,
)
from schemas.write import (
GenerateReportJobItem,
GenerateReportChapterItem,
GenerateReportResult,
GenerateReportResultChapter,
)
from services.llm_client import chat_completions_json
from services.report_prompt_service import (
build_report_chapter_prompt,
build_repair_missing_tables_prompt,
build_table_format_repair_prompt,
chapter_generation_system_prompt,
repair_missing_tables_system_prompt,
table_format_repair_system_prompt,
)
from services.retrieval_service import RetrievalService
from config import settings
from services.appendix_figure_extraction import (
appendix_figure_markdown_images,
extract_appendix_figure_candidates_from_docx,
merge_best_appendix_figures,
)
from services.kb_service import _kb_doc_absolute_file_path_for_model
from services.report_runtime_store import (
append_chapter_content,
get_job_state,
init_job_state,
set_chapter_stream_phase,
update_chapter_state,
update_job_state,
)
from services.standard_elements_2020 import (
CHAPTER1_PROJECT_OVERVIEW_TABLE_GROUP,
MULTI_COLUMN_GLOBAL_SPECS,
APPENDIX2_CANONICAL_ROW_ORDER,
APPENDIX2_LEGACY_ROW_KEY_MAP,
APPENDIX8_LEGACY_ROW_KEY_MAP,
TABLE_5_3_ROW_KEY_ALTERNATES,
canonical_row_order_for_table,
TABLE_7_1_COLUMN_KEYS,
TABLE_7_1_ROW_CELL_DEFAULTS,
TABLE_7_1_SCORING_TABLE_NAME,
global_table_row_keys,
section_table_row_keys,
time_table_default_columns_for_name,
)
from prompts.report_generation.section_output_contracts import (
DEFAULT_SECTION_OUTPUT_CONTRACT,
SECTION_OUTPUT_CONTRACTS,
)
from prompts.report_generation.heading_rules import (
DEFAULT_HEADING_RULE,
SECTION_HEADING_RULES,
)
from prompts.report_generation.appendix_templates import (
APPENDIX8_PARAMETER_COMPARISON_TABLE,
APPENDIX_FIGURE_TARGETS,
MINIMAL_MISSING_TABLE_TEMPLATE,
missing_child_heading_markdown,
)
# Threshold in seconds; presumably how long a chapter may stay "running" before
# it is treated as stalled — TODO confirm against the code that reads it.
RUNNING_CHAPTER_STALE_SECONDS = 180
# When several historical/alias tables share one table number, prefer the table
# whose name contains the standard-name keywords listed here for that number,
# so the wrong table is not picked.
_TABLE_TOKEN_PREFERRED_NAME_HINTS: dict[str, tuple[str, ...]] = {
    "表2-5": ("总图、储运、公用工程及辅助工程对比",),
    "表2-6": ("储运、公用工程及辅助工程依托对比", "依托"),
    "表3-3": ("施工图设计变更情况", "全厂性项目"),
    "表3-4": ("施工图设计变更情况", "单装置项目"),
    "表3-5": ("影响投资或工期", "重大设计变更"),
    "表5-4": ("生产经营及效益情况对比表",),
    "表5-5": ("主要生产经营指标",),
    "表5-6": ("不同因素变化对项目内部收益率的影响",),
    "表5-7": ("内部收益率为基准收益率时不确定因素临界点或临界值",),
}
# Table 5-4 column keys look like 「可研报告|××年#1」. They must be told apart
# from appendix-table time slots and must not go through the 「可研报告」 prefix
# row-splitting path, otherwise a misaligned header such as
# 「可研报告-|××年#1」 is produced.
_TABLE54_PIPE_METRIC_PREFIXES = frozenset(
    {"可研报告", "可研值", "实际值", "增减(%", "增减", "指标"}
)
# Columns that leaked in from Table 5-1 and similar tables and are not part of
# the Table 5-4 layout; they are dropped when the table is rendered directly.
_TABLE54_DROP_COL_KEYS = frozenset({"后评价值", "后评价报告"})
_TABLE54_INVISIBLE_RE = re.compile(r"[\ufeff\u200b-\u200d]")
def _table54_ck_norm(ck: str) -> str:
"""列键 NFKC 与去空白、BOM便于识别误写入的「unit」全角变体等。"""
t = unicodedata.normalize("NFKC", str(ck or "")).strip()
return _TABLE54_INVISIBLE_RE.sub("", t)
def _is_table54_operating_benefit(table_name: str) -> bool:
tn = str(table_name or "").strip()
return "表5-4" in tn and "生产经营及效益情况对比表" in tn
def _element_table_collect_score(db: Session, table: ElementTable, token: str) -> int:
    """Score an element table as a candidate for Table 5-4 during table selection.

    Returns 0 unless ``_table_token_matches_name(token, "表5-4")`` holds.
    Otherwise the score rewards tables of type "time", names containing
    "生产经营及效益", and — over a sample of at most 48 non-empty cells —
    pipe-style column keys carrying a comparison metric and row keys containing
    "·".  Sampled cells whose row or column key contains "年份未识别" are
    penalised.
    """
    if not _table_token_matches_name(token, "表5-4"):
        return 0

    total = 200 if str(table.table_type or "").strip() == "time" else 0
    if "生产经营及效益" in str(table.table_name or ""):
        total += 40

    sampled_keys = (
        db.query(ElementCell.row_key, ElementCell.col_key)
        .filter(
            ElementCell.table_id == table.id,
            ElementCell.value.isnot(None),
            ElementCell.value != "",
        )
        .limit(48)
        .all()
    )
    metric_markers = ("可研报告", "实际值", "增减")
    for raw_row_key, raw_col_key in sampled_keys:
        row_key = str(raw_row_key or "")
        col_key = str(raw_col_key or "")
        has_metric = any(marker in col_key for marker in metric_markers)
        if "|" in col_key and has_metric:
            total += 8
        if "·" in row_key:
            total += 2
        if "年份未识别" in row_key or "年份未识别" in col_key:
            total -= 40
    return total
def _pick_table54_year_markdown(
year_items: list[tuple[str, str]],
*,
table_year: int | None = None,
) -> tuple[str, str] | None:
"""多张按年拆分的表5-4 取评价年(优先 element_tables.year / 2019且表体最完整的一张。"""
if not year_items:
return None
if len(year_items) == 1:
return year_items[0]
def _item_score(item: tuple[str, str]) -> int:
disp, md = item
sc = 0
if table_year is not None and str(table_year) in str(disp):
sc += 120
if "2019" in str(disp) or re.search(r"2019\s*年", md[:800]):
sc += 80
if "可研报告" in md and "实际值" in md:
sc += 70
if "增减" in md:
sc += 25
if "运行情况·" in md or "主要经济指标·" in md:
sc += 35
if "主要经济指标-" in md and "可研报告" not in md:
sc -= 60
sc += min(md.count("\n|"), 60)
return sc
return max(year_items, key=_item_score)
def _score_structured_table_hit_dict(hit: dict) -> int:
"""structuredTables 条目评分完整表5-4 对比表优先于 LLM 三行简表。"""
if not isinstance(hit, dict):
return 0
md = str(hit.get("markdown") or "")
if not md:
return 0
if _is_table54_simplified_extract_body(md):
return 0
sc = 0
if "可研报告" in md and "实际值" in md:
sc += 90
if "增减" in md:
sc += 25
if "运行情况·" in md or "主要经济指标·" in md:
sc += 40
if "主要经济指标-" in md and "可研报告" not in md:
sc -= 70
sc += min(md.count("\n|"), 80)
return sc
def _table54_body_preceded_by_element_source(text_before: str, *, max_chars: int = 600) -> bool:
"""表体紧邻前是否已有要素直出注释有则视为权威表5-4勿删勿换"""
tail = str(text_before or "")[-max_chars:]
if "表格来源:要素管理" not in tail:
return False
after = tail.rsplit("表格来源:要素管理", 1)[-1]
chunk = after.split("\n", 8)[-1]
return not any(
ln.strip().startswith("|") or _is_pipe_markdown_table_row_line(ln)
for ln in chunk.splitlines()[:6]
if ln.strip()
)
def _is_table54_simplified_extract_body(block: str) -> bool:
"""
识别抽取/LLM 三行简表仅「2019年实际值」等单列 + 少量「主要经济指标·」行,
无「可研报告|…」与「增减」对比结构。
"""
md = str(block or "").strip()
if not md or "|" not in md:
return False
hdr = re.sub(r"\s+", "", _extract_table_header_key(md)).lower()
hdr = re.sub(r"<br>.*", "", hdr, flags=re.IGNORECASE)
if not hdr:
return False
if "后评价值" in hdr or ("可研值" in hdr and "项目" not in hdr and "运行情况" not in md):
return True
has_compare_cols = ("可研报告" in hdr or ("可研" in hdr and "增减" in hdr)) and (
"实际值" in hdr or "实际" in hdr
)
if has_compare_cols and ("运行情况·" in md or md.count("\n|") >= 12):
return False
single_actual_year = bool(
re.search(r"\d{4}\s*年\s*实际值", hdr) or re.search(r"\d{4}年实际值", hdr)
)
if single_actual_year and "可研" not in hdr and "增减" not in hdr:
if "主要经济指标" in md or "主要经济指标-" in md:
return True
if "主要经济指标-" in md and "可研报告" not in md and "增减" not in md:
return True
pipe_rows = [
ln
for ln in md.splitlines()
if _is_pipe_markdown_table_row_line(ln) and not _is_pipe_markdown_table_separator_line(ln)
]
if (
len(pipe_rows) <= 5
and "主要经济指标" in md
and "可研报告" not in md
and "运行情况·" not in md
):
return True
return False
def _reorder_table54_col_order(col_order: list[str]) -> list[str]:
"""单位列置前;其余按年度槽与「可研→实际→增减」顺序排列;剔除无效英文 unit 列与表5-1 混入列。"""
cols: list[str] = []
for c in col_order:
s = _table54_ck_norm(c)
if not s:
continue
if s.lower() in ("unit", "__unit__"):
continue
if s in _TABLE54_DROP_COL_KEYS:
continue
cols.append(s)
cols = ["单位" if c == "指标单位" else c for c in cols]
seen: set[str] = set()
deduped: list[str] = []
for c in cols:
if c in seen:
continue
seen.add(c)
deduped.append(c)
cols = deduped
units = [c for c in cols if c == "单位"]
metrics = [c for c in cols if c != "单位"]
def _metric_rank(g: str) -> int:
gs = g.strip()
if gs in ("可研报告", "指标", "可研值"):
return 0
if gs == "实际值":
return 1
if gs.startswith("增减"):
return 2
return 9
def _sort_key(ck: str) -> tuple[str, int, str]:
if "|" not in ck:
return ("\xff", 99, ck)
g, t = ck.split("|", 1)
return (t.strip(), _metric_rank(g), ck)
metrics = sorted(metrics, key=_sort_key)
if not units:
return ["单位"] + metrics
return units + metrics
def _table54_rekey_latest_col_keys(latest: dict[tuple[str, str], str]) -> None:
"""将 latest 的 col_key 与 _reorder_table54_col_order 一致地做 NFKC 等规范化,否则「增减(%)」与「增减(%)」无法对齐。"""
tmp: dict[tuple[str, str], str] = {}
def _prefer_val(cur: str, new: str) -> str:
s_new = str(new or "").strip()
if s_new and s_new != "待补充":
return str(new)
s_cur = str(cur or "").strip()
if s_cur and s_cur != "待补充":
return str(cur)
return s_new or s_cur or ""
for (rk, ck), v in list(latest.items()):
rk_s = str(rk)
nk = _table54_ck_norm(str(ck))
key = (rk_s, nk)
if key in tmp:
tmp[key] = _prefer_val(tmp[key], v)
else:
tmp[key] = str(v or "")
latest.clear()
latest.update(tmp)
def _table54_coalesce_legacy_bare_metric_cols(
latest: dict[tuple[str, str], str], row_order: list[str]
) -> None:
"""
要素管理常见:数据写在裸列「可研报告/实际值/增减(%)」,
列定义仍为「可研报告|××年#1」等合并到槽位列以便与 UI 一致。
"""
slot_map = {
"可研报告": "可研报告|××年#1",
"实际值": "实际值|××年#1",
"增减(%": "增减(%|××年#1",
"增减(%)": "增减(%|××年#1",
}
def _prefer(a: str, b: str) -> str:
sa, sb = str(a or "").strip(), str(b or "").strip()
if sa and sa != "待补充":
return sa
if sb and sb != "待补充":
return sb
return sa or sb
for rk in row_order:
for bare, slot in slot_map.items():
merged = _prefer(latest.get((rk, slot), ""), latest.get((rk, bare), ""))
if merged:
latest[(rk, slot)] = merged
def _table54_merge_year_cells_for_table_year(
year_cells: dict[int | None, list],
*,
table_year: int | None,
) -> tuple[dict[int | None, list], list[int]]:
"""
表5-4同一张 element_tablesyear=2019下多数格子 element_cells.year 为空,
须与 year=2019 的少量格子合并后再渲染,否则只剩单列「实际值」简表。
"""
if table_year is None or int(table_year) <= 0:
real = sorted(y for y in year_cells if y is not None)
return year_cells, real
ty = int(table_year)
merged: list = list(year_cells.get(None, []))
for cy in sorted(y for y in year_cells if y is not None):
if cy == ty:
merged.extend(year_cells.get(cy, []))
if not merged:
return year_cells, sorted(y for y in year_cells if y is not None)
return {ty: merged}, [ty]
def _table54_remap_indicator_unit_latest(latest: dict[tuple[str, str], str]) -> None:
"""将历史列键「指标单位」的值并入「单位」,避免列键规范为「单位」后取不到数。"""
touched: list[tuple[str, str]] = []
for (rk, ck), v in list(latest.items()):
if str(ck) != "指标单位":
continue
rk_s = str(rk)
k_unit = (rk_s, "单位")
cur = str(latest.get(k_unit, "") or "").strip()
nv = str(v or "").strip()
if nv and (not cur or cur == "待补充"):
latest[k_unit] = v
elif not cur:
latest[k_unit] = v
touched.append((rk_s, str(ck)))
for pair in touched:
latest.pop(pair, None)
# Whole-string match for a column-key tail made of a four-digit year followed by
# 年 and an optional "#<digits>" suffix, e.g. "2019年" or "2019年#1".
_TABLE54_SLOT_YEAR_RE = re.compile(r"^(\d{4})年(?:#\d+)?$")
def _norm_table54_placeholder_year_tail(tail: str) -> str:
buf: list[str] = []
for ch in (tail or "").strip():
if ch in "xX":
buf.append("×")
elif ch == "\u00d7":
buf.append("×")
else:
buf.append(ch)
return "".join(buf)
def _table54_placeholder_year_tail(tail: str) -> bool:
    """Return True when the tail is a placeholder year slot such as "××年" or "××年#2".

    The tail is first normalised with ``_norm_table54_placeholder_year_tail``.
    The original comment states this mirrors the front-end helper
    ``isEmTable54YearSlotColKey``; that helper is not visible here.
    """
    normalized = _norm_table54_placeholder_year_tail(tail)
    return re.fullmatch(r"×{2}年(?:#\d+)?", normalized) is not None
def _parse_real_year_from_table54_slot_tail(tail: str) -> int | None:
    """Extract a real calendar year from a column-key tail such as "2019年" or "2019年#1".

    Placeholder tails ("××年…") yield ``None``.  The tail must begin with four
    digits immediately followed by 年, and the year must lie in 1900..2100;
    anything else — including a bare "2019" without 年 — yields ``None``.
    """
    text = (tail or "").strip()
    if _table54_placeholder_year_tail(text):
        return None

    # Strict whole-tail form first, then the looser "starts with <year>年" form.
    candidates = (
        _TABLE54_SLOT_YEAR_RE.fullmatch(text),
        re.match(r"^(\d{4})年", text),
    )
    for found in candidates:
        if not found:
            continue
        year = int(found.group(1))
        if 1900 <= year <= 2100:
            return year
    return None
def _infer_time_column_year_for_table54(
col_order: list[str],
cells: list[Any],
table_year: int | None,
) -> int | None:
"""
从单元格 year、时间表 element_tables.year、或列键「…|2019年」推断表5-4 年度栏对应的日历年。
无法唯一确定时返回 None表头占位列退回「某年」
"""
ys = sorted(
{
int(c.year)
for c in cells
if getattr(c, "year", None) is not None and int(c.year) > 0
}
)
if len(ys) == 1:
return ys[0]
if table_year is not None and int(table_year) > 0:
return int(table_year)
parsed: list[int] = []
for ck in col_order:
s = str(ck or "").strip()
if "|" not in s:
continue
_, tail = s.split("|", 1)
cy = _parse_real_year_from_table54_slot_tail(tail.strip())
if cy is not None:
parsed.append(cy)
uniq = sorted(set(parsed))
if len(uniq) == 1:
return uniq[0]
# 列键正文含四位年如「2019年可研报告」类裸列名多列多年份时不武断取第一个
text_years: list[int] = []
for ck in col_order:
m = re.search(r"(19|20)\d{2}", str(ck or ""))
if m:
yi = int(m.group(0))
if 1900 <= yi <= 2100:
text_years.append(yi)
ty_uniq = sorted(set(text_years))
if len(ty_uniq) == 1:
return ty_uniq[0]
return None
def _table54_year_label_prefix(time_column_year: int | None) -> str:
if time_column_year is not None and 1900 <= int(time_column_year) <= 2100:
return f"{int(time_column_year)}"
return "某年"
def _table54_year_prefix_for_slot_tail(tail: str, *, time_column_year: int | None) -> str:
    """Return the year part of a Table 5-4 header for a given column-key tail.

    Order of preference: a real year parsed from the tail; for placeholder
    tails, the label derived from ``time_column_year``; "某年" for an empty
    tail; the text before "#" for tails starting with four digits; otherwise
    the trimmed tail itself.
    """
    parsed_year = _parse_real_year_from_table54_slot_tail(tail)
    if parsed_year is not None:
        return f"{parsed_year}"
    if _table54_placeholder_year_tail(tail):
        return _table54_year_label_prefix(time_column_year)

    text = (tail or "").strip()
    if not text:
        return "某年"
    if re.match(r"^\d{4}", text):
        return text.split("#", 1)[0]
    # NOTE(review): the previous form was a conditional on ``text.endswith("")``,
    # which is always true, so it always returned the tail unchanged.  It looks
    # like a suffix literal was lost from that expression — confirm what
    # was intended before restoring it.
    return text
def _table54_bare_metric_header_label(col_key: str, *, time_column_year: int | None) -> str | None:
"""无「指标|年度槽」时的列键:表头带评价年。"""
s = str(col_key or "").strip()
if not s:
return None
ypfx = _table54_year_label_prefix(time_column_year)
if s == "实际值":
return f"{ypfx}实际值"
if s in ("可研值", "可研报告"):
return f"{ypfx}可研报告"
if s.startswith("增减"):
rest = s[len("增减") :]
return f"{ypfx}增减{rest}"
return None
def _table54_markdown_header_labels(
col_order: list[str],
*,
time_column_year: int | None = None,
) -> list[str]:
"""扁平表头:单位、{年}可研报告、{年}实际值、{年}增减;{年}来自列键或要素日历年推断。"""
out: list[str] = []
for ck in col_order:
s = str(ck).strip()
if s == "单位":
out.append("单位")
continue
if "|" not in s:
bare_l = _table54_bare_metric_header_label(s, time_column_year=time_column_year)
if bare_l is not None:
out.append(bare_l)
continue
out.append(s.replace("|", ""))
continue
g, t = s.split("|", 1)
g, t = g.strip(), t.strip()
g_norm = _table54_ck_norm(g)
if (g not in _TABLE54_PIPE_METRIC_PREFIXES and g_norm not in _TABLE54_PIPE_METRIC_PREFIXES) or not t:
out.append(s.replace("|", ""))
continue
ypfx = _table54_year_prefix_for_slot_tail(t, time_column_year=time_column_year)
if g_norm in ("可研报告", "指标", "可研值") or g in ("可研报告", "指标", "可研值"):
out.append(f"{ypfx}可研报告")
elif g_norm == "实际值" or g == "实际值":
out.append(f"{ypfx}实际值")
elif g_norm.startswith("增减") or g.startswith("增减"):
rest = g[len("增减") :]
out.append(f"{ypfx}增减{rest}")
else:
out.append(s.replace("|", ""))
return out
def create_report_job(
    project_id: str,
    db: Session,
    *,
    template_id: Optional[str] = None,
    top_k: int = 10,
    requested_by: Optional[str] = None,
) -> GenerateReportJobItem:
    """Create a report-generation job with one pending chapter per section and start it.

    Args:
        project_id: Identifier resolved through ``_resolve_project``.
        db: Database session; the job and its chapters are committed here.
        template_id: Template to use, resolved through ``_resolve_template``.
        top_k: Stored in the job options as ``topK``, clamped to 5..20
            (falsy values count as 10).
        requested_by: Stored on the job as is.

    Returns:
        The job as returned by ``get_report_job``.

    Raises:
        HTTPException: 404 when the project is not found, 400 when the template
            yields no sections for generation.
    """
    project = _resolve_project(db, project_id)
    if not project:
        raise HTTPException(status_code=404, detail="项目不存在")
    template = _resolve_template(db, template_id)
    sections = _sections_for_generation(_list_template_sections(db, template.id))
    if not sections:
        raise HTTPException(status_code=400, detail="模板未配置章节")

    created = datetime.now()
    clamped_top_k = max(5, min(int(top_k or 10), 20))
    job = ReportGenerationJob(
        id=uuid.uuid4().hex,
        project_id=project.uuid,
        template_id=template.id,
        status="pending",
        progress=0,
        requested_by=requested_by,
        options={"topK": clamped_top_k},
        created_at=created,
        updated_at=created,
    )
    db.add(job)
    # Flush the parent job first so the chapter rows satisfy the foreign key.
    db.flush()

    runtime_chapters = []
    for section in sections:
        chapter_row = ReportGenerationChapter(
            id=uuid.uuid4().hex,
            job_id=job.id,
            section_key=section.section_key,
            section_title=section.section_title,
            section_order=section.section_order,
            status="pending",
            created_at=created,
            updated_at=created,
        )
        db.add(chapter_row)
        runtime_chapters.append(
            {
                "sectionKey": section.section_key,
                "sectionTitle": section.section_title,
                "sectionOrder": section.section_order,
                "status": "pending",
            }
        )
    db.commit()

    init_job_state(
        job_id=job.id,
        project_id=project.uuid,
        template_id=template.id,
        chapters=runtime_chapters,
    )
    _start_job_worker(job.id)
    return get_report_job(project.uuid, job.id, db)
def get_report_job(project_id: str, job_id: str, db: Session) -> GenerateReportJobItem:
    """Return the status of a job and its chapters.

    Values present in the runtime state from ``get_job_state`` take precedence
    over the database columns; the database value is used when the runtime
    value is missing or falsy.  ``_recover_stalled_job`` is invoked on the job
    before the chapters are read.

    Raises:
        HTTPException: 404 when the project or the job is not found.
    """
    project = _resolve_project(db, project_id)
    if not project:
        raise HTTPException(status_code=404, detail="项目不存在")
    job = (
        db.query(ReportGenerationJob)
        .filter(ReportGenerationJob.id == job_id, ReportGenerationJob.project_id == project.uuid)
        .first()
    )
    if not job:
        raise HTTPException(status_code=404, detail="任务不存在")
    _recover_stalled_job(db, job)

    chapter_rows = (
        db.query(ReportGenerationChapter)
        .filter(ReportGenerationChapter.job_id == job.id)
        .order_by(ReportGenerationChapter.section_order.asc())
        .all()
    )
    runtime_state = get_job_state(job.id)
    live = runtime_state or {}
    live_chapters = (live.get("chapters") or {}) if isinstance(runtime_state, dict) else {}

    def _chapter_item(row) -> GenerateReportChapterItem:
        live_chapter = live_chapters.get(row.section_key) or {}
        return GenerateReportChapterItem(
            sectionKey=row.section_key,
            sectionTitle=row.section_title,
            sectionOrder=row.section_order,
            status=live_chapter.get("status") or row.status,
            updatedAt=live_chapter.get("updatedAt") or _fmt_dt(row.updated_at),
            errorMessage=live_chapter.get("errorMessage") or row.error_message,
        )

    return GenerateReportJobItem(
        jobId=job.id,
        projectId=job.project_id,
        templateId=job.template_id,
        status=live.get("status") or job.status,
        progress=int(live.get("progress") or job.progress or 0),
        currentSectionKey=live.get("currentSectionKey") or job.current_section_key,
        errorMessage=live.get("errorMessage") or job.error_message,
        createdAt=_fmt_dt(job.created_at),
        updatedAt=live.get("updatedAt") or _fmt_dt(job.updated_at),
        completedAt=live.get("completedAt") or _fmt_dt(job.completed_at),
        chapters=[_chapter_item(row) for row in chapter_rows],
    )
def get_report_result(
    project_id: str,
    job_id: str,
    db: Session,
    *,
    include_debug: bool = False,
) -> GenerateReportResult:
    """Assemble the report text and per-chapter results of a job.

    The chapters come from the runtime state when one exists, otherwise from
    the database rows.  Appendices are then appended, table captions are
    normalised and a consistency check is run on the final text.

    Args:
        include_debug: Forwarded to the chapter builders; when False the
            prompt, evidence and validation payloads are left out.

    Raises:
        HTTPException: 404 when the project or the job is not found.
    """
    project = _resolve_project(db, project_id)
    if not project:
        raise HTTPException(status_code=404, detail="项目不存在")
    job = (
        db.query(ReportGenerationJob)
        .filter(ReportGenerationJob.id == job_id, ReportGenerationJob.project_id == project.uuid)
        .first()
    )
    if not job:
        raise HTTPException(status_code=404, detail="任务不存在")
    _recover_stalled_job(db, job)

    db_chapters = (
        db.query(ReportGenerationChapter)
        .filter(ReportGenerationChapter.job_id == job.id)
        .order_by(ReportGenerationChapter.section_order.asc())
        .all()
    )
    title_map: dict[str, str] = {}
    if job.template_id:
        title_map = _build_section_title_map(_list_template_sections(db, job.template_id))

    runtime_state = get_job_state(job.id)
    builder_options = {"include_debug": include_debug, "chapter_title_map": title_map}
    if runtime_state:
        report_text, chapter_items = _build_live_result_from_runtime(runtime_state, **builder_options)
    else:
        report_text, chapter_items = _build_live_result_from_chapters(db_chapters, **builder_options)

    report_text = _append_report_appendices(db, project.uuid, report_text)
    report_text = _normalize_table_captions_in_markdown(report_text or "")
    consistency = _check_consistency(report_text or "", project.name)
    effective_status = (runtime_state or {}).get("status") or job.status
    return GenerateReportResult(
        jobId=job.id,
        status=effective_status,
        report=report_text,
        consistency=consistency,
        chapters=chapter_items,
    )
def _build_live_result_from_chapters(
    chapters: list[ReportGenerationChapter],
    *,
    include_debug: bool = False,
    chapter_title_map: Optional[dict[str, str]] = None,
) -> tuple[str, list[GenerateReportResultChapter]]:
    """Build the report text and chapter items from database chapter rows.

    Non-empty chapter content is cleaned (numeric line breaks, consecutive
    repetitions, table captions) and passed through
    ``_inject_missing_parent_section_headings`` together with the previous
    chapter's content.  Chapters without content still produce an item but add
    nothing to the report text.

    Returns:
        ``(report_text, chapter_items)``; the report text is the non-empty
        chapter bodies joined by blank lines.
    """
    titles = chapter_title_map or {}
    items: list[GenerateReportResultChapter] = []
    bodies: list[str] = []

    for index, row in enumerate(chapters):
        body = row.content
        if body:
            body = _fix_numeric_line_breaks(str(body or "").strip())
        if body:
            body, _ = _collapse_consecutive_text_repetitions(str(body).strip())
            body = _normalize_table_captions_in_markdown(str(body).strip())
            previous_body = ""
            if index > 0 and chapters[index - 1].content:
                previous_body = _fix_numeric_line_breaks(str(chapters[index - 1].content).strip())
            body = _inject_missing_parent_section_headings(
                str(row.section_title or ""),
                str(body).strip(),
                previous_body,
                titles,
            )

        items.append(
            GenerateReportResultChapter(
                sectionKey=row.section_key,
                sectionTitle=row.section_title,
                sectionOrder=row.section_order,
                status=row.status,
                content=body,
                errorMessage=row.error_message,
                promptText=row.prompt_text if include_debug else None,
                evidencePayload=row.evidence_payload if include_debug else None,
                validationPayload=row.validation_payload if include_debug else None,
            )
        )
        if body:
            bodies.append(str(body).strip())

    report_text = _fix_numeric_line_breaks("\n\n".join(bodies).strip())
    return report_text, items
def _build_live_result_from_runtime(
    runtime_state: dict,
    *,
    include_debug: bool = False,
    chapter_title_map: Optional[dict[str, str]] = None,
) -> tuple[str, list[GenerateReportResultChapter]]:
    """Build the report text and chapter items from the in-memory runtime state.

    Chapters are taken from ``runtime_state["chapters"]`` and ordered by
    ``sectionOrder``; entries that are not dicts are skipped.  Non-empty
    content receives the same clean-up and parent-heading injection as the
    database-backed builder.

    Returns:
        ``(report_text, chapter_items)``; the report text is the non-empty
        chapter bodies joined by blank lines.
    """
    titles = chapter_title_map or {}
    items: list[GenerateReportResultChapter] = []
    bodies: list[str] = []

    ordered = sorted(
        ((runtime_state or {}).get("chapters") or {}).values(),
        key=lambda entry: int((entry or {}).get("sectionOrder") or 0),
    )
    for index, entry in enumerate(ordered):
        if not isinstance(entry, dict):
            continue
        body = _fix_numeric_line_breaks(str(entry.get("content") or "").strip())
        if body:
            body, _ = _collapse_consecutive_text_repetitions(str(body).strip())
            body = _normalize_table_captions_in_markdown(str(body).strip())
            previous_body = ""
            if index > 0 and isinstance(ordered[index - 1], dict):
                previous_body = _fix_numeric_line_breaks(
                    str(ordered[index - 1].get("content") or "").strip()
                )
            body = _inject_missing_parent_section_headings(
                str(entry.get("sectionTitle") or ""),
                body,
                previous_body,
                titles,
            )

        items.append(
            GenerateReportResultChapter(
                sectionKey=str(entry.get("sectionKey") or ""),
                sectionTitle=str(entry.get("sectionTitle") or ""),
                sectionOrder=int(entry.get("sectionOrder") or 0),
                status=str(entry.get("status") or "pending"),
                content=body or None,
                errorMessage=entry.get("errorMessage"),
                promptText=entry.get("promptText") if include_debug else None,
                evidencePayload=entry.get("evidencePayload") if include_debug else None,
                validationPayload=entry.get("validationPayload") if include_debug else None,
            )
        )
        if body:
            bodies.append(body)

    report_text = _fix_numeric_line_breaks("\n\n".join(bodies).strip())
    return report_text, items
def get_report_stream_snapshot(
    job_id: str,
    *,
    include_debug: bool = False,
) -> Optional[dict[str, Any]]:
    """Return a ``{"job": ..., "result": ...}`` snapshot built from the runtime state only.

    Returns ``None`` when there is no runtime state for the job.  A database
    session is opened only to load the template's section titles.  The
    ``consistency`` list of the result is always empty in this snapshot.
    """
    state = get_job_state(job_id)
    if not state:
        return None

    title_map: dict[str, str] = {}
    template_id = state.get("templateId")
    if template_id:
        with SessionLocal() as db:
            title_map = _build_section_title_map(_list_template_sections(db, str(template_id)))

    report_text, chapter_items = _build_live_result_from_runtime(
        state,
        include_debug=include_debug,
        chapter_title_map=title_map,
    )

    ordered_chapters = sorted(
        ((state or {}).get("chapters") or {}).values(),
        key=lambda entry: int((entry or {}).get("sectionOrder") or 0),
    )
    chapter_summaries = [
        {
            "sectionKey": str(entry.get("sectionKey") or ""),
            "sectionTitle": str(entry.get("sectionTitle") or ""),
            "sectionOrder": int(entry.get("sectionOrder") or 0),
            "status": str(entry.get("status") or "pending"),
            "updatedAt": entry.get("updatedAt"),
            "errorMessage": entry.get("errorMessage"),
        }
        for entry in ordered_chapters
    ]

    job_payload: dict[str, Any] = {
        field: state.get(field) for field in ("jobId", "projectId", "templateId", "status")
    }
    job_payload["progress"] = int(state.get("progress") or 0)
    for field in ("currentSectionKey", "errorMessage", "createdAt", "updatedAt", "completedAt"):
        job_payload[field] = state.get(field)
    job_payload["chapters"] = chapter_summaries

    result_payload = {
        "jobId": state.get("jobId"),
        "status": state.get("status"),
        "report": report_text,
        "consistency": [],
        "chapters": [item.model_dump() for item in chapter_items],
    }
    return {"job": job_payload, "result": result_payload}
def retry_report_chapter(project_id: str, job_id: str, section_key: str, db: Session) -> GenerateReportJobItem:
    """Reset one chapter to pending and start a worker that regenerates only that chapter.

    The job is marked running in the database and in the runtime state, and the
    chapter's runtime content and debug payloads are cleared.

    Raises:
        HTTPException: 404 when the project, the job or the chapter is not found.
    """
    project = _resolve_project(db, project_id)
    if not project:
        raise HTTPException(status_code=404, detail="项目不存在")

    job_query = db.query(ReportGenerationJob).filter(
        ReportGenerationJob.id == job_id,
        ReportGenerationJob.project_id == project.uuid,
    )
    job = job_query.first()
    if not job:
        raise HTTPException(status_code=404, detail="任务不存在")

    chapter_query = db.query(ReportGenerationChapter).filter(
        ReportGenerationChapter.job_id == job.id,
        ReportGenerationChapter.section_key == section_key,
    )
    chapter = chapter_query.first()
    if not chapter:
        raise HTTPException(status_code=404, detail="章节不存在")

    stamp = datetime.now()
    chapter.status = "pending"
    chapter.error_message = None
    chapter.updated_at = stamp
    job.status = "running"
    job.updated_at = stamp
    db.commit()

    update_job_state(job.id, status="running", errorMessage=None, completedAt=None)
    cleared_fields = {
        "status": "pending",
        "content": None,
        "errorMessage": None,
        "promptText": None,
        "evidencePayload": None,
        "validationPayload": None,
    }
    update_chapter_state(job.id, section_key, **cleared_fields)
    _start_job_worker(job.id, section_key=section_key)
    return get_report_job(project.uuid, job_id, db)
def cancel_report_job(project_id: str, job_id: str, db: Session) -> GenerateReportJobItem:
    """Cancel a job: mark it cancelled and fail every chapter that has not finished.

    Jobs already in a terminal state (completed, failed, cancelled) are
    returned unchanged.  Otherwise all pending/running chapters are set to
    "failed" with the cancellation message, the job is set to "cancelled", and
    the same changes are mirrored into the runtime state.

    Only chapters cancelled by this call are touched in the runtime state.
    Chapters that had already failed for another reason keep their own error
    message there, so the live view stays consistent with the database.

    Raises:
        HTTPException: 404 when the project or the job is not found.
    """
    cancel_message = "任务已由用户取消"

    project = _resolve_project(db, project_id)
    if not project:
        raise HTTPException(status_code=404, detail="项目不存在")
    job = (
        db.query(ReportGenerationJob)
        .filter(ReportGenerationJob.id == job_id, ReportGenerationJob.project_id == project.uuid)
        .first()
    )
    if not job:
        raise HTTPException(status_code=404, detail="任务不存在")
    if job.status in ("completed", "failed", "cancelled"):
        return get_report_job(project.uuid, job_id, db)

    now = datetime.now()
    chapters = (
        db.query(ReportGenerationChapter)
        .filter(ReportGenerationChapter.job_id == job.id)
        .all()
    )
    # Section keys are collected before the commit so the runtime update below
    # covers exactly the chapters changed here.
    cancelled_section_keys: list[str] = []
    for chapter in chapters:
        if chapter.status in ("pending", "running"):
            chapter.status = "failed"
            chapter.error_message = cancel_message
            chapter.updated_at = now
            cancelled_section_keys.append(chapter.section_key)

    job.status = "cancelled"
    job.error_message = cancel_message
    job.current_section_key = None
    job.updated_at = now
    job.completed_at = now
    db.commit()

    update_job_state(
        job.id,
        status="cancelled",
        errorMessage=cancel_message,
        currentSectionKey=None,
        completedAt=_fmt_dt(now),
    )
    for section_key in cancelled_section_keys:
        update_chapter_state(
            job.id,
            section_key,
            status="failed",
            errorMessage=cancel_message,
        )
    return get_report_job(project.uuid, job_id, db)
def _start_job_worker(job_id: str, section_key: Optional[str] = None) -> None:
    """Run ``_run_job_worker(job_id, section_key)`` on a new daemon thread.

    The thread is named after the first eight characters of the job id.
    """
    worker = threading.Thread(
        target=_run_job_worker,
        args=(job_id, section_key),
        daemon=True,
        name=f"report-job-{job_id[:8]}",
    )
    worker.start()
def _run_job_worker(job_id: str, only_section_key: Optional[str] = None) -> None:
with SessionLocal() as db:
job = db.query(ReportGenerationJob).filter(ReportGenerationJob.id == job_id).first()
if not job:
return
try:
job.status = "running"
job.error_message = None
job.updated_at = datetime.now()
db.commit()
update_job_state(job.id, status="running", errorMessage=None)
project = db.query(Project).filter(Project.uuid == job.project_id).first()
if not project:
raise RuntimeError("项目不存在")
template = _resolve_template(db, job.template_id)
all_template_sections = _list_template_sections(db, template.id)
sections = _sections_for_generation(all_template_sections)
chapter_title_map = _build_section_title_map(all_template_sections)
chapters = (
db.query(ReportGenerationChapter)
.filter(ReportGenerationChapter.job_id == job.id)
.order_by(ReportGenerationChapter.section_order.asc())
.all()
)
chapter_map = {c.section_key: c for c in chapters}
completed_section_contents: dict[str, str] = {
c.section_key: str(c.content or "").strip()
for c in chapters
if c.status == "completed" and str(c.content or "").strip()
}
target_sections = [s for s in sections if (not only_section_key or s.section_key == only_section_key)]
retrieval = RetrievalService()
top_k = int((job.options or {}).get("topK") or 10)
completed_count = 0
pending_sections = []
for section in target_sections:
chapter = chapter_map.get(section.section_key)
if not chapter:
continue
if not only_section_key and chapter.status == "completed":
completed_count += 1
continue
pending_sections.append(section)
total_count = max(1, completed_count + len(pending_sections))
logger.info(
"报告生成 job start | job=%s | project=%s | total_sections=%d | pending=%d | completed=%d | top_k=%d",
job.id, project.uuid, len(sections), len(pending_sections), completed_count, top_k,
)
for idx, section in enumerate(pending_sections, start=1):
db.refresh(job)
if job.status == "cancelled":
return
chapter = chapter_map.get(section.section_key)
if not chapter:
continue
section_no = _extract_section_number(section.section_title or "")
logger.info(
"报告生成 start chapter | job=%s | section=%s | title=%s | section_no=%s | idx=%d/%d",
job.id, section.section_key, section.section_title, section_no, idx, total_count,
)
_update_chapter_status(db, job, chapter, "running", None)
update_job_state(job.id, currentSectionKey=section.section_key)
update_chapter_state(
job.id,
section.section_key,
status="running",
errorMessage=None,
content=None,
promptText=None,
evidencePayload=None,
validationPayload={"streamPhase": "waiting"},
)
required_tables = _extract_required_table_tokens(
section.section_prompt or "",
_extract_section_number(section.section_title or ""),
contract_text=_effective_section_output_contract(section),
)
if _extract_section_number(section.section_title or "") == "5.3.2":
na8 = _norm_table_token("附表8")
required_tables = [
t for t in required_tables if _norm_table_token(str(t)) != na8
]
evidence, retrieval_stage = _collect_evidence_progressive(
db,
retrieval,
project.uuid,
section,
top_k=top_k,
required_tables=required_tables,
)
prior_sibling_sections_text = _build_prior_sibling_sections_text(
section,
sections,
completed_section_contents,
)
section_reference = _load_section_reference_for_chapter(
db,
section.section_key,
section.section_title,
template_id=template.id,
)
logger.info(
"section_reference 注入 | section=%s | template_id=%s | 命中=%s",
section.section_key,
template.id,
"" if section_reference else "",
)
prompt = _build_chapter_prompt(
section,
evidence,
prior_sibling_sections_text=prior_sibling_sections_text,
section_reference=section_reference,
)
_dump_runtime_prompt(
job_id=job.id,
section_key=section.section_key,
section_title=section.section_title,
system_prompt=chapter_generation_system_prompt(),
user_prompt=prompt,
)
stream_state = {
"buffer": "",
"phase": "waiting",
}
def _on_content_delta(event: str, delta_text: str) -> None:
if event == "delta":
if delta_text:
stream_state["phase"] = "streaming"
stream_state["buffer"] = str(stream_state.get("buffer") or "") + delta_text
append_chapter_content(
job.id,
section.section_key,
delta_text,
stream_phase="streaming",
)
elif event == "finalizing":
stream_state["phase"] = "finalizing"
set_chapter_stream_phase(job.id, section.section_key, "finalizing")
content, validation, model_output = _generate_chapter_content(
section,
prompt,
on_content_delta=_on_content_delta,
)
content = _apply_canonical_field_backfill(section, evidence, content)
_cur_section_no = _extract_section_number(section.section_title or "")
_skip_table_enforcement = _cur_section_no in {"2.1.1"}
if _skip_table_enforcement:
remaining_missing_tables = []
content_after_tables = content
else:
content, remaining_missing_tables = _enforce_required_tables(
section,
prompt,
content,
evidence,
)
content_after_tables = content
content = _strip_tables_from_non_table_section(
section.section_title or "", content, section=section
)
content = _strip_forbidden_tables(
section.section_title or "", content,
)
content, format_issues = _enforce_template_format_contract(
section,
content,
evidence,
chapter_title_map=chapter_title_map,
)
_sec_no = _extract_section_number(section.section_title or "")
_refresh_tokens: tuple[str, ...] = ("表5-4",)
if _sec_no == "5.1":
_refresh_tokens = ("表5-1",)
elif _sec_no == "5.2.1":
_refresh_tokens = ("表5-2", "表5-3")
content = _refresh_element_table_markdown_tokens(
content, evidence, _refresh_tokens
)
content = _strip_bracketed_three_part_labels(content)
content = _strip_placeholder_table_notes(content)
content = _normalize_table_captions_in_markdown(content)
content = _strip_trailing_partial_missing_markers(content)
content = _fix_numeric_line_breaks(content)
content = _cleanup_section_table_artifacts(
section.section_title or "",
content,
allowed_table_tokens=required_tables,
)
if _sec_no == "5.3.1":
content = _refresh_element_table_markdown_tokens(
content, evidence, ("表5-4",)
)
content = _fill_required_table_caption_stubs(
content, ["表5-4"], evidence
)
content = _strip_orphan_markdown_table_rows(content)
content = _strip_minimal_missing_table_tail(content)
content, intra_repeat_removed = _collapse_consecutive_text_repetitions(content)
content, chapter_dedupe_removed = _dedupe_long_chapter_repetition(content)
chapter_dedupe_removed += intra_repeat_removed
if chapter_dedupe_removed > 0:
warnings = validation.get("warnings") if isinstance(validation, dict) else []
if not isinstance(warnings, list):
warnings = []
warnings.append(
f"章节去重:已移除 {chapter_dedupe_removed} 处重复段落/表格"
)
validation["warnings"] = warnings
validation["chapterDedupeRemoved"] = chapter_dedupe_removed
if required_tables and not _skip_table_enforcement:
content = _restore_required_tables_safety_net(
content,
required_tables,
evidence,
content_after_tables,
)
content = _finalize_section_table_dedupe(content, required_tables)
if remaining_missing_tables:
warnings = validation.get("warnings") if isinstance(validation, dict) else []
if not isinstance(warnings, list):
warnings = []
warnings.append(
"部分必需表格仍缺失,已插入占位表:"
+ "".join(remaining_missing_tables)
)
validation["warnings"] = warnings
if format_issues:
warnings = validation.get("warnings") if isinstance(validation, dict) else []
if not isinstance(warnings, list):
warnings = []
warnings.extend([f"格式验收器:{x}" for x in format_issues][:8])
validation["warnings"] = warnings
validation["retrievalStage"] = retrieval_stage
validation["streamPhase"] = "completed"
diagnostics = _build_field_diagnostics(section, evidence, content)
if diagnostics:
validation["fieldDiagnostics"] = diagnostics
if model_output:
validation["modelOutput"] = model_output
content = _inject_missing_parent_section_headings(
section.section_title or "",
content,
_previous_completed_section_content(
section, sections, completed_section_contents
),
chapter_title_map,
)
now = datetime.now()
chapter.content = content
completed_section_contents[section.section_key] = content
chapter.prompt_text = prompt[:20000]
chapter.evidence_payload = evidence
chapter.validation_payload = validation
chapter.status = "completed"
chapter.error_message = None
chapter.updated_at = now
chapter.completed_at = now
if not only_section_key:
job.progress = int((completed_count + idx) * 100 / total_count)
job.current_section_key = section.section_key
job.updated_at = now
db.commit()
dump_out_path = _dump_report_chapter_json_markdown(
job_id=job.id,
section_key=section.section_key,
section_title=section.section_title,
output_json={
"modelOutput": model_output or {},
"persistedChapter": {
"sectionKey": section.section_key,
"sectionTitle": section.section_title,
"sectionOrder": section.section_order,
"status": "completed",
"content": content,
"promptText": prompt[:20000],
"evidencePayload": evidence,
"validationPayload": validation,
},
},
)
logger.info(
"章节生成落盘 | job=%s | section=%s | prompt_len=%s | content_len=%s | output_file=%s",
job.id, section.section_key, len(prompt[:20000]), len(content),
dump_out_path or "(已存在合并写入)",
)
update_chapter_state(
job.id,
section.section_key,
status="completed",
content=content,
errorMessage=None,
promptText=prompt[:20000],
evidencePayload=evidence,
validationPayload=validation,
)
if not only_section_key:
update_job_state(
job.id,
progress=int((completed_count + idx) * 100 / total_count),
currentSectionKey=section.section_key,
)
else:
update_job_state(job.id, currentSectionKey=section.section_key)
db.refresh(job)
if job.status == "cancelled":
return
db.refresh(job)
if job.status == "cancelled":
return
if only_section_key:
# 单章重跑不应直接终结整任务,仅回写章节并刷新任务进度。
all_chapters = (
db.query(ReportGenerationChapter)
.filter(ReportGenerationChapter.job_id == job.id)
.order_by(ReportGenerationChapter.section_order.asc())
.all()
)
done = sum(1 for c in all_chapters if c.status == "completed")
total = max(1, len(all_chapters))
job.progress = int(done * 100 / total)
job.current_section_key = None
job.updated_at = datetime.now()
db.commit()
update_job_state(job.id, progress=int(done * 100 / total), currentSectionKey=None)
else:
job.snapshot = None
job.status = "completed"
job.progress = 100
job.current_section_key = None
job.completed_at = datetime.now()
job.updated_at = datetime.now()
db.commit()
logger.info(
"报告生成 job completed | job=%s | project=%s | total_chapters=%d",
job.id, project.uuid, len(sections),
)
update_job_state(
job.id,
status="completed",
progress=100,
currentSectionKey=None,
completedAt=_fmt_dt(job.completed_at),
)
except Exception as e:
current_section_key = job.current_section_key
logger.error(
"报告生成 job failed | job=%s | project=%s | section=%s | err=%s",
job.id, project.uuid, current_section_key, e,
)
job.status = "failed"
job.error_message = str(e)
job.updated_at = datetime.now()
db.commit()
update_job_state(job.id, status="failed", errorMessage=str(e))
if current_section_key:
update_chapter_state(
job.id,
current_section_key,
status="failed",
errorMessage=str(e),
)
# Section numbers listed here keep going through L2/L3 even when L1/L2 already
# judged the evidence "sufficient", so that an early exit after chapter-targeted
# retrieval does not skip the keyword recall.
_SECTION_NUMBERS_FORCE_L3_KEYWORD_RETRIEVAL: frozenset[str] = frozenset(
    (
        "2.1.5",
        "3.3.3",
        "3.4.1",
        "3.6",
        "3.7",
        "3.8",
        "3.10",
        "4.3.3",
        "5.2.3",
        "6.1.1.1",
        "6.1.1.2",
        "6.2.1",
        "6.2.4",
    )
)
def _section_forces_l3_keyword_retrieval(section: ReportTemplateSection) -> bool:
    """Return True when the section's number is on the forced-L3 retrieval list."""
    section_no = _extract_section_number(section.section_title or "")
    return section_no in _SECTION_NUMBERS_FORCE_L3_KEYWORD_RETRIEVAL
def _collect_evidence_progressive(
    db: Session,
    retrieval: RetrievalService,
    project_uuid: str,
    section: ReportTemplateSection,
    *,
    top_k: int,
    required_tables: Optional[list[str]] = None,
) -> tuple[dict, str]:
    """Collect evidence for a section, widening the retrieval scope stage by stage.

    Stages, in order:
      * L1 ``elements_only``: element cells and structured tables only.
      * L2 ``elements_plus_chapter_docs``: adds chapter-targeted passages.
      * L3 ``elements_plus_chapter_and_keyword_docs``: adds keyword search.

    L1 and L2 return early when ``_is_evidence_sufficient`` accepts their
    result. Sections on the forced-L3 list never return early, so their L1/L2
    results would always be thrown away; those stages are skipped for them and
    only the L3 collection is executed.

    Args:
        db: Database session passed through to ``_collect_evidence``.
        retrieval: Retrieval service passed through to ``_collect_evidence``.
        project_uuid: Project whose data is searched.
        section: Template section the evidence is collected for.
        top_k: Passage limit passed through to ``_collect_evidence``.
        required_tables: Table tokens the section must contain, if any.

    Returns:
        ``(evidence, stage_label)`` where ``stage_label`` names the stage that
        produced the returned evidence.
    """

    def _collect(*, chapter_docs: bool, keyword_docs: bool) -> dict:
        return _collect_evidence(
            db,
            retrieval,
            project_uuid,
            section,
            top_k=top_k,
            required_tables=required_tables,
            include_chapter_docs=chapter_docs,
            include_keyword_docs=keyword_docs,
        )

    if not _section_forces_l3_keyword_retrieval(section):
        early_stages = (
            (False, "elements_only"),
            (True, "elements_plus_chapter_docs"),
        )
        for with_chapter_docs, stage_label in early_stages:
            evidence = _collect(chapter_docs=with_chapter_docs, keyword_docs=False)
            if _is_evidence_sufficient(section, evidence):
                return evidence, stage_label

    # L3: last resort (or forced): chapter passages plus keyword fallback search.
    return (
        _collect(chapter_docs=True, keyword_docs=True),
        "elements_plus_chapter_and_keyword_docs",
    )
def _latest_element_payloads_by_row_col(
    db: Session,
    project_uuid: str,
    row_keys: list[str],
    *,
    non_empty_value: bool = True,
) -> list[dict[str, Any]]:
    """Return one payload per ``(row_key, col_key)`` pair, keeping the newest cell.

    The query is ordered by ``updated_at`` descending, so the first cell seen for
    a pair is the one that is kept. With ``non_empty_value`` set, cells whose
    value is NULL or the empty string are filtered out in the query.
    """
    if not row_keys:
        return []

    query = db.query(ElementCell).filter(
        ElementCell.project_id == project_uuid,
        ElementCell.row_key.in_(row_keys),
    )
    if non_empty_value:
        query = query.filter(ElementCell.value.isnot(None), ElementCell.value != "")

    newest_by_pair: dict[tuple[str, str], dict[str, Any]] = {}
    for cell in query.order_by(ElementCell.updated_at.desc()).all():
        pair = (str(cell.row_key or ""), str(cell.col_key or ""))
        if pair not in newest_by_pair:
            newest_by_pair[pair] = {
                "rowKey": cell.row_key,
                "colKey": cell.col_key,
                "value": str(cell.value or "")[:500],
                "sourceDocumentId": cell.source_document_id,
            }
    return list(newest_by_pair.values())
def _merge_section_11_forced_elements(
    forced_payloads: list[dict[str, Any]],
    scored_top_payloads: list[dict[str, Any]],
    *,
    max_additional_scored: int = 40,
) -> list[dict[str, Any]]:
    """Merge the forced chapter-1 cells with the relevance-scored cells.

    Forced payloads come first (de-duplicated by ``rowKey``/``colKey``); scored
    payloads not already present are appended afterwards, in their given order.

    Args:
        forced_payloads: Cells that must always be included.
        scored_top_payloads: Relevance-ranked cells used to top up the result.
        max_additional_scored: Upper bound on how many scored cells are added.
            Zero or a negative value adds none (the cap is checked before
            appending, so it is never exceeded).

    Returns:
        A new list; the input lists are not modified.
    """

    def _pair(payload: dict[str, Any]) -> tuple[str, str]:
        return (str(payload.get("rowKey") or ""), str(payload.get("colKey") or ""))

    seen: set[tuple[str, str]] = set()
    merged: list[dict[str, Any]] = []
    for payload in forced_payloads:
        key = _pair(payload)
        if key in seen:
            continue
        seen.add(key)
        merged.append(payload)

    remaining = max(0, max_additional_scored)
    for payload in scored_top_payloads:
        if remaining <= 0:
            break
        key = _pair(payload)
        if key in seen:
            continue
        seen.add(key)
        merged.append(payload)
        remaining -= 1
    return merged
def _collect_evidence(
    db: Session,
    retrieval: RetrievalService,
    project_uuid: str,
    section: ReportTemplateSection,
    *,
    top_k: int,
    required_tables: Optional[list[str]] = None,
    include_chapter_docs: bool = True,
    include_keyword_docs: bool = True,
) -> dict:
    """Assemble the evidence payload used to prompt one report section.

    The payload combines:
      * ``elements``: up to 40 element cells ranked by
        ``_score_element_cell_relevance`` (scanned from at most the 800 most
        recently updated non-empty cells of the project);
      * ``structuredTables``: whatever ``_collect_structured_tables`` returns
        for the required table tokens;
      * ``chapterDocs``: chapter-targeted passages (only when
        ``include_chapter_docs`` is set);
      * ``keywordDocs``: keyword search hits (only when ``include_keyword_docs``
        is set and at least one token was extracted);
      * ``tokens``, ``requiredTables`` and ``canonicalFields`` derived from the
        section and the selected cells.

    Sections 1.1 and 1.2 receive special handling, described inline.
    """
    section_no = _extract_section_number(section.section_title or "")
    tokens = _extract_tokens(f"{section.section_title} {section.section_prompt or ''}")[:14]
    if section_no == "1.2":
        # The title words are too short and make retrieval drift; add
        # decision-related phrases to improve recall.
        extra = " ".join(
            [
                "项目决策要点 建设必要性 立项背景",
                "国VI 国Ⅵ 汽油质量升级 芳烃 烯烃 环保",
                "预期目标 烷基化油 产量 辛烷值 万吨",
                "可研 批复 投资 效益 利润",
            ]
        )
        merged = _extract_tokens(f"{section.section_title} {section.section_prompt or ''} {extra}")
        # dict.fromkeys de-duplicates while keeping first-seen order.
        tokens = list(dict.fromkeys(merged))[:20]
    cells_query = (
        db.query(ElementCell, ElementTable.table_name)
        .join(ElementTable, ElementTable.id == ElementCell.table_id)
        .filter(
            ElementCell.project_id == project_uuid,
            ElementTable.project_id == project_uuid,
            ElementCell.value.isnot(None),
            ElementCell.value != "",
        )
        .order_by(ElementCell.updated_at.desc())
    )
    candidate_cells: list[tuple[int, dict]] = []
    # Only the 800 most recently updated cells are scored.
    for cell, table_name in cells_query.limit(800).all():
        payload = {
            "tableId": cell.table_id,
            "tableName": table_name,
            "rowKey": cell.row_key,
            "colKey": cell.col_key,
            "year": cell.year,
            "value": str(cell.value or "")[:500],
            "sourceDocumentId": cell.source_document_id,
            "sourceType": cell.source_type,
        }
        score = _score_element_cell_relevance(
            section.section_title,
            tokens,
            payload.get("rowKey"),
            payload.get("colKey"),
            payload.get("value"),
            table_name=payload.get("tableName"),
            section=section,
        )
        # The score can be positive without any token hit when the field
        # semantics match strongly (e.g. the value contains a "field: value"
        # pair); such cells are kept too.
        if score > 0:
            candidate_cells.append((score, payload))
        elif not tokens:
            # No tokens at all: nothing to rank by, so keep every cell with a
            # minimal score.
            candidate_cells.append((1, payload))
    candidate_cells.sort(key=lambda x: x[0], reverse=True)
    matched_cells = [x[1] for x in candidate_cells[:40]]
    # Section 1.1 (project basics): always merge in every non-empty cell of the
    # chapter-1 project-overview table group, so the global top-40 relevance cut
    # cannot push them out and leave fields such as construction investment
    # unfilled.
    if _extract_section_number(section.section_title or "") == "1.1":
        ch1_rows = section_table_row_keys(CHAPTER1_PROJECT_OVERVIEW_TABLE_GROUP)
        forced_ch1 = _latest_element_payloads_by_row_col(db, project_uuid, ch1_rows, non_empty_value=True)
        if forced_ch1:
            matched_cells = _merge_section_11_forced_elements(forced_ch1, matched_cells, max_additional_scored=40)
    # Drop blank table tokens and coerce the rest to strings.
    required = [str(t) for t in (required_tables or []) if str(t).strip()]
    structured_tables = _collect_structured_tables(
        db,
        project_uuid,
        required,
        section_title=str(section.section_title or ""),
        section_tokens=tokens,
    )
    chapter_docs = []
    if include_chapter_docs:
        chapter_docs = retrieval.get_chapter_materials(project_uuid, section.section_title, top_k=top_k)
    keyword_docs: list[dict] = []
    if include_keyword_docs and tokens:
        if section_no == "1.2":
            # Section 1.2 runs several themed queries and de-duplicates the hits
            # by (doc id, first 240 characters of the body), capped at 14 hits.
            queries = [
                "国VI 国Ⅵ 汽油 质量升级 芳烃 烯烃 环保 标准",
                "项目 建设 必要性 决策 依据 立项",
                "预期 目标 烷基化油 产量 辛烷值 效益 万吨",
                " ".join(tokens[:8]),
            ]
            seen: set[tuple[str, str]] = set()
            for q in queries:
                docs = retrieval.search_by_query(q, top_k=6, filter_project=project_uuid)
                for d in docs:
                    did = str(d.metadata.get("doc_id", "") or "")
                    body = str(d.page_content or "")[:2000]
                    key = (did, body[:240])
                    if key in seen:
                        continue
                    seen.add(key)
                    keyword_docs.append(
                        {
                            "heading": d.metadata.get("heading", ""),
                            "content": body,
                            "docId": did,
                            "query": q[:120],
                        }
                    )
                    if len(keyword_docs) >= 14:
                        break
                # Propagate the cap to the outer loop as well.
                if len(keyword_docs) >= 14:
                    break
        else:
            # Every other section: a single query built from the first 5 tokens.
            q = " ".join(tokens[:5])
            docs = retrieval.search_by_query(q, top_k=10, filter_project=project_uuid)
            for d in docs:
                keyword_docs.append(
                    {
                        "heading": d.metadata.get("heading", ""),
                        "content": str(d.page_content or "")[:2000],
                        "docId": d.metadata.get("doc_id", ""),
                    }
                )
    return {
        "tokens": tokens,
        "requiredTables": required,
        "structuredTables": structured_tables,
        "canonicalFields": _extract_canonical_fields(
            section.section_title, matched_cells, section=section
        ),
        "elements": matched_cells,
        "chapterDocs": chapter_docs[:top_k],
        # Section 1.2 keeps up to 14 keyword hits, every other section up to 8.
        "keywordDocs": keyword_docs[:14] if section_no == "1.2" else keyword_docs[:8],
    }
def _is_evidence_sufficient(section: ReportTemplateSection, evidence: dict) -> bool:
    """Decide whether the collected evidence is enough to stop widening retrieval.

    Checks, in order:
      1. When tables are required, at least ``min(required, 2)`` structured
         tables must have been matched.
      2. With fewer than 6 element cells, at least one chapter or keyword
         passage must be present.
      3. Table sections (title contains "表") need a structured table or at
         least 10 element cells. The membership test previously used an empty
         string, which is contained in every title, so the rule was applied to
         all sections instead of table sections only.
      4. Sections 1.1 and 1.2 must additionally cover key facts: every keyword
         group for 1.1, at least two groups for 1.2.

    Args:
        section: Template section being generated; only ``section_title`` is read.
        evidence: Payload produced by ``_collect_evidence``. A non-dict value
            is treated as empty evidence.

    Returns:
        True when the evidence passes every applicable check.
    """
    payload = evidence if isinstance(evidence, dict) else {}

    def _count(key: str) -> int:
        items = payload.get(key)
        return len(items) if isinstance(items, list) else 0

    required_count = _count("requiredTables")
    structured_count = _count("structuredTables")
    element_count = _count("elements")
    chapter_doc_count = _count("chapterDocs")
    keyword_doc_count = _count("keywordDocs")

    # Required tables: make sure structured-table matching covers them first.
    if required_count > 0 and structured_count < min(required_count, 2):
        return False
    # Too few elements: at least one kind of document evidence must back them up.
    if element_count < 6 and chapter_doc_count == 0 and keyword_doc_count == 0:
        return False
    # Table-related sections usually need denser evidence.
    title = str(section.section_title or "")
    if "表" in title and structured_count == 0 and element_count < 10:
        return False

    # Key sections are judged by field coverage, so that "enough rows but no key
    # field" is not mistaken for sufficient evidence.
    title_norm = re.sub(r"\s+", "", title)
    if "1.1项目基本情况" in title_norm:
        required_groups = [
            ["建设单位", "建设单位名称"],
            ["建设地点", "厂址"],
            ["建设规模", "装置规模", "能力", "万吨/年"],
            ["投资", "概算", "估算", "决算"],
        ]
        if not all(_evidence_contains_any_fact(evidence, group) for group in required_groups):
            return False
    if "1.2项目决策要点" in title_norm:
        required_groups = [
            ["国vi", "国ⅵ", "质量升级", "汽油标准", "环保", "环评", "排放", "清洁生产"],
            ["高标号", "辛烷值", "汽油池", "产品结构", "汽油"],
            ["碳四", "液化气", "原料", "物料平衡", "资源利用", "附加值"],
            ["杂质", "预处理", "丁二烯", "选择性加氢", "催化剂", "甲醇", "二甲醚"],
            ["万吨", "产量", "烷基化油", "效益", "利润", "营业收入", "预期", "目标"],
        ]
        hit_count = sum(
            1 for group in required_groups if _evidence_contains_any_fact(evidence, group)
        )
        # At least 2 groups: safety assessments often carry impurity / material
        # balance facts, while feasibility / environmental reports can supply
        # the environmental and target facts.
        if hit_count < 2:
            return False
    return True
def _score_element_cell_relevance(
    section_title: str,
    tokens: list[str],
    row_key: Optional[str],
    col_key: Optional[str],
    value: Optional[str],
    *,
    table_name: Optional[str] = None,
    section: ReportTemplateSection | None = None,
) -> int:
    """Score how relevant one element cell is to a section (higher is better).

    Contributions:
      * +1 per token found in the table/row/col/value text, +2 more when the
        token also occurs in the table name;
      * +8 when the whitespace-stripped title and table name contain one another;
      * +3 when the digits within the first 12 characters of the table name
        start with the section number's digits;
      * per expected field: +4 for an alias in table/row/col, otherwise +5 for a
        "field: value" pair inside the value, otherwise +1 for a bare alias
        mention anywhere in the cell text.
    """
    table = str(table_name or "")
    val = str(value or "")
    key_text = f"{table} {str(row_key or '')} {str(col_key or '')}"
    full_text = f"{key_text} {val}"
    key_text_l = key_text.lower()
    full_text_l = full_text.lower()

    score = 0
    for raw_token in tokens or []:
        token = str(raw_token or "").strip()
        if not token:
            continue
        if token in full_text:
            score += 1
        if table and token in table:
            score += 2

    title_norm = re.sub(r"\s+", "", str(section_title or ""))
    table_norm = re.sub(r"\s+", "", table)
    if title_norm and table_norm and (title_norm in table_norm or table_norm in title_norm):
        score += 8

    section_no = _extract_section_number(section_title)
    section_digits = section_no.replace(".", "") if section_no else ""
    if section_digits and table_norm:
        table_digits = re.sub(r"\D", "", table_norm[:12])
        if table_digits and table_digits.startswith(section_digits):
            score += 3

    # Weight the key fields of a section heavily, so unrelated cells are less
    # likely to occupy the slots before the top-N cut.
    for field in _section_expected_fields(section_title, section):
        aliases = [str(a).strip() for a in _field_aliases(field) if str(a).strip()]
        if any(alias.lower() in key_text_l for alias in aliases):
            score += 4
        elif _extract_value_by_alias_from_text(val, aliases):
            # Row/col do not name the field; the value holds a "field: value" pair.
            score += 5
        elif any(str(a).lower() in full_text_l for a in aliases):
            # Weakest signal: the alias is merely mentioned in the value.
            score += 1
    return score
def _evidence_contains_any_fact(evidence: dict, keywords: list[str]) -> bool:
    """Return True when any keyword occurs in the evidence elements or passages.

    Matching is case-insensitive substring matching. Elements are checked on
    ``rowKey``, ``colKey`` and ``value`` (elements whose value is missing-like
    are ignored); only the first 12 entries of ``chapterDocs`` and of
    ``keywordDocs`` are checked, on heading plus content.
    """
    if not isinstance(evidence, dict):
        return False
    needles = [k for k in (str(raw).strip().lower() for raw in keywords) if k]
    if not needles:
        return False

    def _mentions(*haystacks: str) -> bool:
        return any(needle in hay for needle in needles for hay in haystacks)

    elements = evidence.get("elements")
    for row in elements if isinstance(elements, list) else []:
        if not isinstance(row, dict):
            continue
        cell_value = str(row.get("value") or "").strip()
        if _is_missing_like(cell_value):
            continue
        if _mentions(
            str(row.get("rowKey") or "").lower(),
            str(row.get("colKey") or "").lower(),
            cell_value.lower(),
        ):
            return True

    for doc_field in ("chapterDocs", "keywordDocs"):
        docs = evidence.get(doc_field)
        for doc in (docs if isinstance(docs, list) else [])[:12]:
            if not isinstance(doc, dict):
                continue
            passage = (str(doc.get("heading") or "") + " " + str(doc.get("content") or "")).lower()
            if _mentions(passage):
                return True
    return False
def _recover_stalled_job(db: Session, job: ReportGenerationJob) -> None:
    """Re-queue a running job whose running chapter has not been updated for too long.

    Nothing happens unless the job is ``running`` and its least recently updated
    ``running`` chapter is at least ``RUNNING_CHAPTER_STALE_SECONDS`` old. In
    that case the chapter and the job are both reset to ``pending``, the change
    is committed, the in-memory job/chapter state is updated, and a new worker
    is started for the job.
    """
    if not job or job.status != "running":
        return

    now = datetime.now()
    stalled = (
        db.query(ReportGenerationChapter)
        .filter(ReportGenerationChapter.job_id == job.id, ReportGenerationChapter.status == "running")
        .order_by(ReportGenerationChapter.updated_at.asc())
        .first()
    )
    if not stalled or not stalled.updated_at:
        return
    idle_seconds = (now - stalled.updated_at).total_seconds()
    if idle_seconds < RUNNING_CHAPTER_STALE_SECONDS:
        return

    recycle_note = "检测到章节长时间未更新,已自动回收并重试"

    # Persist the reset of both the chapter and the job first ...
    stalled.status = "pending"
    stalled.error_message = recycle_note
    stalled.updated_at = now
    job.status = "pending"
    job.error_message = None
    job.current_section_key = None
    job.updated_at = now
    db.commit()

    # ... then update the live state and restart the worker.
    update_job_state(job.id, status="pending", errorMessage=None, currentSectionKey=None)
    update_chapter_state(
        job.id,
        stalled.section_key,
        status="pending",
        errorMessage=recycle_note,
        content=None,
        promptText=None,
        evidencePayload=None,
        validationPayload=None,
    )
    _start_job_worker(job.id)
def _load_section_reference_for_chapter(
    db: Session,
    section_key: str,
    section_title: str,
    *,
    template_id: Optional[str] = None,
    max_chars: int = 8000,
) -> str:
    """Load the stored reference text for a chapter, for direct use in the user prompt.

    The lookup by ``section_key`` is tried first; when it yields nothing, the
    lookup by ``section_title`` is used. Both are delegated to
    ``services.reference_service`` and both receive the same template filter.

    Args:
        db: Database session.
        section_key: Key of the section being generated.
        section_title: Title of the section, used for the fallback lookup.
        template_id: ID of the selected template. Blank or ``None`` is passed on
            as ``None`` (presumably meaning "no template filter" — confirm
            against ``reference_service``).
        max_chars: Length limit passed through to both loaders.

    Returns:
        Whatever the first successful loader returns.
    """
    from services.reference_service import (
        load_section_reference_raw,
        load_section_reference_raw_by_title,
    )

    lookup_options = {
        "template_id": (template_id or "").strip() or None,
        "max_chars": max_chars,
    }
    by_key = load_section_reference_raw(db, section_key, **lookup_options)
    if by_key:
        return by_key
    # Fallback: match by title, with the same template filter.
    return load_section_reference_raw_by_title(db, section_title, **lookup_options)
def _build_chapter_prompt(
    section: ReportTemplateSection,
    evidence: dict,
    *,
    prior_sibling_sections_text: str = "",
    section_reference: str = "",
) -> str:
    """Build the user prompt for one chapter from the section and its evidence.

    Gathers the selected example, the effective output contract, the heading
    rule for the section number (falling back to ``DEFAULT_HEADING_RULE``) and
    the expected fields, then delegates to ``build_report_chapter_prompt``.
    """
    title = section.section_title
    selected_example = _select_chapter_example(
        title,
        section.examples,
        evidence,
    )
    section_contract = _effective_section_output_contract(section)
    heading_rule = SECTION_HEADING_RULES.get(
        _extract_section_number(title), DEFAULT_HEADING_RULE
    )
    expected_fields = _section_expected_fields(title, section)

    # NOTE(review): both the join separator and the fallback below are empty
    # strings in this view, which makes the trailing `or ""` a no-op — confirm
    # against the original file that no characters were lost here.
    required_tables_text = "".join(evidence.get("requiredTables") or []) or ""

    prompt_kwargs = {
        "section_title": title,
        "section_prompt": _effective_section_prompt_for_generation(section, section_contract),
        "required_tables_text": required_tables_text,
        "structured_tables_text": _render_structured_tables_for_prompt(evidence),
        "canonical_fields_text": _render_canonical_fields_for_prompt(
            evidence, allowed_fields=expected_fields or None
        ),
        "selected_example": selected_example,
        "heading_rule": heading_rule,
        "section_contract": section_contract,
        "evidence_json": json.dumps(evidence, ensure_ascii=False),
        "prior_sibling_sections_text": prior_sibling_sections_text,
        "section_reference": section_reference,
    }
    return build_report_chapter_prompt(**prompt_kwargs)
def _generate_chapter_content(
    section: ReportTemplateSection,
    prompt: str,
    on_content_delta: Optional[callable] = None,
) -> tuple[str, dict, dict]:
    """Call the LLM for one chapter and post-process its answer.

    Args:
        section: Template section being generated.
        prompt: User prompt for the model.
        on_content_delta: Optional callback forwarded to ``chat_completions_json``.

    Returns:
        ``(content, validation, model_output)``: the cleaned chapter text, a
        validation dict with ``missingInfo`` / ``qualityChecks`` (at most 20
        entries each) and ``warnings``, and the raw model object.
    """
    section_no = _extract_section_number(section.section_title or "")
    logger.info(
        "LLM 章节生成 start | section=%s | section_no=%s | max_tokens=%s",
        section.section_key, section_no, _chapter_generation_max_tokens(section_no),
    )
    model_output = chat_completions_json(
        system_prompt=chapter_generation_system_prompt(),
        user_prompt=prompt,
        temperature=0.1,
        max_tokens=_chapter_generation_max_tokens(section_no),
        timeout_sec=120,
        on_content_delta=on_content_delta,
        log_context=f"章节生成 section_key={section.section_key} | {section.section_title}",
    )

    def _string_items(key: str) -> list[str]:
        raw = model_output.get(key)
        items = raw if isinstance(raw, list) else []
        return [str(item) for item in items][:20]

    # An empty answer is replaced by the title plus a placeholder body.
    content = (
        str(model_output.get("content") or "").strip()
        or f"{section.section_title}\n\n待补充"
    )
    # Section numbers and item indices are deliberately NOT renumbered, so the
    # template hierarchy (2.1.1, 3.4.2, ...) stays intact; only evidence labels
    # and citation numbers are cleaned out.
    content = _strip_inline_evidence_labels(content)
    if section_no == "1.2":
        legacy_markers = (
            (r"(?m)^[\s\u3000]*1[\s\u3000]*[\)][\s\u3000]*项目背景[\s\u3000]*$", "1.2.1项目背景"),
            (r"(?m)^[\s\u3000]*2[\s\u3000]*[\)][\s\u3000]*预期目标[\s\u3000]*$", "1.2.2预期目标"),
        )
        for pattern, heading in legacy_markers:
            content = re.sub(pattern, heading, content)
        content = _normalize_section_12_content(content)

    validation = {
        "missingInfo": _string_items("missingInfo"),
        "qualityChecks": _string_items("qualityChecks"),
        "warnings": _basic_warnings(section.section_title, content),
    }
    return content, validation, model_output
def _normalize_ordered_item_markers(content: str) -> str:
    """Rewrite line-leading ordered-list markers to a plain number.

    Handled markers (after an optional indent, ``#`` heading prefix and ``-``/``*``
    bullet):
      * Chinese numerals 一..十 written as "一、" or "(一)" (ASCII or full-width
        parentheses) become the corresponding number;
      * Arabic markers such as "1." / "1.2." / "3.1" followed by whitespace
        become their last segment followed by one space (1.2 -> 2).

    The first non-empty line is left untouched so the section title is never
    rewritten. Note that a line starting with a decimal such as "3.5 " is
    treated as a nested marker as well.

    The numeral map previously had ten identical empty-string keys, so it held a
    single entry and no Chinese marker was ever converted. The bare-numeral
    alternative also matched ordinary sentences that merely start with 一..十;
    the parenthesised form now requires its parentheses.
    """
    text = _strip_inline_evidence_labels(str(content or ""))
    if not text:
        return text

    cn_num_to_idx = {
        "一": 1,
        "二": 2,
        "三": 3,
        "四": 4,
        "五": 5,
        "六": 6,
        "七": 7,
        "八": 8,
        "九": 9,
        "十": 10,
    }
    prefix_re = r"^(\s*(?:#+\s*)?(?:[-*]\s*)?)"
    cn_pattern = re.compile(
        prefix_re
        + r"(?:[(（]([一二三四五六七八九十])[)）]|([一二三四五六七八九十])、)\s*"
    )
    ar_pattern = re.compile(prefix_re + r"(\d+(?:\.\d+)*)(?:\.)?\s+")

    def _replace_line(ln: str) -> str:
        m_cn = cn_pattern.match(ln)
        if m_cn:
            idx = cn_num_to_idx.get(m_cn.group(2) or m_cn.group(3) or "")
            if idx:
                return f"{m_cn.group(1) or ''}{idx}{ln[m_cn.end():]}"
            return ln
        m_ar = ar_pattern.match(ln)
        if m_ar:
            parts = [p for p in (m_ar.group(2) or "").split(".") if p]
            # Use the last segment as the list index: 1.2 -> 2, 3.1 -> 1.
            last = parts[-1] if parts else ""
            if last.isdigit():
                return f"{m_ar.group(1) or ''}{int(last)} {ln[m_ar.end():]}"
        return ln

    lines = text.splitlines()
    # Keep the first non-empty line unchanged, to avoid mutating the section title.
    first_non_empty_idx = next((i for i, ln in enumerate(lines) if ln.strip()), -1)
    return "\n".join(
        ln if i == first_non_empty_idx else _replace_line(ln)
        for i, ln in enumerate(lines)
    )
def _strip_inline_evidence_labels(text: str) -> str:
    """Remove evidence labels, numeric citations and leaked meta sections.

    Removed:
      * "【证据依据: <hex id>】" and "[证据依据: <hex id>]" labels (id of 16 or more
        hex digits; ASCII or full-width colon);
      * simple numeric citations such as "[1]" or "[12]";
      * a trailing "【缺失信息说明】" block and everything from "【质量检查】" to the
        end, which belong to other model JSON fields.

    Runs of two or more spaces/tabs are finally collapsed to a single space.

    The first pattern previously consumed the closing "】" without the opening
    "【", which left an orphan "【" in the text; it now mirrors the
    square-bracket pattern.
    """
    src = str(text or "")
    if not src:
        return src
    cleaned = re.sub(r"【\s*证据依据\s*[:：]\s*[0-9a-fA-F]{16,}\s*】", "", src)
    cleaned = re.sub(r"\[\s*证据依据\s*[:：]\s*[0-9a-fA-F]{16,}\s*\]", "", cleaned)
    # Remove simple inline numeric citations like [1], [2] that often leak from evidence.
    cleaned = re.sub(r"\[\s*\d{1,3}\s*\]", "", cleaned)
    # Strip leaked meta sections from model JSON fields when they are
    # accidentally merged into the content.
    cleaned = re.sub(
        r"(?is)\n*【\s*缺失信息说明\s*】[\s\S]*?(?=\n【\s*质量检查\s*】|\Z)",
        "\n",
        cleaned,
    )
    cleaned = re.sub(r"(?is)\n*【\s*质量检查\s*】[\s\S]*$", "\n", cleaned)
    return re.sub(r"[ \t]{2,}", " ", cleaned)
def _markdown_hashes_for_section_no(section_no: str) -> str:
    """Map a dotted section number to its markdown heading prefix.

    One segment gives "##", two give "###", three or more give "####".
    """
    depth = len(str(section_no or "").strip().split("."))
    return "#" * min(depth + 1, 4)
def _normalize_numbered_heading_spacing(content: str) -> str:
    """Insert one space between a dotted section number and its title.

    A line is rewritten only when it consists of a 2-4 part number (each part
    1..30) directly followed by a title that starts with a CJK character, is at
    most 36 characters long and contains none of the punctuation listed in the
    pattern below. Every other line is returned unchanged.

    Whitespace in the pattern is limited to same-line whitespace. The previous
    ``\\s*`` also matched newlines, so a blank line after a heading was swallowed
    and a number standing alone on a line was merged with the following line.
    """
    text = str(content or "")
    if not text:
        return text

    def _line_repl(m: re.Match[str]) -> str:
        indent, num, title = m.group(1), m.group(2), str(m.group(3) or "").strip()
        parts = num.split(".")
        if len(parts) < 2 or len(parts) > 4:
            return m.group(0)
        for part in parts:
            if not part.isdigit() or int(part) < 1 or int(part) > 30:
                return m.group(0)
        # Sentence punctuation means this is body text, not a heading.
        if not title or len(title) > 36 or re.search(r"[,。;:!?]", title):
            return m.group(0)
        return f"{indent}{num} {title}"

    return re.sub(
        r"(?m)^([ \t\u3000]*)(\d+(?:\.\d+)+)[ \t\u3000]*([\u4e00-\u9fff][^\n]{0,40})[ \t\r\u3000]*$",
        _line_repl,
        text,
    )
def _normalize_markdown_heading_levels(content: str) -> str:
    """Give markdown headings numbered with three or more parts a uniform level.

    Headings of the form "<hashes> <number> <CJK title>" whose number has at
    least three parts are rewritten to the prefix returned by
    ``_markdown_hashes_for_section_no``; this avoids same-level titles being
    rendered at different sizes. Shallower headings are left as they are.
    """
    text = str(content or "")
    if not text:
        return text

    def _relevel(match: re.Match[str]) -> str:
        current, number, tail = match.group(1), match.group(2), match.group(3)
        if len(number.split(".")) < 3:
            return match.group(0)
        wanted = _markdown_hashes_for_section_no(number)
        return match.group(0) if current == wanted else f"{wanted} {number} {tail}"

    heading_re = r"(?m)^(#{1,6})\s+(\d+(?:\.\d+)+)\s+([\u4e00-\u9fff].*)$"
    return re.sub(heading_re, _relevel, text)
def _normalize_section_heading_markdown(content: str) -> str:
    """Normalize number/title spacing first, then markdown heading levels."""
    spaced = _normalize_numbered_heading_spacing(content)
    return _normalize_markdown_heading_levels(spaced)
def _normalize_section_12_content(content: str) -> str:
    """Clean up the plain-text layout the section 1.2 contract produces.

    * The first standalone "项目决策要点" line (optionally followed by a colon) is
      removed, because the template / heading validation adds the section title
      again.
    * "1.2.1项目背景" / "1.2.2预期目标" get a space after the number so that the
      frontend and the export can recognise them as headings.
    * Runs of three or more newlines collapse to one blank line, and the result
      is stripped.

    The sub-heading pattern matches same-line whitespace only. The previous
    ``\\s*`` before the end anchor also matched a newline, so the blank line
    following a sub-heading was removed.
    """
    text = str(content or "").strip()
    if not text:
        return text
    text = re.sub(
        r"(?m)^[\s\u3000]*项目决策要点[\s\u3000]*(?:[:：])?[\s\u3000]*\n+",
        "",
        text,
        count=1,
    )
    text = re.sub(
        r"(?m)^([ \t\u3000]*)(1\.2\.[12])(项目背景|预期目标)[ \t\r\u3000]*$",
        r"\1\2 \3",
        text,
    )
    return re.sub(r"\n{3,}", "\n\n", text).strip()
# One numbered contract line: "<n>) <field>: <tail>". The closing parenthesis
# and the colon may be ASCII or full-width. Whitespace after the colon is
# limited to the same line: the previous ``\s*`` also matched the line break, so
# a line with an empty tail captured the whole next line as its tail and that
# next line was never matched as a field of its own.
_CONTRACT_FIELD_LINE_RE = re.compile(
    r"^\s*(\d+)[)）]\s*(?P<field>[^:：\n]+)[:：][^\S\n]*(?P<tail>.*)$",
    re.MULTILINE,
)
# Words that mark a numbered line as an instruction / rule rather than an
# output field name.
_CONTRACT_FIELD_SKIP_RE = re.compile(
    r"必须|不得|禁止|严禁|应|需|写|输出|背景|规则|约束|表\d|后评价|若|当|正文|首行|写作|请",
)
def _parse_expected_fields_from_contract(contract: str | None) -> list[str]:
    """Extract the numbered output fields ("1) field: ...") from a template contract.

    A numbered line counts as a field when its name is at most 10 characters,
    does not match ``_CONTRACT_FIELD_SKIP_RE`` and its tail is empty, a
    placeholder (dots or "待补充") or at most 6 characters long. The result is
    returned only when at least three fields were found and their numbers run
    1, 2, 3, ... without a gap; otherwise the list is empty.
    """
    text = str(contract or "").strip()
    if not text:
        return []

    numbered: list[tuple[int, str]] = []
    for match in _CONTRACT_FIELD_LINE_RE.finditer(text):
        name = str(match.group("field") or "").strip()
        if not name or len(name) > 10 or _CONTRACT_FIELD_SKIP_RE.search(name):
            continue
        tail = str(match.group("tail") or "").strip()
        is_placeholder = bool(re.fullmatch(r"\.{2,}|待补充", tail))
        # A long, non-placeholder tail means the line is prose, not a field slot.
        if tail and not is_placeholder and len(tail) > 6:
            continue
        numbered.append((int(match.group(1)), name))

    if len(numbered) < 3 or numbered[0][0] != 1:
        return []
    numbers = [number for number, _ in numbered]
    if any(cur != prev + 1 for prev, cur in zip(numbers, numbers[1:])):
        return []
    return [name for _, name in numbered]
def _section_expected_fields(
    section_title: str,
    section: ReportTemplateSection | None = None,
) -> list[str]:
    """Return the fields a section is expected to output.

    The fields are parsed from the section's output contract (the effective
    contract when ``section`` is given, otherwise the contract looked up by
    title). When the contract has no numbered fields, section 1.2 falls back to
    its three fixed targets and every other section yields an empty list.
    """
    contract = (
        _effective_section_output_contract(section)
        if section is not None
        else _section_output_contract(section_title)
    )
    fields = _parse_expected_fields_from_contract(contract)
    if fields:
        return fields
    compact_title = re.sub(r"\s+", "", str(section_title or ""))
    return ["规模目标", "质量目标", "效益目标"] if "1.2项目决策要点" in compact_title else []
def _effective_section_prompt_for_generation(
    section: ReportTemplateSection,
    contract: str,
) -> str:
    """Return the stored section prompt unless it merely repeats the contract.

    The template contract is the authority on structure; a ``section_prompt``
    identical to it (after stripping) is not injected again, which avoids two
    conflicting sources. An empty prompt also yields "".
    """
    stored = str(section.section_prompt or "").strip()
    repeats_contract = stored == str(contract or "").strip()
    return "" if (not stored or repeats_contract) else stored
# Alias groups per canonical field; the canonical name is always listed first.
# Kept at module level so the table is built once instead of on every call
# (the lookup runs per element cell and per expected field).
_CANONICAL_FIELD_ALIASES: dict[str, tuple[str, ...]] = {
    "项目名称": ("项目名称", "工程名称", "装置名称"),
    "建设单位": ("建设单位", "业主单位", "实施单位"),
    "建设地点": ("建设地点", "建设地址", "厂址", "所在地"),
    "建设类型": ("建设类型", "项目类型", "新建", "改扩建"),
    "起止时间": ("起止时间", "工作起止时间", "开工时间", "完工时间", "建设工期", "建设期限"),
    # Extraction / form filling often writes "建设内容" as
    # "项目内容/工程内容/装置内容"; those are accepted for backfilling.
    "建设内容": ("建设内容", "主要建设内容", "建设范围", "项目内容", "工程内容", "装置内容"),
    "建设投资": ("建设投资", "总投资", "投资估算", "项目总投资", "概算"),
    "占地面积": ("占地面积", "用地面积"),
    "规模目标": ("规模目标", "产量", "规模", "万吨"),
    "质量目标": ("质量目标", "辛烷值", "质量升级", "国VI", "国Ⅵ"),
    "效益目标": ("效益目标", "利润", "收益", "营业收入", "内部收益率", "IRR"),
}


def _field_aliases(field: str) -> list[str]:
    """Return the alias list for a canonical field name (at most 8 entries).

    Known fields return their alias group; an unknown non-empty field returns a
    list containing only itself; an empty field returns an empty list. A new
    list is returned on every call, so callers may modify it freely.
    """
    base = str(field or "").strip()
    out = list(_CANONICAL_FIELD_ALIASES.get(base, ()))
    if base and base not in out:
        out.insert(0, base)
    return out[:8]
def _is_missing_like(value: str) -> bool:
    """Return True when the value is empty or a known "no data" placeholder.

    The comparison is made on the stripped, lower-cased text.
    """
    normalized = str(value or "").strip().lower()
    # NOTE(review): two entries below are empty strings in this view; they can
    # never be reached because empty text is already handled by `not normalized`.
    # They may stand for single-character placeholders lost in extraction —
    # confirm against the original file.
    placeholders = frozenset(
        (
            "待补充",
            "",
            "n/a",
            "na",
            "-",
            "",
            "——",
            "暂无",
            "未知",
            "未提供",
        )
    )
    return not normalized or normalized in placeholders
def _normalize_land_area_value(value: str) -> str:
    """Normalize a "land area" field value.

    * For expressions such as "84m×187m=15708m2" only the result after the last
      equals sign (ASCII "=" or full-width "＝") is kept.
    * The unit spellings m2 / m^2 / m² / ㎡ are unified to "㎡" (case-insensitive,
      with any whitespace before the unit removed) so exports show one
      consistent unit. Units such as km2 or cm2 are left alone.

    The previous body split on an empty separator, which raises ``ValueError``
    for every non-empty input, and its unit replacements were no-ops or
    deletions. Its ``\\b`` before "m" also could not match directly after a
    digit, as in "15708m2".

    Returns:
        The normalized text, or the stripped input when normalization would
        leave nothing (e.g. the text ends with "=").
    """
    text = str(value or "").strip()
    if not text:
        return text
    # Keep only what follows the last equals sign of either width.
    core = re.split(r"[=＝]", text)[-1].strip()
    # "m2" / "m^2": the "m" must not be the tail of another unit (km, cm, mm)
    # and the "2" must not run on into further digits or letters.
    unified = re.sub(r"(?i)(?<![a-z])m\s*\^?\s*2(?![0-9a-z])", "㎡", core)
    unified = re.sub(r"(?i)(?<![a-z])m\s*²", "㎡", unified)
    unified = re.sub(r"\s*㎡", "㎡", unified)
    return unified or text
def _normalize_canonical_field_value(field: str, value: str) -> str:
    """Apply field-specific normalization to a canonical field value.

    Only "占地面积" has a dedicated normalizer; every other field just gets its
    value stripped. An empty value is returned as the empty string.
    """
    cleaned = str(value or "").strip()
    if cleaned and str(field or "").strip() == "占地面积":
        return _normalize_land_area_value(cleaned)
    return cleaned
def _extract_value_by_alias_from_text(text: str, aliases: list[str]) -> str:
    """Extract the value of a "field: value" pair for the first matching alias.

    Both the ASCII colon and the full-width colon are accepted, and the value is
    cut at the first line break, semicolon, comma (either width) or "。".
    Matching is case-insensitive. The alias, the colon and the value must be on
    the same line: the previous ``\\s*`` around the colon also matched a line
    break, so a field with an empty value picked up the next line instead.

    Args:
        text: Free text to search.
        aliases: Candidate field names, tried in order; blanks are skipped.

    Returns:
        The stripped value, or "" when no alias yields a non-missing value.
    """
    src = str(text or "").strip()
    if not src:
        return ""
    for alias in aliases:
        name = str(alias or "").strip()
        if not name:
            continue
        pattern = rf"{re.escape(name)}[^\S\n]*[:：][^\S\n]*([^\n;；,，。]+)"
        m = re.search(pattern, src, flags=re.IGNORECASE)
        if m:
            val = str(m.group(1) or "").strip()
            if val and not _is_missing_like(val):
                return val
    return ""
def _is_valid_value_for_field(field: str, value: str, row_key: str = "", col_key: str = "") -> bool:
f = str(field or "").strip()
v = str(value or "").strip()
rk = str(row_key or "").strip().lower()
ck = str(col_key or "").strip().lower()
if not v or _is_missing_like(v):
return False
if f != "建设投资":
return True
# “建设投资”仅接受金额口径,过滤收益率/回收期等财务指标,避免把 2.89 这类比率误填入。
key_text = f"{rk} {ck}"
if any(x in key_text for x in ["收益率", "irr", "回收期", "净现值", "百分点", "利润率"]):
return False
if re.search(r"%||‰", v):
return False
if re.search(r"(收益率|回收期|净现值|利润率|irr)", v, flags=re.IGNORECASE):
return False
has_amount_unit = bool(re.search(r"(万元|万|亿元|亿元人民币|元)", v))
number_match = re.search(r"\d+(?:\.\d+)?", v)
if has_amount_unit:
return True
if not number_match:
return False
# 无单位纯数字时,过小值大概率是比率而非投资金额(如 2.89)。
num = float(number_match.group(0))
return num >= 100
def _extract_canonical_fields(
    section_title: str,
    elements: list[dict],
    *,
    section: ReportTemplateSection | None = None,
) -> dict[str, str]:
    """Map every expected field of the section to a value taken from *elements*.

    For each field the element rows are scanned in order. A row is used when its
    ``rowKey``/``colKey`` mentions one of the field's aliases and the value is
    valid, or when the value text itself embeds a ``label: value`` pair for an
    alias. Fields without a usable value are set to "待补充".
    """
    expected = _section_expected_fields(section_title, section)
    if not expected:
        return {}
    candidate_rows = elements if isinstance(elements, list) else []

    def first_valid_value(field: str) -> str:
        labels = [str(a).strip() for a in _field_aliases(field) if str(a).strip()]
        lowered_labels = [label.lower() for label in labels]
        for row in candidate_rows:
            if not isinstance(row, dict):
                continue
            row_key = str(row.get("rowKey") or "")
            col_key = str(row.get("colKey") or "")
            cell = str(row.get("value") or "").strip()
            if _is_missing_like(cell):
                continue
            keys_lower = f"{row_key} {col_key}".lower()
            key_hit = any(label in keys_lower for label in lowered_labels)
            if key_hit and _is_valid_value_for_field(field, cell, row_key=row_key, col_key=col_key):
                return cell
            # Row/column keys may be generic; fall back to "label: value" text
            # embedded in the cell itself.
            embedded = _extract_value_by_alias_from_text(cell, labels)
            if embedded and _is_valid_value_for_field(
                field, embedded, row_key=row_key, col_key=col_key
            ):
                return embedded
        return ""

    result: dict[str, str] = {}
    for field in expected:
        result[field] = _normalize_canonical_field_value(field, first_valid_value(field)) or "待补充"
    return result
def _render_canonical_fields_for_prompt(
evidence: dict,
*,
allowed_fields: list[str] | None = None,
) -> str:
canonical = evidence.get("canonicalFields") if isinstance(evidence, dict) else None
if not isinstance(canonical, dict) or not canonical:
return "无字段级已抽取结果。"
allowed_set = {str(f).strip() for f in (allowed_fields or []) if str(f).strip()}
lines: list[str] = []
for field, value in canonical.items():
f = str(field or "").strip()
if allowed_set and f not in allowed_set:
continue
v = _normalize_canonical_field_value(f, str(value or "").strip()) or "待补充"
if not f:
continue
lines.append(f"- {f}: {v}")
return "\n".join(lines) if lines else "无字段级已抽取结果。"
def _extract_field_value_from_docs(field: str, docs: list[dict]) -> str:
    """Search document snippets for a value of *field*.

    Each doc contributes ``heading + "\\n" + content`` (capped at 8000 chars).
    ``label: value`` pairs are tried first. For "建设内容", which is often
    written as prose, a looser ``<alias> 为/包括/包含/主要包括 <text>`` pattern is
    tried afterwards.

    Returns the normalized value, or an empty string when nothing is found.
    """
    aliases = [str(a).strip() for a in _field_aliases(field) if str(a).strip()]
    if not aliases or not isinstance(docs, list):
        return ""
    texts: list[str] = []
    for doc in docs:
        if not isinstance(doc, dict):
            continue
        heading = str(doc.get("heading") or "").strip()
        content = str(doc.get("content") or "").strip()
        merged = f"{heading}\n{content}".strip()
        if merged:
            texts.append(merged[:8000])
    # Key/value form first: higher hit rate and more reliable.
    for text in texts:
        val = _extract_value_by_alias_from_text(text, aliases)
        if val and not _is_missing_like(val):
            return _normalize_canonical_field_value(field, val)
    if field == "建设内容":
        prose_patterns = [
            re.compile(
                rf"{re.escape(alias)}\s*(?:为|包括|包含|主要包括)\s*([^\n。]{{12,420}})",
                flags=re.IGNORECASE,
            )
            for alias in aliases
        ]
        for text in texts:
            for pattern in prose_patterns:
                m = pattern.search(text)
                if not m:
                    continue
                # Trim ASCII and full-width punctuation left around the capture.
                val = str(m.group(1) or "").strip(" ::;;,,")
                if val and not _is_missing_like(val):
                    return _normalize_canonical_field_value(field, val)
    return ""
def _merge_canonical_fields_from_docs(
    section_title: str,
    evidence: dict,
    canonical: dict[str, str],
    *,
    section: ReportTemplateSection | None = None,
) -> dict[str, str]:
    """Fill still-missing canonical fields from ``chapterDocs`` / ``keywordDocs``.

    Values already present in *canonical* are only normalized, never replaced.
    For the "1.1 项目基本情况" section, ``keywordDocs`` are consulted only when
    fewer than 4 of the first 80 element rows carry a real value; otherwise
    amounts from other chapters could pollute this section's fields.
    """
    expected = _section_expected_fields(section_title, section)
    if not expected or not isinstance(evidence, dict):
        return canonical
    merged = {}
    for key, val in dict(canonical or {}).items():
        merged[str(key)] = _normalize_canonical_field_value(str(key), str(val))

    chapter_docs = evidence.get("chapterDocs")
    keyword_docs = evidence.get("keywordDocs")
    docs: list[dict] = list(chapter_docs) if isinstance(chapter_docs, list) else []

    allow_keyword_fallback = True
    compact_title = re.sub(r"\s+", "", str(section_title or ""))
    if "1.1项目基本情况" in compact_title:
        raw_elements = evidence.get("elements")
        element_rows = raw_elements if isinstance(raw_elements, list) else []
        filled = 0
        for row in element_rows[:80]:
            if not isinstance(row, dict):
                continue
            cell = str(row.get("value") or "").strip()
            if not cell or _is_missing_like(cell):
                continue
            filled += 1
            if filled >= 4:
                break
        # A handful of filled cells means the chapter table is usable, so
        # keywordDocs must not take part in the backfill.
        allow_keyword_fallback = filled < 4

    if allow_keyword_fallback and isinstance(keyword_docs, list):
        docs.extend(keyword_docs)
    if not docs:
        return merged

    for field in expected:
        existing = str(merged.get(field) or "").strip()
        if existing and not _is_missing_like(existing):
            continue
        found = _extract_field_value_from_docs(field, docs)
        if found and not _is_missing_like(found):
            merged[field] = _normalize_canonical_field_value(field, found)
    return merged
def _apply_canonical_field_backfill(
    section: ReportTemplateSection,
    evidence: dict,
    content: str,
) -> str:
    """Replace "待补充" placeholders in *content* with extracted field values.

    Canonical values come from ``evidence["canonicalFields"]`` or, when absent,
    are extracted from ``evidence["elements"]``; missing fields are then
    completed from the evidence docs. For every expected field with a real
    value, ``<label>: 待补充`` occurrences are replaced, and if the value still
    does not appear while the field label does, an explicit ``field:value``
    line is appended.
    """
    text = str(content or "")
    canonical = evidence.get("canonicalFields") if isinstance(evidence, dict) else {}
    if not isinstance(canonical, dict) or not canonical:
        elements = evidence.get("elements") if isinstance(evidence, dict) else []
        canonical = _extract_canonical_fields(
            section.section_title,
            elements if isinstance(elements, list) else [],
            section=section,
        )
    # NOTE(review): the merge is applied to pre-computed canonical fields as
    # well; it only fills fields that are still missing — confirm this matches
    # the intended nesting.
    canonical = _merge_canonical_fields_from_docs(
        section.section_title, evidence, canonical, section=section
    )
    if not canonical:
        return text
    repaired = text
    for field in _section_expected_fields(section.section_title, section):
        value = str(canonical.get(field) or "").strip()
        if _is_missing_like(value):
            continue
        # Callable replacements keep the value literal: a template string would
        # interpret backslashes or "\g<...>" inside document-derived values.
        # Broad pass first: tolerates numbering / bold wrappers around the label.
        broad_pattern = rf"(^.*{re.escape(field)}.*?[::]\s*)待补充(?:\s|$)"
        repaired = re.sub(
            broad_pattern,
            lambda m, v=value: f"{m.group(1)}{v}\n",
            repaired,
            flags=re.MULTILINE,
        )
        labels = list(dict.fromkeys([x for x in _field_aliases(field) if str(x).strip()]))
        for label in labels:
            pattern = rf"({re.escape(label)}\s*[::]\s*)待补充\b"
            repaired = re.sub(pattern, lambda m, v=value: m.group(1) + v, repaired)
        # If the value still did not land in the text, append an explicit
        # key/value line so the model's omission is compensated.
        if value not in repaired and re.search(rf"{re.escape(field)}\s*[::]", repaired):
            repaired += f"\n{field}:{value}"
    return repaired
def _build_field_diagnostics(section: ReportTemplateSection, evidence: dict, content: str) -> list[dict[str, Any]]:
    """Report, per expected field, whether extracted values reached *content*.

    For each field up to 5 candidate values are collected from
    ``evidence["elements"]`` (by row/column key or embedded ``label: value``
    text). The entry records the de-duplicated values, whether any of them
    occurs verbatim in the content, whether the content marks the field as
    "待补充" on the label's own line, and a derived ``status``.
    """
    expected = _section_expected_fields(section.section_title, section)
    if not expected:
        return []
    elements = evidence.get("elements") if isinstance(evidence, dict) else []
    if not isinstance(elements, list):
        elements = []
    content_text = str(content or "")
    out: list[dict[str, Any]] = []
    for field in expected:
        aliases_raw = [str(a).strip() for a in _field_aliases(field) if str(a).strip()]
        aliases = [a.lower() for a in aliases_raw]
        hits: list[str] = []
        for row in elements:
            if not isinstance(row, dict):
                continue
            row_key = str(row.get("rowKey") or "")
            col_key = str(row.get("colKey") or "")
            value = str(row.get("value") or "").strip()
            if _is_missing_like(value):
                continue
            key_text = f"{row_key} {col_key}".lower()
            if any(a in key_text for a in aliases):
                hits.append(value[:120])
            else:
                from_value = _extract_value_by_alias_from_text(value, aliases_raw)
                if from_value:
                    hits.append(from_value[:120])
            if len(hits) >= 5:
                break
        unique_hits = list(dict.fromkeys(hits))
        content_has_value = any((not _is_missing_like(v)) and v in content_text for v in unique_hits)
        # Restricted to the label's own line: a placeholder that belongs to a
        # later field must not mark this one as missing.
        content_marked_missing = bool(
            re.search(
                rf"{re.escape(field)}\s*[::][^\n]*?待补充",
                content_text,
                flags=re.IGNORECASE,
            )
        )
        status = "unknown"
        if unique_hits and content_has_value:
            status = "used"
        elif unique_hits and content_marked_missing:
            status = "extracted_but_missing_in_content"
        elif unique_hits:
            status = "extracted_but_not_matched"
        elif content_marked_missing:
            status = "not_extracted_and_missing"
        out.append(
            {
                "field": field,
                "extractedValues": unique_hits,
                "contentHasExtractedValue": content_has_value,
                "contentMarkedMissing": content_marked_missing,
                "status": status,
            }
        )
    return out
def _section_output_contract(section_title: str) -> str:
    """Return the built-in output contract for the section number in *section_title*.

    Falls back to ``DEFAULT_SECTION_OUTPUT_CONTRACT`` when the number has no
    dedicated entry in ``SECTION_OUTPUT_CONTRACTS``.
    """
    number = _extract_section_number(str(section_title or ""))
    if number not in SECTION_OUTPUT_CONTRACTS:
        return DEFAULT_SECTION_OUTPUT_CONTRACT
    return SECTION_OUTPUT_CONTRACTS[number]
def _effective_section_output_contract(section: ReportTemplateSection) -> str:
raw = getattr(section, "section_output_contract", None)
if isinstance(raw, str) and raw.strip():
return raw.strip()
return _section_output_contract(section.section_title or "")
def _section_requires_tables(section_title: str, *, contract_text: str | None = None) -> bool:
"""判断章节合同是否包含【表格强制要求】,决定该节是否允许出现表格。"""
c = (str(contract_text or "").strip() or _section_output_contract(section_title))
return "表格强制要求" in c
def _strip_tables_from_non_table_section(
    section_title: str,
    content: str,
    *,
    section: ReportTemplateSection | None = None,
) -> str:
    """Remove Markdown tables the model produced in a section that must not have any.

    Content is returned untouched when the effective contract contains
    "表格强制要求". Otherwise every line that starts and ends with ``|`` is
    dropped, together with ``<!-- 表格来源`` comment lines and a caption-like
    ``###`` heading sitting directly above a removed table.
    """
    if section is not None:
        contract = _effective_section_output_contract(section)
    else:
        contract = _section_output_contract(section_title)
    if "表格强制要求" in contract:
        return content
    if not content:
        return content

    def is_table_caption_heading(line: str) -> bool:
        # NOTE(review): the original marker character was lost (the test had
        # degenerated to `"" in line`, which is always true and removed ANY
        # "###" heading). A heading is treated as a table caption when it
        # mentions "表" or ends in a colon-introduced title — confirm.
        text = line.strip()
        return text.startswith("###") and any(mark in text for mark in ("表", ":", ":"))

    kept: list[str] = []
    in_table = False
    for line in content.split("\n"):
        stripped = line.strip()
        # Separator rows ("|---|---|") also start and end with "|".
        if stripped.startswith("|") and stripped.endswith("|"):
            if not in_table:
                in_table = True
                if kept and is_table_caption_heading(kept[-1]):
                    kept.pop()
            continue
        in_table = False
        if stripped.startswith("<!-- 表格来源"):
            continue
        kept.append(line)
    return "\n".join(kept).strip()
_FORBIDDEN_TABLE_PATTERNS: list[tuple[str, re.Pattern]] = [
(
"2.1.1",
re.compile(
r"(?:^|\n)[^\n]*?表\s*2[\.\s]*6[--—–]\s*1[^\n]*\n"
r"(?:(?:\s*\|[^\n]+\|\s*\n)+|(?:\s*<table>[\s\S]*?</table>))?",
flags=re.IGNORECASE,
),
),
]
def _strip_forbidden_tables(section_title: str, content: str) -> str:
"""移除 2.1.1 中误粘贴的安评类「表2.6-1 + 原料选择加氢/工艺技术对比」整段(不误删「原料数量及组成」模版主表)。"""
if not content:
return content
section_no = _extract_section_number(section_title)
for match_section, pat in _FORBIDDEN_TABLE_PATTERNS:
if section_no == match_section:
content = pat.sub("", content)
return content.strip()
def _extract_section_number(section_title: str) -> str:
m = re.match(r"^\s*(\d+(?:\.\d+)*)", str(section_title or ""))
return m.group(1) if m else ""
def _major_chapter_number(section_no: str) -> str:
no = str(section_no or "").strip()
if not no:
return ""
return no.split(".", 1)[0]
def _build_prior_sibling_sections_text(
    section: ReportTemplateSection,
    ordered_sections: list[ReportTemplateSection],
    completed_contents: dict[str, str],
    *,
    max_chars_per_section: int = 2500,
    max_total_chars: int = 10000,
) -> str:
    """Concatenate already-generated text of earlier sections in the same chapter.

    Sections are visited in the given order until one whose ``section_order``
    is not below the current section's. Each included body is cut to
    *max_chars_per_section* and prefixed with a ``###`` title line; collection
    stops once adding a block would exceed *max_total_chars*. The result lets
    later sections stay consistent with earlier ones (dates, amounts).
    """
    current_no = _extract_section_number(section.section_title or "")
    chapter = _major_chapter_number(current_no)
    if not (chapter and current_no):
        return ""
    collected: list[str] = []
    used_chars = 0
    for earlier in ordered_sections or []:
        if earlier.section_order >= section.section_order:
            break
        earlier_no = _extract_section_number(earlier.section_title or "")
        if not earlier_no:
            continue
        if _major_chapter_number(earlier_no) != chapter:
            continue
        body = str(completed_contents.get(earlier.section_key) or "").strip()
        if not body:
            continue
        if len(body) > max_chars_per_section:
            body = body[:max_chars_per_section] + "\n…(下文已截断)"
        heading = str(earlier.section_title or earlier_no).strip()
        block = f"### {heading}\n{body}"
        if used_chars + len(block) > max_total_chars:
            break
        collected.append(block)
        used_chars += len(block)
    return "\n\n".join(collected)
def _build_section_title_map(sections: list[ReportTemplateSection]) -> dict[str, str]:
    """Map each section number to its stripped title.

    Sections whose title has no leading number are skipped; when a number
    occurs more than once the last title wins.
    """
    title_by_no: dict[str, str] = {}
    for item in sections or []:
        full_title = str(item.section_title or "").strip()
        number = _extract_section_number(full_title)
        if number:
            title_by_no[number] = full_title
    return title_by_no
def _resolve_ancestor_titles_for_section(
    section: ReportTemplateSection,
    chapter_title_map: dict[str, str],
) -> list[str]:
    """Return the parent titles that must precede a "first child" section.

    Only sections whose last number component is ``1`` get ancestors. Walking
    upwards, each parent title found in *chapter_title_map* is collected; the
    walk continues only while the parent itself is a first child. Titles are
    returned outermost first.
    """
    number = _extract_section_number(str(section.section_title or "").strip())
    if not number or "." not in number:
        return []
    if number.split(".")[-1] != "1":
        return []
    chain: list[str] = []
    cursor = number
    while "." in cursor:
        parent_no = cursor.rsplit(".", 1)[0]
        parent_title = str(chapter_title_map.get(parent_no) or "").strip()
        if parent_title:
            chain.append(parent_title)
        is_first_child = parent_no.split(".")[-1] == "1"
        if not is_first_child:
            break
        cursor = parent_no
    chain.reverse()
    return chain
def _expected_child_headings(section_no: str) -> list[str]:
n = str(section_no or "").strip()
mapping: dict[str, list[str]] = {
"2": ["2.1", "2.2", "2.3", "2.4", "2.5", "2.6"],
"2.1": ["2.1.1", "2.1.2", "2.1.3", "2.1.4", "2.1.5", "2.1.6", "2.1.7"],
"2.2": ["2.2.1", "2.2.2", "2.2.3", "2.2.4"],
"2.4": ["2.4.1", "2.4.2", "2.4.3", "2.4.4"],
"3": ["3.1", "3.2", "3.3", "3.4", "3.5", "3.6", "3.7", "3.8", "3.9", "3.10", "3.11"],
"3.3": ["3.3.1", "3.3.2", "3.3.3", "3.3.4"],
"3.4": ["3.4.1", "3.4.2"],
"4": ["4.1", "4.2", "4.3", "4.4"],
"4.3": ["4.3.1", "4.3.2", "4.3.3", "4.3.4", "4.3.5", "4.3.6"],
"5": ["5.1", "5.2", "5.3", "5.4", "5.5"],
"5.2": ["5.2.1", "5.2.2", "5.2.3", "5.2.4"],
"5.3": ["5.3.1", "5.3.2"],
"6": ["6.1", "6.2"],
"6.1": ["6.1.1", "6.1.2", "6.1.3", "6.1.4", "6.1.5"],
"6.2": ["6.2.1", "6.2.2", "6.2.3", "6.2.4"],
"7": ["7.1", "7.2", "7.3"],
"7.1": ["7.1.1", "7.1.2"],
}
return mapping.get(n, [])
def _chapter_generation_max_tokens(section_no: str) -> int:
"""
默认输出上限略高于历史 3500避免长段中文在尾部截断后与下一小节标题粘在一段内。
三节编号小节(如 2.4.3)常与多要素叙述叠加,更易触顶。
"""
if not section_no:
return 4096
parts = section_no.split(".")
if len(parts) >= 3:
return 6000
return 4096
def _split_inline_template_headings(content: str, chapter_title_map: dict[str, str]) -> str:
"""
将「正文末尾与下一小节标题挤在同一物理行」的情况拆开(常见于输出触顶截断或模型漏换行)。
仅拆分 chapter_title_map 中存在的三节及以上编号(如 2.4.4),且要求标题后文字与模板标题首字一致,降低误判。
"""
if not content or not chapter_title_map:
return content
valid = {
no
for no in chapter_title_map
if no and no.count(".") >= 2 and re.fullmatch(r"\d+(?:\.\d+)*", no)
}
if not valid:
return content
def split_line_once(line: str) -> tuple[str, bool]:
# 小节编号前一版可能挤在句号/括号后且无空格如「证明了2.4.4 初步……」)。
ms = list(
re.finditer(
r"(?:^|(?<=[\u4e00-\u9fff。;:\)」』\]\.\!\?]))\s*"
r"([1-9]\d{0,2}\.\d{1,2}\.\d{1,3})(?:[ \t\u3000]| )+",
line,
)
)
for m in ms:
num = m.group(1)
if num not in valid:
continue
start = m.start(1)
prefix = line[:start]
if not prefix.strip():
continue
full_title = str(chapter_title_map.get(num) or "").strip()
if not full_title:
continue
tail_m = re.match(rf"^\s*{re.escape(num)}\s+(.+)$", full_title)
name_part = tail_m.group(1).strip() if tail_m else ""
if len(name_part) < 2:
continue
after = line[m.end() : m.end() + min(48, len(name_part) + 8)]
if after and after[0] != name_part[0]:
continue
head = prefix.rstrip()
rest = line[start:].lstrip()
return f"{head}\n\n{rest}", True
return line, False
new_lines: list[str] = []
for raw_line in content.split("\n"):
cur = raw_line
while True:
nxt, changed = split_line_once(cur)
if not changed:
new_lines.append(cur)
break
chunks = nxt.split("\n\n", 1)
new_lines.append(chunks[0])
if len(chunks) > 1:
new_lines.append("")
cur = chunks[1]
else:
cur = ""
return "\n".join(new_lines)
def _split_glued_template_heading_body(content: str, chapter_title_map: dict[str, str]) -> str:
"""
小节标题与正文挤在同一行、中间无换行如「2.4.4 初步设计审查工作评价2017年12月……」
在模板规定的标题尾部与后续正文之间插入空行,便于 Markdown 将标题行与正文分开渲染。
"""
if not content or not chapter_title_map:
return content
valid = sorted(
(
no
for no in chapter_title_map
if no and no.count(".") >= 1 and re.fullmatch(r"\d+(?:\.\d+)*", no)
),
key=len,
reverse=True,
)
if not valid:
return content
def split_one_line(line: str) -> tuple[str, bool]:
for num in valid:
full_title = str(chapter_title_map.get(num) or "").strip()
if not full_title:
continue
tail_m = re.match(rf"^\s*{re.escape(num)}\s+(.+)$", full_title)
if not tail_m:
continue
name_part = tail_m.group(1).strip()
if len(name_part) < 2:
continue
m = re.match(
rf"^\s*(?:#\s*){{0,6}}"
rf"{re.escape(num)}(?:\s+| )+{re.escape(name_part)}",
line,
)
if not m:
continue
rest = line[m.end() :].lstrip(" \t\u3000")
if not rest:
continue
if rest[0] in "\r\n":
continue
if not (rest[0].isdigit() or "\u4e00" <= rest[0] <= "\u9fff"):
continue
return line[: m.end()].rstrip() + "\n\n" + rest, True
return line, False
out_lines: list[str] = []
for raw in content.split("\n"):
cur = raw
while True:
nxt, ok = split_one_line(cur)
if not ok:
out_lines.append(cur)
break
parts = nxt.split("\n\n", 1)
out_lines.append(parts[0])
if len(parts) > 1:
out_lines.append("")
cur = parts[1]
else:
cur = ""
return "\n".join(out_lines)
def _ensure_heading_lines_separated(content: str) -> str:
"""确保编号标题行(如 '2.4.4 初步设计审查工作评价')前后各有空行。"""
if not content:
return content
lines = content.split("\n")
result: list[str] = []
heading_re = re.compile(r"^\s{0,3}#{0,6}\s*\d+(?:\.\d+)+\s+[\u4e00-\u9fff]")
for i, line in enumerate(lines):
stripped = line.strip()
if stripped and heading_re.match(stripped):
if result and result[-1].strip() != "":
result.append("")
result.append(line)
if i + 1 < len(lines) and lines[i + 1].strip() != "":
result.append("")
else:
result.append(line)
return "\n".join(result)
def _auto_append_missing_child_headings(content: str, section_no: str) -> tuple[str, list[str]]:
    """Append placeholder blocks for expected child headings that are absent.

    Returns ``(content, missing_numbers)``. When the section has no fixed child
    list the content is returned unchanged with an empty list.
    """
    expected = _expected_child_headings(section_no)
    if not expected:
        return content, []
    text = str(content or "").rstrip()
    missing: list[str] = []
    for heading_no in expected:
        # Accepts "# 3.1 xxx" / "3.1 xxx" / "### 3.1.1 xxx".
        # The quantifier braces must be doubled inside an f-string: a bare
        # "#{0,6}" is formatted as the tuple "(0, 6)" and the pattern would
        # then never match a real heading.
        heading_re = rf"(?m)^\s{{0,3}}#{{0,6}}\s*{re.escape(heading_no)}(?:\s|$)"
        if re.search(heading_re, text):
            continue
        missing.append(heading_no)
    if not missing:
        return text, []
    blocks = [text] if text else []
    for heading_no in missing:
        blocks.append(missing_child_heading_markdown(heading_no))
    repaired = "".join(blocks).strip()
    return repaired, missing
def _restore_required_tables_safety_net(
content: str,
required_tables: list[str],
evidence: dict,
pre_postprocess_content: str,
) -> str:
"""后处理安全网:若后处理流水线意外删除了必需表,从预存快照中恢复。
对每个 required token若当前 content 中已不存在但 pre_postprocess_content
中存在,则从 pre_postprocess_content 中提取该表块(表题行 + 管道行),
在当前 content 中找到合适位置插回。
"""
if not required_tables:
return content
text = str(content or "")
pre = str(pre_postprocess_content or "")
if not text or not pre:
return content
for token in required_tables:
if _table_token_exists(text, token):
continue
auth_block = _authoritative_block_for_required_table(token, evidence)
if auth_block and _table_token_caption_line_re(token).search(text):
text = _replace_caption_stub_with_authoritative_table(text, token, auth_block)
continue
if not _table_token_exists(pre, token):
continue
table_block = _extract_table_block_from_content(pre, token)
if not table_block:
continue
insert_pos = _find_table_insert_position(text, token, required_tables)
if insert_pos is not None:
text = text[:insert_pos] + "\n\n" + table_block + "\n\n" + text[insert_pos:]
else:
text = text.rstrip() + "\n\n" + table_block
return re.sub(r"\n{3,}", "\n\n", text).strip()
def _extract_table_block_from_content(content: str, token: str) -> str:
"""从内容中提取 token 对应的表块(表题行 + 可选注释行 + 管道行)。"""
token_plain = re.sub(r"\s+", "", str(token or ""))
if not token_plain:
return ""
token_re = re.escape(token_plain).replace(r"\-", r"[--—–]")
pat = re.compile(
r"((?:^|\n)[^\n]*?" + token_re + r"[^\n]*\n"
r"(?:\n|[ \t]*<!--[^\n]*-->[ \t]*\n)*"
r"(?:[ \t]*\|[^\n]*\|[ \t]*\n)+)",
flags=re.IGNORECASE,
)
m = pat.search(content)
return m.group(0).strip() if m else ""
def _find_table_insert_position(content: str, token: str, required_tables: list[str]) -> int | None:
    """Find the offset in *content* where the table for *token* should go.

    The table is placed right before the caption line of the next required
    table (in *required_tables* order) that is present in the content. Returns
    ``None`` when the token is not a required table or no later caption is
    found, in which case the caller appends at the end.
    """
    token_idx = None
    for i, t in enumerate(required_tables):
        if _norm_table_token(t) == _norm_table_token(token):
            token_idx = i
            break
    if token_idx is None:
        return None
    for later_token in required_tables[token_idx + 1:]:
        later_plain = re.sub(r"\s+", "", str(later_token or ""))
        if not later_plain:
            continue
        # Explicit dash alternatives (ASCII / full-width hyphen, em / en dash).
        later_re = re.escape(later_plain).replace(r"\-", "[-－—–]")
        later_pat = re.compile(
            r"(?:^|\n)([^\n]*?" + later_re + r"[^\n]*)\n",
            flags=re.IGNORECASE,
        )
        m = later_pat.search(content)
        if m:
            # Start of the caption line itself, after any newline the pattern
            # consumed in front of it.
            return m.start(1)
    return None
def _enforce_required_tables(
    section: ReportTemplateSection,
    prompt: str,
    content: str,
    evidence: dict,
) -> tuple[str, list[str]]:
    """Guarantee that every table required by the section exists in *content*.

    Returns ``(content, still_missing_tokens)``. Repair escalates step by step:
    authoritative tables from structured elements, structured fallbacks, an LLM
    repair call, and finally minimal placeholder tables. Afterwards
    cross-section tables are removed, incomplete table bodies are refilled and
    duplicates are merged.
    """
    required = _extract_required_table_tokens(
        section.section_prompt or "",
        _extract_section_number(section.section_title or ""),
        contract_text=_effective_section_output_contract(section),
    )
    if not required:
        return content, []

    def absent_from(text: str) -> list[str]:
        return [token for token in required if not _table_token_exists(text, token)]

    # Required tables are emitted straight from structured elements first, so
    # the model cannot rewrite real data.
    fixed = _append_authoritative_required_tables(content, required, evidence)
    gaps = absent_from(fixed)
    if gaps:
        fixed = _append_structured_missing_tables(fixed, gaps, evidence)
    gaps = absent_from(fixed)
    if gaps:
        fixed = _repair_missing_tables(section, prompt, fixed, gaps, evidence)
        gaps = absent_from(fixed)
    if gaps:
        fixed = _append_minimal_missing_tables(fixed, gaps)
    # Remove tables that belong to a sibling section (4.3.2 / 4.3.3).
    fixed = _remove_cross_section_table_pollution(section.section_title or "", fixed)
    # A table may "exist" with a broken body (separator only / no data rows);
    # refill it from the structured elements.
    fixed = _ensure_required_structured_tables_integrity(fixed, required, evidence)
    # De-duplicate even when nothing was missing, so an LLM-written table and
    # the element-based table do not coexist.
    fixed = _finalize_section_table_dedupe(fixed, required)
    fixed = _fill_required_table_caption_stubs(fixed, required, evidence)
    fixed = _finalize_section_table_dedupe(fixed, required)
    return fixed, absent_from(fixed)
def _extract_required_table_tokens(
section_prompt: str,
section_no: str = "",
*,
contract_text: Optional[str] = None,
) -> list[str]:
"""
从模板 section_prompt 与章节输出合同section_output_contracts中抽取「表 x-x / 附表 x」
使合同内写死的「见表2-3」等也能触发 _append_authoritative_required_tables 要素直出。
严格规则:
- 仅当合同中包含「【表格强制要求】」标签时才提取正文表表x-x
- 「见附表N」「附表N~附表M」等仅为引用语不视为本节必需内嵌的表格含区间端点及中间附表
- 附图与附表在正文之后由 _append_report_appendices 统一汇总(附图在上、附表在下)。
"""
parts = [str(section_prompt or "").strip(), str(contract_text or "").strip()]
text = "\n".join(p for p in parts if p)
if not text:
return []
has_table_mandate = "表格强制要求" in text
if not has_table_mandate:
return []
# 剔除「【禁止】」段落,避免将禁止示例中的表号(如"表2.6-1")误判为必需表。
text_for_extraction = re.sub(
r"【禁止】.*?(?=【|$)", "", text, flags=re.DOTALL,
)
raw = re.findall(
r"(附表\s*\d+(?:\s*[.\-]\s*\d+)*|表\s*\d+(?:\s*[.\-]\s*\d+)*)",
text_for_extraction,
)
out: list[str] = []
seen = set()
chapter_no = ""
m_sec = re.match(r"^\s*(\d+)", str(section_no or ""))
if m_sec:
chapter_no = m_sec.group(1)
_REF_ONLY_PATTERN = re.compile(
r"(?:见|详见|参见|参照|详)\s*附表\s*\d+",
)
ref_only_appendices: set[str] = set()
for m in _REF_ONLY_PATTERN.finditer(text):
tok_in_ref = re.findall(r"(附表\s*\d+(?:\s*[.\-]\s*\d+)*)", m.group())
for t in tok_in_ref:
ref_only_appendices.add(re.sub(r"\s+", "", t))
for tok in raw:
norm = re.sub(r"\s+", "", tok)
if norm.startswith("附表") and norm in ref_only_appendices:
continue
if chapter_no:
m_tok = re.match(r"^(?:附表|表)\s*(\d+)", norm)
if m_tok:
tail = norm[m_tok.end() :]
# 「表1」「表2」等为节内顺序号首位数字不等于章号如第二章下的表1仅对「表2-3」「表2.6-1」等带子级编号的表号按章首数字过滤。
if tail and tail[0] in ".--—–":
if m_tok.group(1) != chapter_no:
continue
if norm in seen:
continue
seen.add(norm)
out.append(norm)
# 5.3.2 合同正文仅允许表5-5、表5-6模板示例里若夹带「附表8」等一律不纳入必需表避免要素直出串表。
if str(section_no or "").strip() == "5.3.2":
allow_532 = {_norm_table_token("表5-5"), _norm_table_token("表5-6")}
out = [t for t in out if _norm_table_token(t) in allow_532]
# 7.1.2 仅内嵌表7-1合同/模板中若夹带其他章表号,不纳入本节必需表。
if str(section_no or "").strip() == "7.1.2":
allow_712 = {_norm_table_token("表7-1")}
out = [t for t in out if _norm_table_token(t) in allow_712]
return out[:20]
def _repair_missing_tables(
    section: ReportTemplateSection,
    prompt: str,
    content: str,
    missing_tables: list[str],
    evidence: dict,
) -> str:
    """Ask the model to add the missing tables and return the repaired content.

    The original *content* is returned when the response has no usable
    ``content`` field.
    """
    repair_prompt = build_repair_missing_tables_prompt(
        section_title=section.section_title,
        original_prompt=prompt,
        content=content,
        missing_tables=missing_tables,
        evidence_json=json.dumps(evidence, ensure_ascii=False),
    )
    response = chat_completions_json(
        system_prompt=repair_missing_tables_system_prompt(),
        user_prompt=repair_prompt,
        temperature=0.1,
        max_tokens=2200,
        timeout_sec=120,
        log_context=f"补缺失表格 section_key={section.section_key} | {section.section_title}",
    )
    repaired = str(response.get("content") or "").strip()
    if repaired:
        return repaired
    return content
def _append_minimal_missing_tables(content: str, missing_tables: list[str]) -> str:
    """Append one minimal placeholder table per missing table token.

    Each placeholder is rendered from ``MINIMAL_MISSING_TABLE_TEMPLATE`` with
    the normalized caption; the pieces are concatenated without a separator and
    the result is stripped.
    """
    pieces = [content.rstrip()]
    pieces.extend(
        MINIMAL_MISSING_TABLE_TEMPLATE.format(
            table_name=_normalize_table_caption_number_name_gap(str(token or "").strip())
        )
        for token in missing_tables
    )
    return "".join(pieces).strip()
def _remove_cross_section_table_pollution(section_title: str, content: str) -> str:
    """Remove tables that leaked between sections 4.3.2 and 4.3.3.

    - 4.3.2 must not contain a "装置运行分析" table;
    - 4.3.3 must not contain the "投产以来运行周期统计表".

    The caption line plus the following Markdown or HTML table is removed, then
    any leftover line mentioning the forbidden caption, and runs of blank lines
    are collapsed. Other sections are returned unchanged (as ``str``).
    """
    text = str(content or "")
    forbidden_by_section = {
        "4.3.2": "装置运行分析",
        "4.3.3": "投产以来运行周期统计表",
    }
    section_no = _extract_section_number(section_title)
    if section_no not in forbidden_by_section:
        return text
    keyword = re.escape(forbidden_by_section[section_no])

    caption_line = rf"(?:^|\n)[^\n]*{keyword}[^\n]*\n"
    # Blank lines / HTML comments allowed between the caption and the table.
    gap = r"(?:\s*\n|<!--[\s\S]*?-->\s*\n)*"
    removal_patterns = (
        # Caption + Markdown table.
        caption_line + gap + r"(?:\s*\|[^\n]+\|\s*\n)+",
        # Caption + HTML table.
        caption_line + gap + r"\s*<table>[\s\S]*?</table>",
        # Leftover caption line without a table body.
        rf"(?:^|\n)\s*[#>*\-\d\.\)\s]*[^\n]*{keyword}[^\n]*(?=\n|$)",
    )
    for pattern in removal_patterns:
        text = re.sub(pattern, "\n", text, flags=re.IGNORECASE)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
def _title_compare_norm(s: str) -> str:
"""标题宽松比较:折叠空白,并去掉中英括号两侧多余空格。"""
t = re.sub(r"\s+", " ", str(s or "")).strip()
t = re.sub(r"\s*([(])\s*", r"\1", t)
t = re.sub(r"\s*([)])\s*", r"\1", t)
return t
def _heading_line_section_number(line: str) -> str:
normalized = str(line or "").strip().lstrip("#").strip()
m = re.match(r"^(\d+(?:\.\d+)*)", normalized)
return m.group(1) if m else ""
def _is_heading_line_for_section(line: str, section_no: str) -> bool:
if not section_no:
return False
return _heading_line_section_number(line) == section_no
def _strip_leading_section_heading_lines(lines: list[str], section_no: str) -> list[str]:
"""去掉正文开头连续的、与 section_no 同编号的标题行(避免 prepend 后重复)。"""
trimmed = list(lines)
while trimmed:
first = trimmed[0]
if not str(first).strip():
trimmed.pop(0)
continue
if _is_heading_line_for_section(first, section_no):
trimmed.pop(0)
while trimmed and not str(trimmed[0]).strip():
trimmed.pop(0)
continue
break
return trimmed
def _replace_first_section_heading_line(content: str, section_no: str, canonical_title: str) -> str:
lines_list = content.splitlines()
for idx_l, ln in enumerate(lines_list):
if not ln.strip():
continue
if not _is_heading_line_for_section(ln, section_no):
break
stripped = ln.strip()
section_no_heading = section_no
plain_numbered_24x = section_no_heading in {
"2.4.1",
"2.4.2",
"2.4.3",
"2.4.4",
}
if stripped.startswith("#") and not plain_numbered_24x:
hm = re.match(r"^(#+\s*)", stripped)
prefix = hm.group(1) if hm else ""
lines_list[idx_l] = (prefix + canonical_title).rstrip()
else:
lines_list[idx_l] = canonical_title
break
return "\n".join(lines_list)
def _enforce_template_format_contract(
    section: ReportTemplateSection,
    content: str,
    evidence: dict,
    *,
    chapter_title_map: Optional[dict[str, str]] = None,
) -> tuple[str, list[str]]:
    """Bring generated section content in line with the template's format contract.

    Steps, in order: strip inline evidence labels and normalize heading
    markdown; split headings glued to body text; enforce the title line (and
    the parent title chain for first-child sections); separate headings with
    blank lines; append missing child headings; and, only when the contract
    demands tables, check and repair table formats against the template
    examples.

    Returns ``(content, issues)`` where *issues* lists human-readable notes
    about what was found or repaired.
    """
    issues: list[str] = []
    # Numbering styles are deliberately NOT normalized, to keep hierarchical
    # numbers such as 1.2.1 / 2.1.3 / 3.4.2 intact.
    repaired = _strip_inline_evidence_labels(str(content or "").strip())
    repaired = _normalize_section_heading_markdown(repaired)
    if _extract_section_number(str(section.section_title or "")) == "1.2":
        repaired = _normalize_section_12_content(repaired)
    # 0) Split lines before the title check; otherwise step 1 would insert a
    #    duplicate title because the first line differs from the standard one.
    # 0.1) Body text ending with the next section number on the same line.
    repaired = _split_inline_template_headings(repaired, chapter_title_map or {})
    # 0.2) Section title immediately followed by body text on the same line.
    repaired = _split_glued_template_heading_body(repaired, chapter_title_map or {})
    # 1) Title check:
    #    - regular section: first line is the section title ("#" allowed);
    #    - first section of a chapter (x.1): the parent title chain comes first
    #      and must be followed by the section title.
    title = str(section.section_title or "").strip()
    section_no_heading = _extract_section_number(title)
    title_norm = _title_compare_norm(title)
    non_empty_lines: list[str] = []
    for line in repaired.splitlines():
        if line.strip():
            non_empty_lines.append(line.strip().lstrip("#").strip())
    first_non_empty = non_empty_lines[0] if non_empty_lines else ""
    first_matches_title = bool(
        title and first_non_empty and _title_compare_norm(first_non_empty) == title_norm
    )
    first_is_section_heading = bool(
        title
        and section_no_heading
        and first_non_empty
        and _is_heading_line_for_section(first_non_empty, section_no_heading)
    )
    ancestor_titles = _resolve_ancestor_titles_for_section(
        section,
        chapter_title_map or {},
    )
    if ancestor_titles:
        required_titles = ancestor_titles + ([title] if title else [])
        required_norms = [_title_compare_norm(t) for t in required_titles]
        existing_lines = repaired.splitlines()
        trimmed_lines = list(existing_lines)
        existing_title_chain: list[str] = []
        while trimmed_lines:
            first_line = trimmed_lines[0]
            normalized = _title_compare_norm(first_line.strip().lstrip("#").strip())
            if not normalized:
                trimmed_lines.pop(0)
                continue
            if normalized in required_norms:
                existing_title_chain.append(normalized)
                trimmed_lines.pop(0)
                while trimmed_lines and not trimmed_lines[0].strip():
                    trimmed_lines.pop(0)
                continue
            # A heading with this section's number but different wording still
            # counts as the last link of the chain.
            if (
                title
                and section_no_heading
                and len(existing_title_chain) == len(required_norms) - 1
                and _is_heading_line_for_section(first_line, section_no_heading)
            ):
                existing_title_chain.append(required_norms[-1])
                trimmed_lines.pop(0)
                while trimmed_lines and not trimmed_lines[0].strip():
                    trimmed_lines.pop(0)
                continue
            break
        # NOTE(review): the elif below is attached to this inner "if"; the
        # original nesting was ambiguous — confirm.
        if existing_title_chain != required_norms:
            body_lines = _strip_leading_section_heading_lines(trimmed_lines, section_no_heading)
            body = "\n".join(body_lines).strip()
            repaired = "\n\n".join(required_titles + ([body] if body else [])).strip()
            issues.append("章节缺少父级标题链,已自动补齐")
        elif title and first_is_section_heading and first_non_empty != title:
            repaired = _replace_first_section_heading_line(repaired, section_no_heading, title)
    elif title and not first_matches_title and first_is_section_heading:
        repaired = _replace_first_section_heading_line(repaired, section_no_heading, title)
        if first_non_empty != title:
            issues.append("章节标题与模板不一致,已规范为标准标题行")
    elif title and not first_matches_title:
        repaired = f"{title}\n\n{repaired}".strip()
        issues.append("章节标题与模板不一致,已自动补齐标准标题行")
    elif title and first_matches_title and first_non_empty != title:
        repaired = _replace_first_section_heading_line(repaired, section_no_heading, title)
    # 1.42) Sections 2.4.1–2.4.4: the heading may read "### 2.4.x …" and equal
    #       the template title once "#" is removed, so none of the branches
    #       above fires; the Markdown prefix must still be dropped.
    if (
        _extract_section_number(title) in {"2.4.1", "2.4.2", "2.4.3", "2.4.4"}
        and title
    ):
        lns_strip = repaired.splitlines()
        for _is, ln_s in enumerate(lns_strip):
            if not ln_s.strip():
                continue
            sh = ln_s.strip()
            # NOTE(review): scans until the first "#"-prefixed line equal to the
            # title (so 2.4.1, which follows its parent title, is covered too);
            # the original "break" nesting was ambiguous — confirm.
            if sh.startswith("#") and _title_compare_norm(sh.lstrip("#").strip()) == title_norm:
                lns_strip[_is] = title
                break
        repaired = "\n".join(lns_strip)
    # 1.5) Make sure heading lines are followed by a blank line; otherwise the
    #      front-end Markdown renderer or the DOCX export may not recognize them.
    repaired = _ensure_heading_lines_separated(repaired)
    # 2) Heading tree check: append placeholders for missing fixed children.
    section_no = _extract_section_number(title)
    repaired, missing_children = _auto_append_missing_child_headings(repaired, section_no)
    if missing_children:
        # An explicit separator keeps the numbers readable ("2.1、2.2").
        issues.append("缺失下级小节已自动补齐:" + "、".join(missing_children[:10]))
    # 3) Table format checks run only when the contract explicitly requires
    #    tables; otherwise text-only sections (5.2.2/5.2.3/5.2.4) would get
    #    bogus tables triggered by example tables.
    if not _section_requires_tables(
        title, contract_text=_effective_section_output_contract(section)
    ):
        return _strip_inline_evidence_labels(repaired), issues
    # 4) Parse table specs (caption + header keywords) from template examples.
    table_specs = _extract_template_table_specs(section.examples)
    if not table_specs:
        return repaired, issues
    # 4.1) Chapter 5 shares examples that mention 表5-1/表5-2 while each
    #      sub-section has its own contract (e.g. 5.3.1 only 表5-4). Without
    #      narrowing by contract, a missing 表5-1 would be reported and 表5-4
    #      would be "repaired" into the sample header.
    contract_required = _extract_required_table_tokens(
        section.section_prompt or "",
        section_no,
        contract_text=_effective_section_output_contract(section),
    )
    if contract_required:
        allow = {_norm_table_token(t) for t in contract_required if _norm_table_token(t)}
        narrowed = [
            s
            for s in table_specs
            if _norm_table_token(str(s.get("token") or "")) in allow
        ]
        if narrowed:
            table_specs = narrowed
    table_issues = _find_table_format_issues(repaired, table_specs)
    if table_issues:
        issues.extend(table_issues)
        repaired = _repair_table_format_by_template(section, repaired, table_specs, evidence)
        # Second check: report remaining problems, do not retry in a loop.
        still = _find_table_format_issues(repaired, table_specs)
        if still:
            issues.extend([f"二次修正后仍存在:{x}" for x in still[:4]])
    return _strip_inline_evidence_labels(repaired), issues
def _extract_template_table_specs(raw_examples: Optional[str]) -> list[dict]:
    """Parse table specs (caption token, title, header keywords) from template example text.

    The example text is scanned line by line for caption lines of the form
    ``附表N`` / ``表N`` / ``表N-M``. For every caption, the lines that follow
    are inspected for header-field hints.

    Args:
        raw_examples: Raw example text of a template section. ``None`` or a
            blank string yields an empty list.

    Returns:
        At most 12 dicts with the keys ``token`` (caption number with all
        whitespace removed), ``title`` (caption number plus the rest of the
        caption line) and ``headerKeywords`` (order-preserving, de-duplicated,
        at most 8 entries). Only the first spec per token is kept.
    """
    text = str(raw_examples or "").strip()
    if not text:
        return []
    lines = [ln.rstrip() for ln in text.splitlines()]
    specs: list[dict] = []
    i = 0
    while i < len(lines):
        line = lines[i].strip()
        m = re.match(r"^(附表\s*\d+|表\s*\d+(?:\s*-\s*\d+)?)\s*(.*)$", line)
        if not m:
            i += 1
            continue
        token = re.sub(r"\s+", "", m.group(1))
        title_tail = str(m.group(2) or "").strip()
        title = f"{m.group(1)} {title_tail}".strip()
        j = i + 1
        header_keywords: list[str] = []
        # Collect header-field hints from the lines following this caption.
        while j < len(lines):
            cur = lines[j].strip()
            if not cur:
                j += 1
                # A blank line ends the scan once at least one keyword was found.
                if header_keywords:
                    break
                continue
            # Stop at the next table caption.
            if re.match(r"^(附表\s*\d+|表\s*\d+(?:\s*-\s*\d+)?)\s*", cur):
                break
            if re.match(r"^\d+(?:\.\d+)*\s+", cur):  # start of the next numbered section
                break
            # NOTE(review): the prefix literal below is empty in this view, which
            # makes the check always true, so the scan stops at the first
            # non-blank line and no header keyword is ever collected. This looks
            # like a lost character -- confirm the intended prefix.
            if cur.startswith(""):
                break
            if "|" in cur:
                # Markdown header row: take up to eight non-empty cells.
                cells = [c.strip() for c in cur.split("|") if c.strip()]
                for c in cells[:8]:
                    if c and c not in ("---", ""):
                        header_keywords.append(c)
                break
            # Plain-text header line: short, and not purely numeric/punctuation.
            if len(cur) <= 24 and not re.fullmatch(r"[0-9.%()\-~:/\s]+", cur):
                header_keywords.append(cur)
                if len(header_keywords) >= 8:
                    break
            j += 1
        specs.append(
            {
                "token": token,
                "title": title,
                "headerKeywords": list(dict.fromkeys(header_keywords))[:8],
            }
        )
        # Resume after the lines consumed for this caption (j is always > i).
        i = j
    # Keep only the first spec for each token.
    dedup: dict[str, dict] = {}
    for s in specs:
        tk = str(s.get("token") or "")
        if not tk or tk in dedup:
            continue
        dedup[tk] = s
    return list(dedup.values())[:12]
def _find_table_format_issues(content: str, table_specs: list[dict]) -> list[str]:
    """Check generated content against template table specs.

    For every spec, report a missing caption token; otherwise, when the spec
    carries header keywords, require at least two of them (or all of them when
    fewer than two exist) to appear verbatim in the content.

    Args:
        content: Generated section text.
        table_specs: Specs with ``token``, ``title`` and ``headerKeywords``.

    Returns:
        Human-readable issue strings, one per failing spec.
    """
    body = str(content or "")
    body_norm = _norm_table_token(body)
    problems: list[str] = []
    for spec in table_specs:
        token = str(spec.get("token") or "")
        title = str(spec.get("title") or token)
        token_norm = _norm_table_token(token)
        if token_norm and token_norm not in body_norm:
            problems.append(f"缺少模板表名:{title}")
            continue
        keywords: list[str] = []
        for raw_kw in spec.get("headerKeywords") or []:
            kw = str(raw_kw).strip()
            if kw:
                keywords.append(kw)
        if not keywords:
            continue
        matched = len([kw for kw in keywords if kw in body])
        if matched < min(2, len(keywords)):
            problems.append(f"表头与模板不一致:{title}")
    return problems
def _repair_table_format_by_template(
    section: ReportTemplateSection,
    content: str,
    table_specs: list[dict],
    evidence: dict,
) -> str:
    """Ask the model to repair table formatting so it follows the template specs.

    Args:
        section: Template section; its title and key are used for the prompt
            and the log context.
        content: Current section text.
        table_specs: Table specs serialised into the repair prompt.
        evidence: Evidence dict serialised into the repair prompt.

    Returns:
        The stripped ``content`` field of the model response, or the original
        ``content`` when that field is empty.
    """
    user_prompt = build_table_format_repair_prompt(
        section_title=section.section_title,
        table_specs_json=json.dumps(table_specs, ensure_ascii=False),
        content=content,
        evidence_json=json.dumps(evidence, ensure_ascii=False),
    )
    response = chat_completions_json(
        system_prompt=table_format_repair_system_prompt(),
        user_prompt=user_prompt,
        temperature=0.1,
        max_tokens=2600,
        timeout_sec=120,
        log_context=f"表格格式修复 section_key={section.section_key} | {section.section_title}",
    )
    repaired = str(response.get("content") or "").strip()
    if repaired:
        return repaired
    return content
def _render_structured_tables_for_prompt(evidence: dict) -> str:
rows = evidence.get("structuredTables") if isinstance(evidence, dict) else []
if not isinstance(rows, list) or not rows:
return "无结构化表格证据"
blocks: list[str] = []
for row in rows[:8]:
if not isinstance(row, dict):
continue
name = str(row.get("tableName") or "").strip()
md = str(row.get("markdown") or "").strip()
if not name or not md:
continue
blocks.append(f"### {_normalize_table_caption_number_name_gap(name)}\n\n{md}")
return "\n\n".join(blocks) if blocks else "无结构化表格证据"
def _strip_bracketed_three_part_labels(content: str) -> str:
text = str(content or "")
if not text:
return text
# 全章统一移除方括号三段式标题,保留其后正文内容。
patterns = [
r"^\s{0,3}#{0,6}\s*【事实依据】\s*$",
r"^\s{0,3}#{0,6}\s*【评价判断】\s*$",
r"^\s{0,3}#{0,6}\s*【问题与建议】\s*$",
r"^\s{0,3}#{0,6}\s*事实依据\s*[:、]?\s*$",
r"^\s{0,3}#{0,6}\s*评价判断\s*[:、]?\s*$",
r"^\s{0,3}#{0,6}\s*问题与建议\s*[:、]?\s*$",
r"【事实依据】",
r"【评价判断】",
r"【问题与建议】",
]
for p in patterns:
text = re.sub(p, "", text, flags=re.MULTILINE)
text = re.sub(r"\n{3,}", "\n\n", text).strip()
return text
def _strip_placeholder_table_notes(content: str) -> str:
text = str(content or "")
if not text:
return text
placeholder_note_pattern = re.compile(
r"^\s{0,3}(?:[-*+]\s*)?(?:>\s*)?(?:\*\*|__)?\s*注\s*\d+\s*[\.。::、]?\s*待补充\s*(?:\*\*|__)?\s*$",
flags=re.IGNORECASE,
)
boilerplate_note_line_pattern = re.compile(
r"可酌情增减指标|可酌情增减|根据项目的情况|根据项目实际需要进行增减|根据项目不同进行增减|根据项目具体情况增减|表中内容可根据",
flags=re.IGNORECASE,
)
boilerplate_full_line_pattern = re.compile(
r"^\s{0,3}(?:[-*+]\s*)?(?:>\s*)?(?:\*\*|__)?\s*注\s*[\.。:]?\s*(?:\d+\s*[\.。、:]?\s*)?(?:表中内容)?可根据项目.{0,20}(?:增减|调整)",
flags=re.IGNORECASE,
)
note_header_pattern = re.compile(
r"^\s*(?:[-*+]\s*)?(?:>\s*)?(?:\*\*|__)?\s*注\s*[:]\s*(?:\*\*|__)?\s*$",
flags=re.IGNORECASE,
)
src_lines = text.splitlines()
lines: list[str] = []
i = 0
while i < len(src_lines):
line = src_lines[i]
if placeholder_note_pattern.match(line):
i += 1
continue
if boilerplate_full_line_pattern.match(line):
i += 1
continue
if boilerplate_note_line_pattern.search(line):
i += 1
continue
if note_header_pattern.match(line):
j = i + 1
skipped_boilerplate = False
while j < len(src_lines):
nxt = src_lines[j]
if not str(nxt).strip(" \t\u3000"):
j += 1
continue
if boilerplate_note_line_pattern.search(nxt):
skipped_boilerplate = True
j += 1
break
if skipped_boilerplate:
i = j
continue
lines.append(line)
i += 1
return re.sub(r"\n{3,}", "\n\n", "\n".join(lines)).strip()
def _strip_trailing_partial_missing_markers(content: str) -> str:
text = str(content or "")
if not text:
return text
cleaned_lines: list[str] = []
for raw_line in text.splitlines():
line = raw_line.rstrip()
compact = re.sub(r"\s+", "", line)
if compact in {"待补充", "-待补充", "*待补充"}:
cleaned_lines.append(line)
continue
updated = re.sub(r"\s*待补充\s*(?:\[\s*\d{1,3}\s*\])?\s*$", "", line)
updated = re.sub(r"\s{2,}", " ", updated).rstrip()
stripped = updated.strip()
core_len = len(re.sub(r"[^\u4e00-\u9fffA-Za-z0-9]", "", stripped))
if stripped and stripped != line.strip() and core_len >= 12:
cleaned_lines.append(updated)
else:
cleaned_lines.append(line)
return re.sub(r"\n{3,}", "\n\n", "\n".join(cleaned_lines)).strip()
def _is_pipe_markdown_table_row_line(line: str) -> bool:
s = line.strip()
return len(s) >= 2 and s.startswith("|") and s.endswith("|")
def _is_pipe_markdown_table_separator_line(line: str) -> bool:
s = line.strip()
return bool(re.match(r"^\|[\s\-:|]+\|$", s)) if s else False
def _markdown_table_body_fingerprint(md: str) -> str:
"""用于判断两张 Markdown 表是否实质相同(忽略行间空行与首尾空白)。"""
lines = [
re.sub(r"\s+", "", ln.strip())
for ln in str(md or "").splitlines()
if ln.strip().startswith("|") or ln.strip().startswith("")
]
return "\n".join(lines)
_INTRA_REPEAT_MIN_FRAGMENT_LEN = 18
_INTRA_REPEAT_MIN_CONSECUTIVE_COUNT = 3
def _collapse_consecutive_repetitions_in_string(text: str) -> tuple[str, int]:
"""
折叠同一行/字符串内连续重复片段(如模型将同一句财务描述拼接数十次)。
仅处理紧邻重复,避免误伤正常文中偶然出现的相同短语。
"""
s = str(text or "")
min_len = _INTRA_REPEAT_MIN_FRAGMENT_LEN
min_count = _INTRA_REPEAT_MIN_CONSECUTIVE_COUNT
if len(s) < min_len * min_count:
return s, 0
removed = 0
out: list[str] = []
i = 0
n = len(s)
while i < n:
best_plen = 0
best_count = 0
max_plen = (n - i) // min_count
for plen in range(min_len, max_plen + 1):
pat = s[i : i + plen]
if not pat.strip():
continue
count = 1
j = i + plen
while j + plen <= n and s[j : j + plen] == pat:
count += 1
j += plen
if count >= min_count:
span = plen * count
if span > best_plen * best_count:
best_plen = plen
best_count = count
if best_plen:
out.append(s[i : i + best_plen])
removed += best_count - 1
i += best_plen * best_count
else:
out.append(s[i])
i += 1
return "".join(out), removed
def _collapse_consecutive_text_repetitions(content: str) -> tuple[str, int]:
"""按行折叠段内连续重复;返回 (正文, 移除的重复次数)。"""
lines = str(content or "").splitlines()
if not lines:
return str(content or ""), 0
total_removed = 0
collapsed_lines: list[str] = []
for line in lines:
collapsed, removed = _collapse_consecutive_repetitions_in_string(line)
total_removed += removed
collapsed_lines.append(collapsed)
return "\n".join(collapsed_lines), total_removed
# Chapter-level de-duplication thresholds.
# Chapter text at or below this length is returned untouched by the dedupe pass.
_CHAPTER_CONTENT_DEDUPE_MIN_CHARS = 2000
# Minimum "core" length (CJK/alphanumeric characters) for near-duplicate checks.
_CHAPTER_DEDUPE_MIN_BLOCK_CORE_LEN = 48
# SequenceMatcher ratio at or above which two blocks count as near-duplicates.
_CHAPTER_DEDUPE_NEAR_MATCH_RATIO = 0.90
def _chapter_block_core_len(text: str) -> int:
    """Count CJK/ASCII-alphanumeric characters in the block after inline evidence labels are stripped."""
    plain = _strip_inline_evidence_labels(str(text or ""))
    return sum(1 for _ in re.finditer(r"[\u4e00-\u9fffA-Za-z0-9]", plain))
def _chapter_text_block_fingerprint(text: str) -> str:
    """Return a comparison key for a text block: evidence labels stripped, whitespace removed, case-folded."""
    without_labels = _strip_inline_evidence_labels(str(text or ""))
    return re.sub(r"\s+", "", without_labels).casefold()
def _is_likely_table_caption_line(line: str) -> bool:
    """Heuristically decide whether a line could be a table caption.

    Blank lines and lines longer than 120 characters are rejected. Markdown
    heading lines are accepted, as are lines matched by the number pattern
    below.
    """
    prev = str(line or "").strip()
    if not prev or len(prev) > 120:
        return False
    if prev.startswith("#"):
        return True
    # NOTE(review): as written, this pattern matches any line containing a digit,
    # a CJK numeral or a character in the range "-".."—"; it may have lost a
    # leading literal -- confirm against the original file.
    if re.search(r"\s*[\d一二三四五六七八九十\--—–]+", prev):
        return True
    # NOTE(review): `"" in prev` is always true, so this returns True for every
    # remaining line of up to 80 characters. The literal looks like it lost a
    # character -- confirm the intended substring.
    return "" in prev and len(prev) <= 80
def _chapter_block_is_table(block: str) -> bool:
pipe_rows = [
ln for ln in str(block or "").splitlines() if ln.strip() and _is_pipe_markdown_table_row_line(ln)
]
return len(pipe_rows) >= 2
def _split_chapter_blocks_for_dedupe(content: str) -> list[str]:
"""将章节正文拆成段落块与 Markdown 表块,便于做重复检测。"""
lines = str(content or "").splitlines()
blocks: list[str] = []
i = 0
n = len(lines)
pending_caption: list[str] = []
def _flush_pending_caption() -> None:
nonlocal pending_caption
if pending_caption:
blocks.append("\n".join(pending_caption))
pending_caption = []
while i < n:
if not lines[i].strip():
i += 1
continue
if _is_pipe_markdown_table_row_line(lines[i]):
table_lines: list[str] = []
while i < n and lines[i].strip() and _is_pipe_markdown_table_row_line(lines[i]):
table_lines.append(lines[i])
i += 1
if table_lines:
block_lines = list(pending_caption) + table_lines
pending_caption = []
start = i - len(table_lines)
scan = start - 1
while scan >= 0 and lines[scan].strip():
if _is_pipe_markdown_table_row_line(lines[scan]):
break
if _is_likely_table_caption_line(lines[scan]):
block_lines.insert(0, lines[scan])
scan -= 1
continue
break
blocks.append("\n".join(block_lines))
continue
para_lines: list[str] = []
while i < n:
if not lines[i].strip():
i += 1
break
if _is_pipe_markdown_table_row_line(lines[i]):
break
para_lines.append(lines[i])
i += 1
if not para_lines:
continue
if len(para_lines) == 1 and _is_likely_table_caption_line(para_lines[0]):
_flush_pending_caption()
pending_caption = para_lines
continue
_flush_pending_caption()
blocks.append("\n".join(para_lines))
_flush_pending_caption()
return blocks
def _chapter_blocks_near_duplicate(a: str, b: str) -> bool:
    """Decide whether two text blocks are substantially the same.

    Blocks match when their fingerprints are equal, when the shorter
    fingerprint is contained in the longer one and covers at least 82% of it,
    or when both fingerprints have at least 80 characters and their
    ``SequenceMatcher`` ratio reaches the near-match threshold.
    """
    fp_a = _chapter_text_block_fingerprint(a)
    fp_b = _chapter_text_block_fingerprint(b)
    if not (fp_a and fp_b):
        return False
    if fp_a == fp_b:
        return True
    if len(fp_a) <= len(fp_b):
        shorter, longer = fp_a, fp_b
    else:
        shorter, longer = fp_b, fp_a
    contained = len(shorter) >= _CHAPTER_DEDUPE_MIN_BLOCK_CORE_LEN and shorter in longer
    if contained and len(shorter) / max(len(longer), 1) >= 0.82:
        return True
    if len(shorter) < 80:
        return False
    similarity = SequenceMatcher(None, fp_a, fp_b).ratio()
    return similarity >= _CHAPTER_DEDUPE_NEAR_MATCH_RATIO
def _chapter_block_duplicate_key(block: str) -> tuple[str, str]:
    """Return ``(kind, key)`` for a block.

    Table blocks yield ``("table", header_key or body_fingerprint)``; all other
    blocks yield ``("text", text_fingerprint)``.
    """
    body = str(block or "")
    if not _chapter_block_is_table(body):
        return ("text", _chapter_text_block_fingerprint(body))
    header_key = _extract_table_header_key(body)
    body_fingerprint = _markdown_table_body_fingerprint(body)
    return ("table", header_key or body_fingerprint)
def _dedupe_long_chapter_repetition(content: str) -> tuple[str, int]:
    """Remove repeated paragraph/table blocks from an over-long chapter.

    Chapters at or below the length threshold, or with fewer than two blocks,
    are returned unchanged. The first occurrence of a block is kept; later
    blocks are dropped when they repeat a seen table header or table body, an
    exact text fingerprint, or (for sufficiently long text blocks) are
    near-duplicates of a kept long text block.

    Returns:
        ``(text, removed)``. When nothing was removed the original text is
        returned as-is; otherwise the kept blocks are re-joined with blank
        lines.
    """
    text = str(content or "")
    if len(text) <= _CHAPTER_CONTENT_DEDUPE_MIN_CHARS:
        return text, 0
    blocks = _split_chapter_blocks_for_dedupe(text)
    if len(blocks) < 2:
        return text, 0

    survivors: list[str] = []
    table_headers_seen: set[str] = set()
    table_bodies_seen: set[str] = set()
    text_keys_seen: set[str] = set()
    long_text_samples: list[str] = []
    dropped = 0

    for block in blocks:
        core_len = _chapter_block_core_len(block)
        kind, key = _chapter_block_duplicate_key(block)
        if kind == "table":
            header = _extract_table_header_key(block)
            body_fp = _markdown_table_body_fingerprint(block)
            repeated = bool(
                (header and header in table_headers_seen)
                or (body_fp and body_fp in table_bodies_seen)
            )
            if repeated:
                dropped += 1
                continue
            if header:
                table_headers_seen.add(header)
            if body_fp:
                table_bodies_seen.add(body_fp)
        else:
            is_long = core_len >= _CHAPTER_DEDUPE_MIN_BLOCK_CORE_LEN
            if key and key in text_keys_seen:
                repeated = True
            elif is_long:
                repeated = any(
                    _chapter_blocks_near_duplicate(block, earlier)
                    for earlier in long_text_samples
                )
            else:
                repeated = False
            if repeated:
                dropped += 1
                continue
            if key:
                text_keys_seen.add(key)
                # Only long text blocks serve as near-duplicate references.
                if is_long:
                    long_text_samples.append(block)
        survivors.append(block)

    if dropped <= 0:
        return text, 0
    rebuilt = "\n\n".join(piece.strip() for piece in survivors if piece.strip())
    return re.sub(r"\n{3,}", "\n\n", rebuilt).strip(), dropped
def _dedupe_structured_table_hits(hits: list[dict]) -> list[dict]:
"""
同一必需表 token 可能命中多张历史表或重复 markdown合并输出会导致章节内连续两张相同表。
按 tableId 与表体指纹去重保留表体最完整的一条表5-4 优先可研/实际/增减结构)。
"""
ranked: list[tuple[int, dict]] = []
for hit in hits:
if not isinstance(hit, dict):
continue
md = str(hit.get("markdown") or "").strip()
if not md:
continue
fp = _markdown_table_body_fingerprint(md)
if not fp:
continue
ranked.append((_score_structured_table_hit_dict(hit), hit))
ranked.sort(key=lambda x: (-x[0], str(x[1].get("tableId") or "")))
out: list[dict] = []
seen_ids: set[str] = set()
seen_fp: set[str] = set()
for _score, hit in ranked:
tid = str(hit.get("tableId") or "").strip()
md = str(hit.get("markdown") or "").strip()
fp = _markdown_table_body_fingerprint(md)
if tid and tid in seen_ids:
continue
if fp in seen_fp:
continue
if tid:
seen_ids.add(tid)
seen_fp.add(fp)
out.append(hit)
return out
def _is_likely_table1_raw_material_caption(line: str) -> bool:
s = re.sub(r"\s+", "", str(line or ""))
if not s:
return False
if "原料数量及组成对比" in s:
return True
if re.search(r"\s*1", s) and "原料" in s:
return True
return False
def _extract_table_header_key(table_block: str) -> str:
"""提取表格的表头行(第一条 pipe 行),归一化后作为同表判断依据。"""
for ln in str(table_block or "").splitlines():
s = ln.strip()
if s.startswith("|") and s.endswith("|") and not re.match(r"^\|[\s\-:|]+\|$", s):
return re.sub(r"\s+", "", s)
return ""
def _is_433_operation_analysis_table_header(header_norm: str) -> bool:
"""4.3.3 表4-2「烷基化装置运行分析」常见 Markdown 表头(全列或仅实际值列)。"""
h = str(header_norm or "")
if "序号" not in h or "项目" not in h:
return False
if "实际值" in h:
return True
if "设计值" in h and "标定值" in h:
return True
return False
def _433_op_analysis_table_has_canonical_caption(text: str, table_block_start: int) -> bool:
"""表前若干行内是否出现合同规定的表4-2 烷基化装置运行分析表题(用于保留规范副本、去掉无表题重复表)。"""
before = str(text or "")[: int(table_block_start)].rstrip()
lines = before.split("\n")
tail = "\n".join(lines[-18:])
if "烷基化装置运行分析" not in tail:
return False
n = re.sub(r"\s+", "", tail)
return bool(re.search(r"表4[--—–]2", n))
def _dedupe_433_alkylation_operation_analysis_markdown_tables(content: str) -> str:
"""
4.3.3 常见故障模型在「2) 主要装置达标评价」下先输出无表题的同结构表,
又在「3) 全厂达标评价」下重复输出带「表4-2 …烷基化装置运行分析…」表题的同一表。
对表头/表体指纹相同的重复表优先保留表前带规范表4-2 表题的一张;否则保留文档中第一张。
"""
text = str(content or "")
if not text.strip():
return text
pat = re.compile(r"(?m)(?:^\s*\|.+\|\s*\n){3,}")
matches = list(pat.finditer(text))
if len(matches) < 2:
return text
items: list[dict] = []
for m in matches:
block = m.group(0)
hdr = _extract_table_header_key(block)
if not _is_433_operation_analysis_table_header(hdr):
continue
fp = _markdown_table_body_fingerprint(block)
items.append(
{
"m": m,
"hdr": hdr,
"fp": fp,
"cap": _433_op_analysis_table_has_canonical_caption(text, m.start()),
}
)
n = len(items)
if n < 2:
return text
parent = list(range(n))
def find(x: int) -> int:
if parent[x] != x:
parent[x] = find(parent[x])
return parent[x]
def union(x: int, y: int) -> None:
rx, ry = find(x), find(y)
if rx != ry:
parent[ry] = rx
for i in range(n):
for j in range(i + 1, n):
a, b = items[i], items[j]
same_hdr = bool(a["hdr"] and a["hdr"] == b["hdr"])
same_fp = bool(a["fp"] and a["fp"] == b["fp"])
if same_hdr or same_fp:
union(i, j)
clusters: dict[int, list[int]] = {}
for i in range(n):
r = find(i)
clusters.setdefault(r, []).append(i)
remove_spans: list[tuple[int, int]] = []
for _root, idxs in clusters.items():
if len(idxs) < 2:
continue
idxs_sorted = sorted(idxs, key=lambda ii: items[ii]["m"].start())
caps = [ii for ii in idxs_sorted if items[ii]["cap"]]
keep_idx = caps[0] if caps else idxs_sorted[0]
for ii in idxs_sorted:
if ii == keep_idx:
continue
m = items[ii]["m"]
start = m.start()
prefix = text[:start].rstrip("\n")
last_nl = prefix.rfind("\n")
title_line = prefix[last_nl + 1 :] if last_nl >= 0 else prefix
tl = title_line.strip()
if "烷基化装置运行分析" in tl and re.search(
r"表4[--—–]2", re.sub(r"\s+", "", tl)
):
start = last_nl + 1 if last_nl >= 0 else 0
before = text[:start]
if before.rstrip().endswith("-->"):
comment_start = before.rstrip().rfind("<!--")
if comment_start >= 0:
nl_before_comment = before.rfind("\n", 0, comment_start)
start = nl_before_comment + 1 if nl_before_comment >= 0 else comment_start
remove_spans.append((start, m.end()))
if not remove_spans:
return text
remove_spans.sort(key=lambda x: x[0], reverse=True)
for a, b in remove_spans:
text = text[:a] + text[b:]
return re.sub(r"\n{3,}", "\n\n", text).strip()
def _nearest_table_caption_token_before(text_before: str) -> str:
"""表块前若干行内最近的「表 x-x」表号归一化如 表3-2"""
lines = [ln for ln in str(text_before or "").splitlines() if ln.strip()]
for ln in reversed(lines[-14:]):
if "|" in ln or ln.strip().startswith("<!--"):
continue
m = re.search(r"\s*\d+\s*[-]\s*\d+", ln, flags=re.IGNORECASE)
if m:
return re.sub(r"\s+", "", m.group(0))
return ""
def _table_block_has_element_source_comment(text_before: str) -> bool:
return "表格来源:要素管理" in str(text_before or "")[-800:]
def _score_table_block_for_keep(text_before: str, block: str) -> int:
    """Score a table block; the highest-scoring duplicate is the one kept.

    +100 when the element-management source marker precedes the table, +20
    when the block is an effective Markdown table, plus one point per pipe row
    (capped at 30).
    """
    pipe_rows = sum(1 for row in block.splitlines() if row.strip().startswith("|"))
    total = 0
    if _table_block_has_element_source_comment(text_before):
        total += 100
    if _is_effective_markdown_table_block(block):
        total += 20
    return total + min(pipe_rows, 30)
def _span_for_duplicate_table_removal(text: str, match: re.Match[str]) -> tuple[int, int]:
"""重复表删除范围:含紧贴表题行与要素直出注释行。"""
start = match.start()
prefix = text[:start]
stripped_prefix = prefix.rstrip("\n")
last_nl = stripped_prefix.rfind("\n")
title_line = stripped_prefix[last_nl + 1 :] if last_nl >= 0 else stripped_prefix
if re.search(r"\s*\d+\s*[-]\s*\d+", title_line, flags=re.IGNORECASE):
start = last_nl + 1 if last_nl >= 0 else 0
before = text[:start]
if before.rstrip().endswith("-->"):
comment_start = before.rstrip().rfind("<!--")
if comment_start >= 0:
nl_before = before.rfind("\n", 0, comment_start)
start = nl_before + 1 if nl_before >= 0 else comment_start
end = match.end()
while end < len(text) and text[end] == "\n":
end += 1
return start, end
# 至少 3 行管道表;末行可无换行(否则要素直出表尾缺 \n 时无法参与去重)
_PIPE_MARKDOWN_TABLE_BLOCK_RE = re.compile(
r"(?m)(?:(?:^\s*\|[^\n]+\|\s*)(?:\n|$)){3,}"
)
def _iter_pipe_markdown_table_blocks(text: str) -> list[re.Match[str]]:
return list(_PIPE_MARKDOWN_TABLE_BLOCK_RE.finditer(str(text or "")))
def _table_token_in_recent_context(text_before: str, token: str, *, max_lines: int = 16) -> bool:
"""表块前若干行(或块内表题行)是否出现给定表号。"""
if not token:
return False
lines = [ln for ln in str(text_before or "").splitlines() if ln.strip()]
for ln in reversed(lines[-max_lines:]):
if "|" in ln or ln.strip().startswith("<!--"):
continue
if _table_token_matches_name(token, ln):
return True
return False
def _dedupe_key_for_markdown_table_block(
    text: str, match: re.Match[str], *, block_index: int
) -> str:
    """Build the grouping key used to detect duplicates of one table.

    Preference order: normalised table number from the preceding caption, a
    table number found in a non-pipe line among the block's first four lines,
    the normalised header row (``hdr:`` prefix), the body fingerprint (``fp:``
    prefix), and finally a key unique to this block index.
    """
    block = match.group(0)
    caption_token = _nearest_table_caption_token_before(text[: match.start()])
    if caption_token:
        return _norm_table_token(caption_token)
    for candidate in block.splitlines()[:4]:
        if "|" in candidate:
            continue
        found = re.search(r"\s*\d+(?:\s*[-]\s*\d+)*", candidate, flags=re.IGNORECASE)
        if found:
            return _norm_table_token(re.sub(r"\s+", "", found.group(0)))
    header = _extract_table_header_key(block)
    if header:
        return f"hdr:{header}"
    fingerprint = _markdown_table_body_fingerprint(block)
    if fingerprint:
        return f"fp:{fingerprint}"
    return f"__idx_{block_index}"
def _apply_markdown_table_block_removals(text: str, remove_spans: list[tuple[int, int]]) -> str:
if not remove_spans:
return text
remove_spans.sort(key=lambda x: x[0], reverse=True)
for a, b in remove_spans:
text = text[:a] + text[b:]
return re.sub(r"\n{3,}", "\n\n", text).strip()
def _table_block_matches_required_token_spec(block: str, token: str) -> bool:
    """Decide, for an uncaptioned table, whether its header fits a required table's column spec.

    Used to merge orphan model-generated tables into the required table they
    duplicate. The header row of ``block`` is compared with the column names
    of the spec resolved for ``token``; table-number specific rules then
    separate tables that share the same data columns.

    Args:
        block: Markdown table block.
        token: Required table number (e.g. a "表3-3" style token).

    Returns:
        True when the header matches the spec and none of the exclusion rules
        apply.
    """
    canon = _canonical_global_table_name_for_token(token) or ""
    spec = _multi_column_global_spec_for_table(canon)
    if not spec:
        return False
    hdr = re.sub(r"\s+", "", _extract_table_header_key(block)).lower()
    # NOTE(review): this removes everything from the first "<br>" to the end of
    # the header row, not just the rest of that cell -- confirm this is intended.
    hdr = re.sub(r"<br>.*", "", hdr, flags=re.IGNORECASE)
    if not hdr:
        return False
    # spec[0] is read as the list of column names.
    cols = [str(c).strip() for c in (spec[0] or []) if str(c).strip()]
    if not cols:
        return False
    hit = sum(
        1
        for col in cols
        if re.sub(r"\s+|<br>.*", "", col, flags=re.IGNORECASE).lower() in hdr
    )
    # Require one hit for a single-column spec, otherwise two.
    if hit < max(1, min(2, len(cols) // 2 + 1)):
        return False
    row_hdr = _row_header_name_for_table(canon)
    if row_hdr:
        row_norm = re.sub(r"\s+", "", row_hdr).lower()
        if row_norm not in hdr:
            return False
    # 表3-3 and 表3-4 share data columns; the row-header column tells them
    # apart so orphan tables are not merged into the wrong one.
    if _table_token_matches_name(token, "表3-3") and "专业" in hdr:
        return False
    if _table_token_matches_name(token, "表3-4") and (
        "单元名称" in hdr or ("项目" in hdr and "专业" not in hdr)
    ):
        return False
    if _table_token_matches_name(token, "表3-5") and not any(
        k in hdr for k in ("变更内容", "原因")
    ):
        return False
    # 表2-5 and 表2-6 share data columns; a bare "项目" would also match
    # "依托项目名称", so that case is excluded explicitly.
    if _table_token_matches_name(token, "表2-5"):
        if "依托项目名称" in hdr or "依托项目" in hdr:
            return False
        if "项目名称" not in hdr and "项目" not in hdr:
            return False
    if _table_token_matches_name(token, "表2-6") and "依托项目名称" not in hdr:
        return False
    return True
def _markdown_table_block_belongs_to_required_token(
    before: str,
    block: str,
    token: str,
    all_tokens: list[str],
) -> bool:
    """Decide whether a pipe-table block belongs to a given required table number.

    When a caption number precedes the table, that nearest caption alone
    decides (several tables may sit close together, so a wide window would
    merge the wrong ones). Otherwise the table belongs to ``token`` when the
    number appears inside the block's first lines, or when the header fits the
    spec of ``token`` and of no other required token.
    """
    caption_token = _nearest_table_caption_token_before(before)
    if caption_token:
        return _table_token_matches_name(token, caption_token)
    if _table_token_in_recent_context(block, token, max_lines=6):
        return True
    if not _table_block_matches_required_token_spec(block, token):
        return False
    ambiguous = any(
        other != token and _table_block_matches_required_token_spec(block, other)
        for other in all_tokens
    )
    return not ambiguous
def _dedupe_duplicate_tables_for_required_tokens(
content: str, required_tables: list[str]
) -> str:
"""
按合同必需表号强制去重:同节内同一「表 x-x」只保留一张优先要素直出/较完整表体)。
解决 LLM 表与要素直出表头不一致、或无前表题 orphan 表导致通用去重未合并的情况。
"""
text = str(content or "")
tokens = [
str(t or "").strip()
for t in (required_tables or [])
if str(t or "").strip()
]
if not text.strip() or not tokens:
return text
matches = _iter_pipe_markdown_table_blocks(text)
if len(matches) < 2:
return text
remove_spans: list[tuple[int, int]] = []
for token in tokens:
hits: list[tuple[re.Match[str], int]] = []
for m in matches:
before = text[: m.start()]
block = m.group(0)
belongs = _markdown_table_block_belongs_to_required_token(
before, block, token, tokens
)
if belongs:
hits.append(
(m, _score_table_block_for_keep(before, block))
)
if len(hits) < 2:
continue
hits.sort(key=lambda x: (-x[1], x[0].start()))
for dup, _score in hits[1:]:
remove_spans.append(_span_for_duplicate_table_removal(text, dup))
return _apply_markdown_table_block_removals(text, remove_spans)
def _finalize_section_table_dedupe(content: str, required_tables: list[str] | None) -> str:
    """Final table de-duplication: generic caption/header merge, then per-required-number merge."""
    deduped = _dedupe_duplicate_captioned_markdown_tables(content)
    if not required_tables:
        return deduped
    return _dedupe_duplicate_tables_for_required_tokens(deduped, required_tables)
def _dedupe_duplicate_captioned_markdown_tables(content: str) -> str:
"""
同节内重复输出「表题 + 表体」(如 3.3.2 表3-2 出现两次LLM 表 + 要素直出表)。
按表号或表头指纹分组,保留要素直出或较完整的一张。
"""
text = str(content or "")
if not text.strip():
return text
matches = _iter_pipe_markdown_table_blocks(text)
if len(matches) < 2:
return text
groups: dict[str, list[dict]] = {}
for idx, m in enumerate(matches):
before = text[: m.start()]
block = m.group(0)
key = _dedupe_key_for_markdown_table_block(text, m, block_index=idx)
groups.setdefault(key, []).append(
{
"idx": idx,
"m": m,
"score": _score_table_block_for_keep(before, block),
}
)
remove_spans: list[tuple[int, int]] = []
for _key, items in groups.items():
if len(items) < 2:
continue
items.sort(key=lambda x: (-x["score"], x["idx"]))
for dup in items[1:]:
remove_spans.append(_span_for_duplicate_table_removal(text, dup["m"]))
return _apply_markdown_table_block_removals(text, remove_spans)
def _dedupe_211_duplicate_markdown_tables(content: str) -> str:
"""
2.1.1 常见故障:模型在段落间重复输出同一张「原料数量及组成」表,或要素直出与模型表并存。
判断规则:表头行归一化后一致即视为同表(兼容 LLM 表与要素管理表数据略不同的情况)。
若表头也不同,则回退到全表体指纹一致判断。
保留每组重复中的第一张,删除后续副本(可含紧贴的表题行)。
"""
text = str(content or "")
if not text.strip():
return text
pat = re.compile(r"(?m)(?:^\s*\|.+\|\s*\n){3,}")
matches = list(pat.finditer(text))
if len(matches) < 2:
return text
headers: list[str] = []
fingerprints: list[str] = []
for m in matches:
headers.append(_extract_table_header_key(m.group(0)))
fingerprints.append(_markdown_table_body_fingerprint(m.group(0)))
seen_headers: dict[str, int] = {}
seen_fps: dict[str, int] = {}
remove_spans: list[tuple[int, int]] = []
for idx, m in enumerate(matches):
hdr = headers[idx]
fp = fingerprints[idx]
is_dup = False
if hdr and hdr in seen_headers:
is_dup = True
elif fp and fp in seen_fps:
is_dup = True
if not is_dup:
if hdr:
seen_headers[hdr] = idx
if fp:
seen_fps[fp] = idx
continue
start = m.start()
prefix = text[:start]
stripped_prefix = prefix.rstrip("\n")
last_nl = stripped_prefix.rfind("\n")
title_line = stripped_prefix[last_nl + 1 :] if last_nl >= 0 else stripped_prefix
if _is_likely_table1_raw_material_caption(title_line):
start = last_nl + 1 if last_nl >= 0 else 0
# 同时去掉紧接在表前的 HTML 注释行(要素管理标记)
before = text[:start]
if before.rstrip().endswith("-->"):
comment_start = before.rstrip().rfind("<!--")
if comment_start >= 0:
nl_before_comment = before.rfind("\n", 0, comment_start)
start = nl_before_comment + 1 if nl_before_comment >= 0 else comment_start
remove_spans.append((start, m.end()))
if not remove_spans:
return text
remove_spans.sort(key=lambda x: x[0], reverse=True)
for a, b in remove_spans:
text = text[:a] + text[b:]
return re.sub(r"\n{3,}", "\n\n", text).strip()
def _strip_minimal_missing_table_tail(content: str) -> str:
"""
移除 _append_minimal_missing_tables 产生的占位表残留:整段或仅余「| 关键数据 | 待补充 |」一行。
兼容全角竖线「|」。
"""
t = str(content or "").replace("", "|")
if not t.strip():
return t
changed = True
while changed:
changed = False
old = t
# 完整三行占位表(可选上一行 ### 表题)
t = re.sub(
r"\n(?:#{1,6}\s*[^\n]+\n\n)?\|\s*项目\s*\|\s*内容\s*\|\s*\n\|\s*---\s*\|\s*---\s*\|\s*\n\|\s*关键数据\s*\|\s*待补充\s*\|\s*$",
"",
t,
flags=re.IGNORECASE,
)
# 仅余数据行(表头已被其它步骤剥掉时)
t = re.sub(
r"\n\|\s*关键数据\s*\|\s*待补充\s*\|\s*$",
"",
t,
flags=re.IGNORECASE,
)
if t != old:
changed = True
return t.rstrip()
def _strip_orphan_markdown_table_rows(content: str) -> str:
    """Drop a stray ``| ... |`` row that the model emitted after a finished table.

    After each run of table lines and the blank lines following it, a single
    pipe row is removed unless it is the header of a new table (its next
    non-blank line is a separator row). Everything else is kept, and the
    result is stripped.
    """
    lines = content.split("\n")
    total = len(lines)

    def _is_table_line(candidate: str) -> bool:
        return _is_pipe_markdown_table_row_line(
            candidate
        ) or _is_pipe_markdown_table_separator_line(candidate)

    kept: list[str] = []
    pos = 0
    while pos < total:
        if not _is_table_line(lines[pos]):
            kept.append(lines[pos])
            pos += 1
            continue
        # Copy the whole contiguous table.
        while pos < total and _is_table_line(lines[pos]):
            kept.append(lines[pos])
            pos += 1
        # Copy the blank lines after it.
        while pos < total and not lines[pos].strip():
            kept.append(lines[pos])
            pos += 1
        # Inspect a pipe row that follows the blank run.
        if (
            pos < total
            and _is_pipe_markdown_table_row_line(lines[pos])
            and not _is_pipe_markdown_table_separator_line(lines[pos])
        ):
            look = pos + 1
            while look < total and not lines[look].strip():
                look += 1
            opens_new_table = look < total and _is_pipe_markdown_table_separator_line(
                lines[look]
            )
            if not opens_new_table:
                # Orphan row: skip it.
                pos += 1
    return "\n".join(kept).strip()
_TABLE_NO_TOKEN = r"\s*\d+\s*[\--—]\s*\d+"
def _strip_331_table_crossrefs(content: str) -> str:
"""3.3.1 为纯文字符合性评价,移除误插入的表号与“详见表…”类交叉引用。"""
s = str(content or "")
if not s:
return s
chain = rf"{_TABLE_NO_TOKEN}(?:\s*[、,]\s*{_TABLE_NO_TOKEN})*"
comma_lead = r"[,;、]"
s = re.sub(rf"[(]\s*详见\s*{chain}\s*[)]", "", s)
s = re.sub(rf"[(]\s*参见\s*{chain}\s*[)]", "", s)
# 「,详见…」整块删除易导致前后句粘连,替换为句号断句。
s = re.sub(rf"([\u4e00-\u9fff]){comma_lead}\s*详见\s*{chain}\s*[。.]*", r"\1。", s)
s = re.sub(rf"{comma_lead}\s*详见\s*{chain}\s*[。.]*", "", s)
s = re.sub(rf"([\u4e00-\u9fff]){comma_lead}\s*参见\s*{chain}\s*[。.]*", r"\1。", s)
s = re.sub(rf"{comma_lead}\s*参见\s*{chain}\s*[。.]*", "", s)
s = re.sub(
rf"{comma_lead}\s*见\s*{chain}(?:\s*[~\-至到]\s*{chain})?(?:\s*所示)?\s*[。.]*",
"",
s,
)
s = re.sub(rf"{comma_lead}\s*如\s*{chain}\s*所示\s*[。.]*", "", s)
s = re.sub(
rf"([\u4e00-\u9fff])(?:详见|参见)\s*{chain}\s*[。.]?",
r"\1。",
s,
)
s = re.sub(
rf"(?:^|(?<=[。\n]))[\s\u3000]*详见\s*{chain}\s*[。.]+",
"",
s,
flags=re.MULTILINE,
)
s = re.sub(r"[。.]{2,}", "", s)
s = re.sub(r"[,]{2,}", "", s)
return re.sub(r"\n{3,}", "\n\n", s).strip()
def _strip_333_trailing_table_caption_lines(content: str) -> str:
"""3.3.3 不应以其他小节的表题列表结尾,删除段末误粘贴的表题行。"""
lines = content.splitlines()
cap = re.compile(
r"^[\s#]*表\s*(?:3\s*[\--—]\s*[234]|2\s*[\--—]\s*7)\b.*$",
flags=re.IGNORECASE,
)
while lines:
cur = lines[-1]
if not cur.strip():
lines.pop()
continue
if cap.match(cur.strip()):
lines.pop()
continue
break
return "\n".join(lines).strip()
def _strip_341_table_artifacts(content: str) -> str:
    """Remove table cross-references and trailing caption lines from section 3.4.1.

    Section 3.4.1 is a text-only evaluation of construction preparation.
    Cross-references ("详见/参见/见/如 ... 所示") are stripped first, then
    trailing blank lines and trailing caption lines for 表3-2, 表3-3, 表3-4
    and 表2-7 are removed.

    Changes in the trailing-caption pattern:
      * ``\\b`` after the table number is replaced by ``(?!\\d)``. In Python
        ``str`` patterns CJK characters are word characters, so ``\\b`` never
        matched before a caption written without a space
        (e.g. "表3-2主要设备表").
      * The separator class lists the hyphen-like characters explicitly
        instead of the accidental range ``[\\--—]``.

    Args:
        content: Section text; ``None`` is treated as empty.

    Returns:
        The cleaned text with newline runs collapsed, stripped.
    """
    s = str(content or "")
    if not s:
        return s
    chain = rf"{_TABLE_NO_TOKEN}(?:\s*[、,]\s*{_TABLE_NO_TOKEN})*"
    comma_lead = r"[,;、]"
    s = re.sub(rf"[(]\s*(?:详见|参见)\s*{chain}\s*[)]", "", s)
    # Close the clause with a full stop where plain deletion would join sentences.
    s = re.sub(rf"([\u4e00-\u9fff]){comma_lead}\s*详见\s*{chain}\s*[。.]*", r"\1。", s)
    s = re.sub(rf"{comma_lead}\s*详见\s*{chain}\s*[。.]*", "", s)
    s = re.sub(rf"([\u4e00-\u9fff]){comma_lead}\s*参见\s*{chain}\s*[。.]*", r"\1。", s)
    s = re.sub(rf"{comma_lead}\s*参见\s*{chain}\s*[。.]*", "", s)
    s = re.sub(
        rf"{comma_lead}\s*见\s*{chain}(?:\s*[~\-至到]\s*{chain})?(?:\s*所示)?\s*[。.]*",
        "",
        s,
    )
    s = re.sub(rf"{comma_lead}\s*如\s*{chain}\s*所示\s*[。.]*", "", s)
    dash = r"[\-\uFF0D\u2014\u2013]"
    cap = re.compile(
        rf"^[\s#]*表\s*(?:3\s*{dash}\s*[234]|2\s*{dash}\s*7)(?!\d).*$",
        flags=re.IGNORECASE,
    )
    lines = s.splitlines()
    # Pop trailing blank lines and trailing caption lines.
    while lines:
        last = lines[-1].strip()
        if last and not cap.match(last):
            break
        lines.pop()
    s = "\n".join(lines)
    s = re.sub(r"[。.]{2,}", "", s)
    s = re.sub(r"[,]{2,}", "", s)
    return re.sub(r"\n{3,}", "\n\n", s).strip()
def _strip_unallowed_table_references(
content: str,
*,
allowed_table_tokens: Optional[list[str]] = None,
) -> str:
"""
通用串表清洗:
- 删除不在本节允许名单内的「详见/参见/见/如表…」引用;
- 删除仅由不允许表号开头的独立表题行(模型误贴常见形态)。
"""
text = str(content or "")
if not text:
return text
allowed = {_norm_table_token(t) for t in (allowed_table_tokens or []) if str(t).strip()}
def _is_allowed(tok: str) -> bool:
nt = _norm_table_token(tok)
return bool(nt) and nt in allowed
ref_pat = re.compile(
r"(?P<prefix>详见|参见|见|如)\s*"
r"(?P<tok>(?:附表|表)\s*\d+(?:\s*[.\--—–]\s*\d+)*)"
r"(?P<tail>(?:\s*[~\-至到]\s*(?:附表|表)\s*\d+(?:\s*[.\--—–]\s*\d+)*)?(?:\s*所示)?)"
)
title_line_start_pat = re.compile(r"^[#>*\-\d\.\)\s]*(?:附表|表)\s*\d+", flags=re.IGNORECASE)
title_line_token_pat = re.compile(r"(?:附表|表)\s*\d+(?:\s*[.\--—–]\s*\d+)*")
out: list[str] = []
for raw in text.splitlines():
s = str(raw or "")
if title_line_start_pat.match(s.strip()):
toks = title_line_token_pat.findall(s)
if toks and all(not _is_allowed(t) for t in toks):
continue
def _repl(m: re.Match) -> str:
tok = str(m.group("tok") or "")
return m.group(0) if _is_allowed(tok) else ""
s = ref_pat.sub(_repl, s)
s = re.sub(r"[,;、]\s*$", "", s)
s = re.sub(r"\s{2,}", " ", s).rstrip()
out.append(s)
merged = "\n".join(out)
merged = re.sub(r"[。.]{2,}", "", merged)
merged = re.sub(r"[,]{2,}", "", merged)
return re.sub(r"\n{3,}", "\n\n", merged).strip()
_RE_TABLE_261 = re.compile(
r"(?:^|\n)\s*[#*]*\s*\**\s*表\s*2[.\s]*6\s*[--—–.]\s*1\s*\**[^\n]*\n"
r"(?:\s*\n)*"
r"(?:(?:\s*\|[^\n]+\|\s*\n)+)?",
)
def _strip_211_stray_table_261(content: str) -> str:
"""移除 2.1.1 中残留的「表2.6-1」标题行及其紧跟的小表如仅含项目/内容两列的空壳表)。"""
if not content or "2" not in content:
return content
return re.sub(r"\n{3,}", "\n\n", _RE_TABLE_261.sub("", content)).strip()
def _table54_caption_in_preceding_lines(text_before: str, *, max_lines: int = 10) -> bool:
    """Tell whether a Table 5-4 caption appears shortly before a table block.

    Only the last ``max_lines`` non-blank lines of ``text_before`` are
    inspected. They must contain a ``5-4`` number (whitespace ignored) and
    one of the title keywords 生产经营 / 效益情况对比.
    """
    non_blank = [ln for ln in str(text_before or "").splitlines() if ln.strip()]
    window = "\n".join(non_blank[-max_lines:])
    squeezed = re.sub(r"\s+", "", window)
    if re.search(r"\s*5\s*[-]\s*4", squeezed, flags=re.I) is None:
        return False
    return any(keyword in window for keyword in ("生产经营", "效益情况对比"))
def _is_531_spurious_simple_benefit_table(block: str, text_before: str) -> bool:
    """Decide whether a table block in section 5.3.1 should be discarded.

    Per the original note, the model tends to invent a short three-row table
    under the Table 5-4 caption, or to append a sample table with
    可研值/后评价值 columns after the prose.

    The checks run in this order:

    1. a block preceded by an element-source marker is never spurious;
    2. a block recognised as a simplified Table 5-4 extract is spurious;
    3. otherwise the header key decides: no header means keep; a
       后评价值/后评价报告 column, or 指标 + 可研值 without 项目/不确定因素,
       means spurious.
    """
    if _table54_body_preceded_by_element_source(text_before):
        return False
    if _is_table54_simplified_extract_body(block):
        return True
    header = _extract_table_header_key(block)
    if not header:
        return False
    if any(marker in header for marker in ("后评价值", "后评价报告")):
        return True
    return (
        "指标" in header
        and "可研值" in header
        and "项目" not in header
        and "不确定因素" not in header
    )
def _first_table57_caption_pos(content: str) -> int | None:
    """Return the offset where the first "表5-7" caption line starts.

    A caption line is any line containing ``表 5-7`` that is terminated by a
    newline.

    Args:
        content: Section text (``None`` is treated as empty).

    Returns:
        Index of the first character of the caption line, or ``None`` when no
        such line exists.
    """
    text = str(content or "")
    found = re.search(
        r"(?:^|\n)([^\n]*?表\s*5\s*[-]\s*7[^\n]*)\n",
        text,
        flags=re.IGNORECASE,
    )
    if found is None:
        return None
    # Group 1 begins right after the optional leading newline. The previous
    # ``pos > 0`` guard returned the newline's own index when that newline
    # sat at offset 0.
    return found.start(1)
def _is_57_authoritative_table_preceding(text_before: str) -> bool:
    """Tell whether a table block follows a 5-7 caption or an element-source note.

    Returns ``True`` when the last 1200 characters contain the element-source
    marker, or when the last 400 characters (whitespace removed) contain a
    ``5-7`` number.
    """
    recent = str(text_before or "")[-1200:]
    if "表格来源:要素管理" in recent:
        return True
    squeezed = re.sub(r"\s+", "", recent[-400:])
    return re.search(r"\s*5\s*[-]\s*7", squeezed, flags=re.IGNORECASE) is not None
def _is_54_spurious_table(block: str, text_before: str, *, before_table57: bool) -> bool:
    """Decide whether a table block in section 5.4 should be discarded.

    Per the original note, section 5.4 may only carry Table 5-7, while the
    model often invents a 指标|可研值|后评价值|差值 table or repeats Table 5-7
    after the prose.

    Args:
        block: The pipe-table text.
        text_before: Everything in the section before the block.
        before_table57: ``True`` when the block sits before the first 5-7
            caption; such blocks are always spurious.

    Returns:
        ``True`` when the block should be removed.
    """
    if before_table57:
        return True
    if _is_57_authoritative_table_preceding(text_before):
        return False
    header = _extract_table_header_key(block)
    if not header:
        return True

    def _has(*keys: str) -> bool:
        return any(key in header for key in keys)

    has_feasibility = "可研值" in header
    names_subject = _has("项目", "不确定因素")

    if _has("后评价值", "后评价报告"):
        return True
    if has_feasibility and _has("差值", "增减"):
        return True
    if "指标" in header and has_feasibility and not names_subject:
        return True
    # A subject column plus a value/threshold column is accepted as the
    # legitimate Table 5-7 layout.
    if names_subject and _has("数值", "临界点", "临界值"):
        return False
    return has_feasibility
def _strip_54_spurious_llm_table(content: str) -> str:
    """Remove spurious tables from section 5.4.

    Every run of three or more pipe rows is judged by
    ``_is_54_spurious_table``. Blocks located before the first 表5-7 caption
    are passed with ``before_table57=True``.

    Args:
        content: Section text (``None`` is treated as empty).

    Returns:
        The input unchanged when it is blank or nothing is removed. Otherwise
        the remaining text, stripped, with removed blocks replaced by a
        single blank line.
    """
    text = str(content or "")
    if not text.strip():
        return text
    first_57 = _first_table57_caption_pos(text)
    table_re = re.compile(r"(?m)((?:^\s*\|.+\|\s*\n){3,})")

    remove_ranges: list[tuple[int, int]] = []
    for found in table_re.finditer(text):
        before_57 = first_57 is not None and found.start() < first_57
        if not _is_54_spurious_table(
            found.group(0), text[: found.start()], before_table57=before_57
        ):
            continue
        # Widen the span over the newlines on both sides so no empty gap is left.
        start = found.start()
        while start > 0 and text[start - 1] == "\n":
            start -= 1
        end = found.end()
        while end < len(text) and text[end] == "\n":
            end += 1
        remove_ranges.append((start, end))
    if not remove_ranges:
        return text

    kept: list[str] = []
    pos = 0
    for start, end in sorted(remove_ranges):
        if start > pos:
            kept.append(text[pos:start])
        pos = max(pos, end)
    kept.append(text[pos:])
    # The widened spans swallowed every separating newline. Plain
    # concatenation would fuse the paragraph before a removed table with the
    # text after it, so the pieces are rejoined with a blank line.
    rebuilt = "\n\n".join(piece for piece in kept if piece)
    return re.sub(r"\n{3,}", "\n\n", rebuilt).strip()
def _is_521_proper_investment_table52_block(block: str) -> bool:
    """Tell whether a table block has the investment-change table layout.

    The whitespace-free header key must contain 工程或费用 (which also covers
    工程或费用名称) and at least one of 投资估算 / 初设概算 / 竣工决算.
    """
    header = re.sub(r"\s+", "", _extract_table_header_key(block))
    if not header or "工程或费用" not in header:
        return False
    return any(column in header for column in ("投资估算", "初设概算", "竣工决算"))
def _strip_521_spurious_llm_table52(content: str) -> str:
    """Drop wrongly placed tables that precede the real Table 5-2 in 5.2.1.

    Nothing is removed unless the text holds at least two pipe-table blocks
    and one of them has the investment-change layout. Blocks located before
    the first such table are removed when "表5-2" occurs in their recent
    context. The proper table and everything after it are kept.
    """
    text = str(content or "")
    blocks = _iter_pipe_markdown_table_blocks(text)
    if len(blocks) < 2:
        return text

    first_proper = None
    for position, candidate in enumerate(blocks):
        if _is_521_proper_investment_table52_block(candidate.group(0)):
            first_proper = position
            break
    if first_proper is None:
        return text

    doomed: list[tuple[int, int]] = [
        _span_for_duplicate_table_removal(text, candidate)
        for candidate in blocks[:first_proper]
        if not _is_521_proper_investment_table52_block(candidate.group(0))
        and _table_token_in_recent_context(text[: candidate.start()], "表5-2")
    ]
    return _apply_markdown_table_block_removals(text, doomed)
def _strip_531_spurious_llm_table(content: str) -> str:
    """Remove spurious simplified Table 5-4 blocks from section 5.3.1.

    Every run of three or more pipe rows is judged by
    ``_is_531_spurious_simple_benefit_table``.

    Args:
        content: Section text (``None`` is treated as empty).

    Returns:
        The input unchanged when it is blank or nothing is removed. Otherwise
        the remaining text, stripped, with removed blocks replaced by a
        single blank line.
    """
    text = str(content or "")
    if not text.strip():
        return text
    table_re = re.compile(r"(?m)(?:^\s*\|.+\|\s*\n){3,}")

    remove_ranges: list[tuple[int, int]] = []
    for found in table_re.finditer(text):
        if not _is_531_spurious_simple_benefit_table(found.group(0), text[: found.start()]):
            continue
        # Widen the span over the newlines on both sides so no empty gap is left.
        start = found.start()
        while start > 0 and text[start - 1] == "\n":
            start -= 1
        end = found.end()
        while end < len(text) and text[end] == "\n":
            end += 1
        remove_ranges.append((start, end))
    if not remove_ranges:
        return text

    kept: list[str] = []
    pos = 0
    for start, end in sorted(remove_ranges):
        if start > pos:
            kept.append(text[pos:start])
        pos = max(pos, end)
    kept.append(text[pos:])
    # The widened spans swallowed every separating newline. Plain
    # concatenation would fuse the line before a removed table (often its
    # caption) with the text after it, so the pieces are rejoined with a
    # blank line.
    rebuilt = "\n\n".join(piece for piece in kept if piece)
    return re.sub(r"\n{3,}", "\n\n", rebuilt).strip()
def _strip_532_embedded_appendix8_table(content: str) -> str:
    """Remove an embedded Appendix Table 8 (参数对比表) from section 5.3.2.

    Five patterns are applied in order, each replacing its match with a
    single newline:

    1. caption line followed by a pipe table;
    2. bare ``#`` heading line(s), then a caption (possibly split over two
       lines), then a pipe table;
    3. caption line followed by an HTML ``<table>``;
    4. bare ``#`` heading line(s) followed by a caption line only;
    5. any remaining line naming both 附表8 and the table title.

    Returns:
        The cleaned text, stripped, with runs of three or more newlines
        reduced to one blank line. Empty input is returned as ``""``.
    """
    text = str(content or "")
    if not text:
        return text
    kw8 = r"附表\s*8"
    kwname = r"可研报告和后评价参数对比表"
    patterns = (
        # Caption and table body within one physical segment.
        rf"(?:^|\n)[^\n]*{kw8}[^\n]*{kwname}[^\n]*\n"
        r"(?:\s*\n|<!--[\s\S]*?-->\s*\n)*"
        r"(?:\s*\|[^\n]+\|\s*\n)+",
        # "###" alone on a line, then the caption (kept for DOCX export
        # compatibility, per the original note).
        rf"(?:^|\n)(?:\s*#{{1,6}}\s*\n)+(?:\s*\n)*"
        rf"(?:[^\n]*{kw8}[^\n]*\n(?:\s*[^\n]*{kwname}[^\n]*\n)?)"
        r"(?:\s*\n|<!--[\s\S]*?-->\s*\n)*"
        r"(?:\s*\|[^\n]+\|\s*\n)+",
        # Caption followed by an HTML table.
        rf"(?:^|\n)[^\n]*{kw8}[^\n]*{kwname}[^\n]*\n"
        r"(?:\s*\n|<!--[\s\S]*?-->\s*\n)*"
        r"\s*<table>[\s\S]*?</table>",
        # Heading marks plus a caption with no table body.
        rf"(?:^|\n)(?:\s*#{{1,6}}\s*\n)+(?:\s*\n)*[^\n]*{kw8}[^\n]*(?:{kwname})?[^\n]*(?=\n|$)",
        # Any leftover caption line.
        rf"(?:^|\n)\s*[#>*\-\d\.\)\s]*[^\n]*{kw8}[^\n]*{kwname}[^\n]*(?=\n|$)",
    )
    for pattern in patterns:
        text = re.sub(pattern, "\n", text, flags=re.IGNORECASE)
    return re.sub(r"\n{3,}", "\n\n", text).strip()
def _pipe_markdown_row_cells(line: str) -> list[str]:
    """Split one Markdown pipe-table row into its trimmed cell texts.

    Returns an empty list when the trimmed line does not both start and end
    with ``|``.
    """
    row = str(line or "").rstrip("\n").strip()
    if row.startswith("|") and row.endswith("|"):
        return [cell.strip() for cell in row[1:-1].split("|")]
    return []
def _strip_md_cell_noise(s: str) -> str:
    """Remove ``*`` emphasis marks and ``<br>`` tags from a cell, then trim it."""
    without_stars = re.sub(r"\*+", "", str(s or ""))
    return re.sub(r"<br\s*/?>", "", without_stars, flags=re.I).strip()
def _strip_532_table55_bad_markdown_columns(content: str) -> str:
    """Drop surplus columns from the "表5-5 主要生产经营指标" Markdown table.

    A column is dropped when its compacted header contains the typo 时点点后,
    or when it is the bare 后评价时点后预测值 column while a year-qualified
    slot of the same group also exists in the header.

    Args:
        content: Section text (``None`` is treated as empty).

    Returns:
        The text with the offending columns removed. The input is returned
        unchanged when it is empty, lacks the table title, or no column has
        to be dropped.
    """
    text = str(content or "")
    if not text or "主要生产经营指标" not in text:
        return text
    fc = "后评价时点后预测值"

    def _bad_header_indices(header_cells: list[str]) -> set[int]:
        """Return the indices of header cells whose columns must be dropped."""
        bad: set[int] = set()
        comp_cells = [_compact_zh_ident(_strip_md_cell_noise(h)) for h in header_cells]
        has_slot = False
        for h in header_cells:
            hs = str(h or "")
            parts = _split_group_year_col_key(hs)
            if parts and parts[0] == fc:
                tail = parts[1].strip()
                if _appendix_norm_year_tail(tail) or (
                    _BARE_FOUR_DIGIT_YEAR_COL.fullmatch(tail) and 1900 <= int(tail) <= 2100
                ):
                    has_slot = True
                    break
            if re.search(r"后评价时点后预测值\s*[--—–]\s*\d{4}", hs):
                has_slot = True
                break
        fc_c = _compact_zh_ident(fc)
        for i, c in enumerate(comp_cells):
            if "时点点后" in c:
                bad.add(i)
            if has_slot and c == fc_c:
                bad.add(i)
        return bad

    def _drop_cols_from_pipe_block(block: str, drop_idx: set[int]) -> str:
        """Rebuild the pipe rows of ``block`` without the ``drop_idx`` columns."""
        if not drop_idx:
            return block
        out_lines: list[str] = []
        for ln in block.splitlines():
            if not ln.strip().startswith("|"):
                out_lines.append(ln)
                continue
            cells = _pipe_markdown_row_cells(ln)
            if not cells:
                out_lines.append(ln)
                continue
            new_cells = [c for j, c in enumerate(cells) if j not in drop_idx]
            if len(new_cells) == len(cells):
                out_lines.append(ln)
            else:
                out_lines.append("| " + " | ".join(new_cells) + " |")
        rebuilt = "\n".join(out_lines)
        # splitlines()/join drops the block's final newline, which would glue
        # the last table row to the line after the table. Restore exactly the
        # trailing newlines the block had.
        trailing = block[len(block.rstrip("\n")):]
        return rebuilt.rstrip("\n") + trailing

    rx = re.compile(
        r"((?:^|\n)[^\n]*表\s*5\s*[-\..·]\s*5[^\n]*主要生产经营指标[^\n]*\n)"
        r"(?:\s*\n|<!--[^\n]*-->\s*\n)*"
        r"((?:^[ \t]*\|[^\n]+\|\s*\n)+)",
        flags=re.MULTILINE | re.IGNORECASE,
    )

    def _repl(m: re.Match) -> str:
        prefix, body = m.group(1), m.group(2)
        tbl_lines = [
            ln
            for ln in body.splitlines()
            if ln.strip().startswith("|") and ln.strip().endswith("|")
        ]
        if len(tbl_lines) < 2:
            return m.group(0)
        hdr = _pipe_markdown_row_cells(tbl_lines[0])
        if not hdr:
            return m.group(0)
        drop = _bad_header_indices(hdr)
        if not drop:
            return m.group(0)
        return prefix + _drop_cols_from_pipe_block(body, drop)

    return rx.sub(_repl, text)
def _cleanup_section_table_artifacts(
    section_title: str,
    content: str,
    *,
    allowed_table_tokens: Optional[list[str]] = None,
) -> str:
    """Run the section-specific table clean-ups on generated section text.

    References to tables outside ``allowed_table_tokens`` are always removed
    first. The section number extracted from ``section_title`` then selects
    at most one chain of extra clean-up steps.

    Returns:
        The cleaned section text.
    """
    section_no = _extract_section_number(str(section_title or ""))
    text = _strip_unallowed_table_references(
        str(content or ""),
        allowed_table_tokens=allowed_table_tokens,
    )

    if section_no in {"4.3.2", "4.3.3"}:
        # Final safety net against cross-section table pollution that an
        # earlier step may have reintroduced.
        text = _remove_cross_section_table_pollution(section_title, text)
        if section_no == "4.3.3":
            text = _dedupe_433_alkylation_operation_analysis_markdown_tables(text)
    elif section_no == "5.3.2":
        text = _strip_532_embedded_appendix8_table(text)
        text = _strip_532_table55_bad_markdown_columns(text)
        text = _finalize_section_table_dedupe(text, allowed_table_tokens)
    elif section_no == "3.3.1":
        return _strip_331_table_crossrefs(text)
    elif section_no in {"2.1.5", "3.3.2", "3.3.4", "5.1"}:
        text = _finalize_section_table_dedupe(text, allowed_table_tokens)
    elif section_no == "5.3.1":
        text = _finalize_section_table_dedupe(text, allowed_table_tokens)
        text = _strip_531_spurious_llm_table(text)
    elif section_no == "3.3.3":
        return _strip_333_trailing_table_caption_lines(text)
    elif section_no == "3.4.1":
        return _strip_341_table_artifacts(text)
    elif section_no == "2.1.1":
        text = _strip_211_stray_table_261(text)
        text = _dedupe_211_duplicate_markdown_tables(text)
    elif section_no == "5.2.1":
        text = _fix_521_table52_wrong_caption(text)
        text = _strip_521_spurious_llm_table52(text)
        text = _finalize_section_table_dedupe(text, allowed_table_tokens)
    elif section_no == "5.4":
        text = _strip_54_spurious_llm_table(text)
    return text
def _chapter5_opening_heading_present(text: str) -> bool:
    """Tell whether the text opens with the chapter 5 title.

    Only the first line of the stripped text is examined, with leading ``#``
    marks removed. It matches when it contains both 第5章 and
    投资与经济效益评价, or starts with ``5`` + whitespace + 投资与经济效益评价.
    """
    stripped = str(text or "").strip()
    if not stripped:
        return False
    head = stripped.partition("\n")[0].strip()
    if head.startswith("#"):
        head = head.lstrip("#").strip()
    if "第5章" in head and "投资与经济效益评价" in head:
        return True
    return re.match(r"^5\s+投资与经济效益评价", head) is not None
def _canonicalize_chapter5_shell_heading_line(text: str) -> str:
    """Rewrite the first "第5章 … 投资与经济效益评价" line to the numbered form.

    The first non-blank line that, after trimming and removing leading ``#``
    marks, starts with 第5章 and contains 投资与经济效益评价 is replaced by
    ``5 投资与经济效益评价``. All other lines are left as they are.
    """
    lines = str(text or "").split("\n")
    for idx, line in enumerate(lines):
        if not line.strip():
            continue
        plain = line.strip().lstrip("#").strip()
        if plain.startswith("第5章") and "投资与经济效益评价" in plain:
            lines[idx] = "5 投资与经济效益评价"
            break
    return "\n".join(lines)
def _section_heading_present_in_text(text: str, heading_title: str) -> bool:
    """Tell whether a given section heading line already occurs in the text.

    Each non-blank line, with leading ``#`` marks removed, counts as a hit
    when its normalised form equals the normalised heading, or when it is a
    heading line for the same section number. For section number ``"5"`` the
    chapter 5 opening-title check on the whole text also counts.

    Returns:
        ``False`` when either argument is blank or no line matches.
    """
    if not str(text or "").strip() or not str(heading_title or "").strip():
        return False
    wanted = _title_compare_norm(heading_title)
    number = _extract_section_number(heading_title)
    for raw_line in str(text).splitlines():
        candidate = raw_line.strip()
        if not candidate:
            continue
        plain = candidate.lstrip("#").strip()
        if _title_compare_norm(plain) == wanted:
            return True
        if number and _is_heading_line_for_section(plain, number):
            return True
        if number == "5" and _chapter5_opening_heading_present(text):
            return True
    return False
def _inject_missing_parent_section_headings(
    section_title: str,
    content: str,
    previous_section_content: str,
    chapter_title_map: dict[str, str],
) -> str:
    """Prepend ancestor headings that no generated text has shown yet.

    Per the original note, only leaf sections are generated, so parent shells
    (e.g. 5.2, 5.3, 5) have no text of their own and their titles are added
    in front of the first child section.

    Args:
        section_title: Title of the section being emitted.
        content: Generated body of that section.
        previous_section_content: Body of the preceding generated section,
            searched for headings that were already emitted.
        chapter_title_map: Lookup passed to the ancestor resolver.

    Returns:
        ``content`` unchanged when there is no map, no ancestor, or no body.
        Otherwise the stripped body, prefixed with each missing ancestor
        title and separated by blank lines.
    """
    if not chapter_title_map:
        return content
    probe = SimpleNamespace(section_title=str(section_title or "").strip())
    ancestors = _resolve_ancestor_titles_for_section(probe, chapter_title_map)
    if not ancestors:
        return content
    body = str(content or "").strip()
    if not body:
        return content
    if _extract_section_number(section_title or "") == "5.1":
        body = _canonicalize_chapter5_shell_heading_line(body)
    earlier = str(previous_section_content or "")
    absent: list[str] = [
        title
        for title in ancestors
        if not _section_heading_present_in_text(body, title)
        and not _section_heading_present_in_text(earlier, title)
    ]
    if not absent:
        return body
    return "\n\n".join(absent + [body]).strip()
def _inject_chapter5_title_before_section_51(
    section_key: str,
    content: str,
    previous_section_content: str,
    *,
    section_title: str = "",
    chapter_title_map: Optional[dict[str, str]] = None,
) -> str:
    """Legacy entry point that ensures the chapter 5 title precedes section 5.1.

    When both ``chapter_title_map`` and ``section_title`` are given, the call
    is delegated to ``_inject_missing_parent_section_headings``. Otherwise
    only section key ``"5-1"`` is handled: the body is canonicalised and
    ``5 投资与经济效益评价`` is prepended unless the body or the previous
    section already opens with the chapter title.
    """
    if chapter_title_map and section_title:
        return _inject_missing_parent_section_headings(
            section_title, content, previous_section_content, chapter_title_map
        )
    if str(section_key or "").strip() != "5-1":
        return content
    body = _canonicalize_chapter5_shell_heading_line(str(content or "").strip())
    if not body:
        return content
    title_already_shown = _chapter5_opening_heading_present(
        body
    ) or _chapter5_opening_heading_present(previous_section_content)
    return body if title_already_shown else f"5 投资与经济效益评价\n\n{body}"
def _previous_completed_section_content(
    section: ReportTemplateSection,
    sections: list[ReportTemplateSection],
    completed_contents: dict[str, str],
) -> str:
    """Return the nearest non-empty generated body that precedes ``section``.

    Sections are walked backwards in template order, starting just before the
    first entry whose ``section_key`` equals that of ``section``.

    Returns:
        The stripped body, or ``""`` when the section is not in ``sections``
        or no earlier section has content.
    """
    ordered = list(sections or [])
    current = -1
    for position, candidate in enumerate(ordered):
        if candidate.section_key == section.section_key:
            current = position
            break
    if current < 0:
        return ""
    for earlier in reversed(ordered[:current]):
        text = str(completed_contents.get(earlier.section_key) or "").strip()
        if text:
            return text
    return ""
def _prev_line_invites_metric_continuation(prev_line: str) -> bool:
    """Tell whether a line looks cut off mid-sentence before a metric value.

    Blank lines, table rows (starting with ``|``) and dotted-number section
    headings never qualify. Otherwise the line qualifies when it ends with
    citation marks such as ``[50]``, with a metric noun (optionally followed
    by 为 and citations), with a connective verb such as 降至/达到/为, or with
    one of the characters 至/为/是/到.
    """
    line = str(prev_line or "").strip()
    if not line or line.startswith("|"):
        return False
    if re.match(r"^\s{0,3}#{0,6}\s*\d+(?:\.\d+)+\s+[\u4e00-\u9fff]", line):
        return False
    open_ended_endings = (
        r"(?:\[\d+\]\s*)+$",
        r"(?:单耗|电耗|能耗|水耗|物耗|损失|运行值|设计值|加工量|负荷|占比)为?"
        r"\s*(?:\[\d+\]\s*)*$",
        r"(上升至|升至|降至|下降为|提高为|降低为|为|达到|至)\s*(?:\[\d+\]\s*)*$",
        r"[至为是到]$",
    )
    return any(re.search(pattern, line) for pattern in open_ended_endings)
def _merge_orphan_energy_metric_lines(text: str) -> str:
    """Append stray metric-value lines to the sentence they belong to.

    A line qualifies when, after trimming and removing a leading ``#``
    heading mark, it starts with a number followed by whitespace and one of
    MJ/t, kWh/t, kgce/t or t产品, and that number is not judged to be a
    section number. It is appended to the closest preceding non-blank line if
    that line invites a continuation. Otherwise it is kept as is.

    Per the original note, the merge prevents such fragments from being
    rendered as headings downstream.
    """
    from services.docx_export_service import _is_likely_section_number

    metric_re = re.compile(
        r"^\s*(?:#{1,6}\s+)?(\d+(?:\.\d+)?)\s+(MJ/t|kWh/t|kgce/t|t产品)",
        re.IGNORECASE,
    )
    merged: list[str] = []
    for line in str(text or "").split("\n"):
        fragment = re.sub(r"^#{1,6}\s+", "", str(line or "").strip())
        hit = metric_re.match(fragment)
        if hit is None or _is_likely_section_number(hit.group(1)):
            merged.append(line)
            continue
        # Skip back over blank lines to find the sentence to extend.
        anchor = len(merged) - 1
        while anchor >= 0 and not str(merged[anchor] or "").strip():
            anchor -= 1
        if anchor >= 0 and _prev_line_invites_metric_continuation(str(merged[anchor] or "")):
            merged[anchor] = merged[anchor].rstrip() + fragment
        else:
            merged.append(line)
    return "\n".join(merged)
def _fix_numeric_line_breaks(content: str) -> str:
    """Re-join numbers that a stray newline separated from their unit or date.

    Only breaks with obvious numeric continuity are merged. Newlines around
    Markdown table rows and in front of numbered section headings are
    shielded with placeholder tokens while the merge rules run, and restored
    at the end.

    Args:
        content: Section text (``None`` is treated as empty).

    Returns:
        The repaired text with line separators normalised to ``"\\n"`` and
        area/volume units written as m²/m³. Empty input returns ``""``.
    """
    text = str(content or "")
    if not text:
        return text
    # Normalise every separator flavour first, so U+2028/U+2029 cannot slip
    # past the "\n"-based rules below.
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    text = text.replace("\u2028", "\n").replace("\u2029", "\n")

    # Shield the newlines around table rows (lines starting or ending with
    # "|"). Otherwise the merge rules would glue the last row to the next
    # line and create an extra column.
    table_nl_token = "\u0000TABLE_NL\u0000"
    shielded: list[str] = []
    for line in text.split("\n"):
        bare = line.strip()
        if bare.startswith("|") or bare.endswith("|"):
            line = table_nl_token + line + table_nl_token
        shielded.append(line)
    text = "\n".join(shielded)
    text = text.replace(table_nl_token + "\n" + table_nl_token, table_nl_token)
    text = text.replace("\n" + table_nl_token, table_nl_token)
    text = text.replace(table_nl_token + "\n", table_nl_token)

    # Shield newlines in front of numbered headings ("1 项目概况\n1.1 …").
    heading_nl_token = "\u0000HEADING_NL\u0000"
    text = re.sub(
        r"\n(?=\s*\d+(?:\.\d+)*\s+[\u4e00-\u9fff]{2,}(?:\s|$))",
        heading_nl_token,
        text,
    )

    # Only horizontal whitespace is allowed on either side of a merged
    # newline. With \s* the first "\n" of a paragraph gap would be consumed,
    # defeating the (?!\n) guard.
    _hsp = r"[ \t\u3000]*"
    # Digit, newline, then digit/unit/letter: "2018 年 11 月\n4 日", "24.48\n%".
    text = re.sub(
        rf"(?<=\d){_hsp}\n(?!\n){_hsp}(?=(?:\d|[年月日时分秒度%%℃吨米台套项]|[A-Za-z]))",
        "",
        text,
    )
    # CJK text, newline, then number + unit: "烈度\n7 度". The unit is
    # required so that numbered list items are not merged.
    text = re.sub(
        rf"(?<=[\u4e00-\u9fff]){_hsp}\n(?!\n){_hsp}(?=\d+(?:\.\d+)?\s*(?![)、.])(?:[年月日时分秒度%℃吨米台套项个级亩万亿千百十gGlLmMkKvVaAwWhHzHPp]|[A-Za-z]{{1,4}}\b))",
        "",
        text,
    )
    # Magnitude, newline, then a unit word: "106万\n工时", "15万吨/年\n…".
    text = re.sub(
        rf"(?<=[\d万亿千百十]){_hsp}\n(?!\n){_hsp}(?=(?:工时|吨/年|万吨/年|亿元|万元|万人|m2|m3|m²|m³|项|台|套|个|座|处|条|次|年|月|日))",
        "",
        text,
        flags=re.IGNORECASE,
    )
    # Units split in the middle: "kgEo/\nt", "m\n2".
    text = re.sub(r"(?<=[A-Za-z/])\s*\n\s*(?=\d)", "", text)
    text = re.sub(r"(?<=[A-Za-z])\s*\n\s*(?=[A-Za-z])", "", text)

    # Energy metrics pushed onto their own line, possibly after citations:
    # "…单耗为 [93][94]\n\n1.38 MJ/t".
    _metric_num = r"\d+(?:\.\d+)?\s*(?:MJ/t|kWh/t|kgce/t|t产品)"
    text = re.sub(
        rf"((?:\[\d+\]\s*)+)\s*\n+\s*({_metric_num})",
        r"\1 \2",
        text,
        flags=re.IGNORECASE,
    )
    text = re.sub(
        rf"(?<=[\u4e00-\u9fff\]])\s*\n+\s*({_metric_num})",
        r" \1",
        text,
        flags=re.IGNORECASE,
    )
    text = _merge_orphan_energy_metric_lines(text)

    # Normalise area/volume units (m2, m 2, m^2, M2 → m²; same for m³). The
    # replacement was an empty string, which deleted the unit instead of
    # normalising it.
    text = re.sub(r"(?i)\bm\s*(?:\^?\s*2)\b", "m²", text)
    text = re.sub(r"(?i)\bm\s*(?:\^?\s*3)\b", "m³", text)

    text = text.replace(heading_nl_token, "\n")
    text = text.replace(table_nl_token, "\n")
    return text
def _canonical_global_table_name_for_token(token: str) -> str | None:
    """Return the first name in ``MULTI_COLUMN_GLOBAL_SPECS`` that matches a token.

    Returns:
        The matching spec name, or ``None`` for a blank token or no match.
    """
    wanted = str(token or "").strip()
    if not wanted:
        return None
    return next(
        (name for name in MULTI_COLUMN_GLOBAL_SPECS if _table_token_matches_name(wanted, name)),
        None,
    )
def _skeleton_markdown_for_table_token(token: str, *, table_name: str = "") -> str:
    """Render an empty placeholder Markdown table from the template spec.

    The full table name is resolved from ``token`` first. If that fails,
    ``table_name`` is used, either directly when a spec exists for it or via
    canonical lookup.

    Returns:
        The rendered Markdown, or ``""`` when no spec, row keys or columns
        are available.
    """
    full_name = _canonical_global_table_name_for_token(token) or ""
    if not full_name:
        fallback = str(table_name or "").strip()
        if _multi_column_global_spec_for_table(fallback):
            full_name = fallback
        elif fallback:
            full_name = _canonical_global_table_name_for_token(fallback) or fallback

    spec = _multi_column_global_spec_for_table(full_name)
    if not spec:
        return ""
    row_order = global_table_row_keys(full_name)
    if not row_order:
        return ""
    col_order = []
    for column in spec[0] or []:
        label = str(column).strip()
        if label:
            col_order.append(label)
    if not col_order:
        return ""
    # An empty cell mapping yields a header-and-rows skeleton.
    rendered, _ = _render_markdown_table(full_name, row_order, col_order, {})
    return str(rendered or "").strip()
def _authoritative_block_for_required_table(token: str, evidence: dict) -> str | None:
    """Build the authoritative "title + source note + table" block for a table.

    The best-scoring entry of ``evidence["structuredTables"]`` that matches
    the token (by ``token`` or ``tableName``) and has Markdown is preferred.
    Scoring uses ``_score_structured_table_hit_dict`` for Table 5-4 and the
    Markdown length otherwise. If none qualifies, a template skeleton table
    is used.

    Returns:
        The block text, or ``None`` when neither source yields a table.
    """
    rows = evidence.get("structuredTables") if isinstance(evidence, dict) else []
    title = str(token or "").strip()
    md = ""

    if isinstance(rows, list):
        best_row: dict | None = None
        best_score = -1
        for row in rows:
            if not isinstance(row, dict):
                continue
            token_hit = str(row.get("token") or "")
            table_name_hit = str(row.get("tableName") or "")
            if not (
                _table_token_matches_name(token, token_hit)
                or _table_token_matches_name(token, table_name_hit)
            ):
                continue
            row_md = str(row.get("markdown") or "").strip()
            if not row_md:
                continue
            if _table_token_matches_name(token, "表5-4"):
                score = _score_structured_table_hit_dict(row)
            else:
                score = len(row_md)
            if score > best_score:
                best_score, best_row = score, row
        if best_row:
            md = str(best_row.get("markdown") or "").strip()
            title = str(best_row.get("tableName") or token).strip() or token

    if not md:
        skeleton = _skeleton_markdown_for_table_token(token, table_name=title)
        if skeleton:
            md = skeleton
            canonical = _canonical_global_table_name_for_token(token)
            if canonical:
                title = canonical
    if not md:
        return None
    return (
        f"{title}\n\n"
        "<!-- 表格来源:要素管理(结构化表直出) -->\n"
        f"{md}"
    )
def _fill_required_table_caption_stubs(
    content: str, required_tables: list[str], evidence: dict
) -> str:
    """Replace caption-only required tables with an authoritative block.

    A required table is replaced when its caption line exists and the segment
    after it has no Markdown table body. For Table 5-4 it is also replaced
    when the body is recognised as a simplified extract.

    Returns:
        The updated text, or the original ``content`` object when nothing
        was replaced.
    """
    text = str(content or "")
    changed = False
    for token in required_tables or []:
        if not _table_token_caption_line_re(token).search(text):
            continue
        segment = _segment_after_table_caption(text, token)
        if _segment_has_markdown_table_body(segment):
            body_match = re.search(r"(?m)(?:^\s*\|[^\n]+\|\s*\n){3,}", segment)
            simplified_54 = (
                _table_token_matches_name(token, "表5-4")
                and body_match
                and _is_table54_simplified_extract_body(body_match.group(0))
            )
            if not simplified_54:
                continue
        block = _authoritative_block_for_required_table(token, evidence)
        if not block:
            continue
        text = _replace_caption_stub_with_authoritative_table(text, token, block)
        changed = True
    return text if changed else content
def _append_structured_missing_tables(content: str, missing_tables: list[str], evidence: dict) -> str:
    """Insert authoritative blocks for tables that are missing from the text.

    When the table's caption line already exists, the caption stub is
    replaced. Otherwise the block is appended after a blank line. Tokens
    without an authoritative block are skipped.

    Returns:
        The updated, stripped text, or the original ``content`` when no block
        was inserted.
    """
    result = str(content or "").rstrip()
    inserted = False
    for token in missing_tables:
        block = _authoritative_block_for_required_table(token, evidence)
        if not block:
            continue
        if _table_token_caption_line_re(token).search(result):
            result = _replace_caption_stub_with_authoritative_table(result, token, block)
        else:
            result = result + "\n\n" + block
        inserted = True
    if not inserted:
        return content
    return result.strip()
def _replace_llm_table_with_authoritative(content: str, token: str, replacement_md: str) -> str:
    """Swap a model-written table (caption line + body) for the authoritative one.

    The first caption line containing the token followed directly by a pipe
    table is replaced. If there is none, the first caption line followed by
    an HTML ``<table>`` is replaced.

    Pipe rows are matched with ``[ \\t]*`` rather than ``\\s*`` so the match
    cannot span blank lines and tie a token mentioned in prose to a distant
    table. At most one blank line may sit between the caption and the first
    row.

    Returns:
        The updated text, or ``content`` unchanged when the token or the
        replacement is empty or nothing matched.
    """
    token_plain = re.sub(r"\s+", "", str(token or ""))
    if not token_plain or not replacement_md:
        return content
    token_re = re.escape(token_plain).replace(r"\-", r"[--—–]")
    caption = r"((?:^|\n)[^\n]*?" + token_re + r"[^\n]*\n)"
    body_variants = (
        r"(\n?(?:[ \t]*\|[^\n]+\|[ \t]*\n)+)",
        r"(\s*<table>[\s\S]*?</table>)",
    )
    for body in body_variants:
        hit = re.search(caption + body, content, flags=re.IGNORECASE)
        if hit:
            tail = content[hit.end():].lstrip("\n")
            return content[:hit.start()] + "\n" + replacement_md + "\n\n" + tail
    return content
def _caption_followed_by_element_table_comment(content: str, token: str) -> bool:
    """Tell whether this table's caption is directly followed by the source note.

    Only a note right after the caption line (at most one blank line between
    them) counts, so a note that belongs to another table elsewhere in the
    text does not mark this one as authoritative.
    """
    token_plain = re.sub(r"\s+", "", str(token or ""))
    if not token_plain:
        return False
    token_re = re.escape(token_plain).replace(r"\-", r"[--—–]")
    pattern = (
        r"(?:^|\n)[^\n]*?" + token_re + r"[^\n]*\n"
        r"(?:[ \t]*\n)?[ \t]*<!--\s*表格来源:要素管理"
    )
    found = re.search(pattern, str(content or ""), flags=re.IGNORECASE | re.MULTILINE)
    return found is not None
def _refresh_element_table_markdown_tokens(
    content: str,
    evidence: dict,
    tokens: tuple[str, ...],
) -> str:
    """Overwrite the listed tables in the text with their structured Markdown.

    Per the original note, this undoes header rewrites made by a later
    format-repair pass. For each token, the best-scoring matching entry of
    ``evidence["structuredTables"]`` is used, scored as in
    ``_authoritative_block_for_required_table``. The replacement happens only
    when the token exists in the text.

    Returns:
        The updated text, or ``content`` unchanged when the evidence carries
        no structured tables.
    """
    rows = evidence.get("structuredTables") if isinstance(evidence, dict) else []
    if not isinstance(rows, list) or not rows:
        return content

    result = str(content or "")
    for token in tokens:
        wanted = str(token or "").strip()
        if not wanted:
            continue
        best_md: str | None = None
        best_title: str | None = None
        best_score = -1
        for row in rows:
            if not isinstance(row, dict):
                continue
            row_token = str(row.get("token") or "")
            row_name = str(row.get("tableName") or "")
            if not (
                _table_token_matches_name(wanted, row_token)
                or _table_token_matches_name(wanted, row_name)
            ):
                continue
            md = str(row.get("markdown") or "").strip()
            if not md:
                continue
            if _table_token_matches_name(wanted, "表5-4"):
                score = _score_structured_table_hit_dict(row)
            else:
                score = len(md)
            if score > best_score:
                best_score = score
                best_md = md
                best_title = str(row.get("tableName") or wanted).strip() or wanted
        if not best_md or not best_title:
            continue
        replacement = (
            f"{best_title}\n\n"
            "<!-- 表格来源:要素管理(结构化表直出) -->\n"
            f"{best_md}"
        )
        if _table_token_exists(result, wanted):
            result = _replace_llm_table_with_authoritative(result, wanted, replacement)
    return result
def _append_authoritative_required_tables(content: str, required_tables: list[str], evidence: dict) -> str:
    """Make every required table come straight from the structured element data.

    For each required token that is not already followed by the
    element-source note:

    * if the token exists in the text, the model-written table is replaced,
      falling back to replacing the caption stub when nothing changed;
    * otherwise, if only a caption line exists, the stub is replaced;
    * otherwise the authoritative block is appended.

    Returns:
        The updated, stripped text, or the original ``content`` when no
        table was processed.
    """
    if not required_tables:
        return content
    result = str(content or "")
    touched = False
    for token in required_tables:
        if _caption_followed_by_element_table_comment(
            result, token
        ) and _table_token_exists(result, token):
            continue
        block = _authoritative_block_for_required_table(token, evidence)
        if not block:
            continue
        if _table_token_exists(result, token):
            swapped = _replace_llm_table_with_authoritative(result, token, block)
            if swapped != result:
                result = swapped
            else:
                result = _replace_caption_stub_with_authoritative_table(result, token, block)
        elif _table_token_caption_line_re(token).search(result):
            result = _replace_caption_stub_with_authoritative_table(result, token, block)
        else:
            result = result.rstrip() + "\n\n" + block
        touched = True
    if not touched:
        return content
    return result.strip()
def _is_effective_markdown_table_block(md_block: str) -> bool:
    """Tell whether a block is a well-formed pipe table with data.

    The non-blank lines must number at least three. The first must not be a
    separator line, the second must be one, and at least one later line must
    be a table row that is not a separator.
    """
    rows = []
    for raw in str(md_block or "").splitlines():
        trimmed = str(raw or "").strip()
        if trimmed:
            rows.append(trimmed)
    if len(rows) < 3:
        return False
    if _is_pipe_markdown_table_separator_line(rows[0]):
        return False
    if not _is_pipe_markdown_table_separator_line(rows[1]):
        return False
    return any(
        _is_pipe_markdown_table_row_line(row)
        and not _is_pipe_markdown_table_separator_line(row)
        for row in rows[2:]
    )
def _ensure_required_structured_tables_integrity(content: str, required_tables: list[str], evidence: dict) -> str:
    """Repair required tables whose body is missing, malformed or simplified.

    For each required token an authoritative block is built; tokens without
    one are skipped. The text is then repaired as follows:

    * caption present but no Markdown table body after it: the caption stub
      is replaced;
    * Table 5-4 whose body is a simplified extract: it is replaced;
    * caption followed by pipe rows that are not an effective table (or, for
      Table 5-4, a simplified extract): that span is replaced;
    * token absent from the text: the block is appended.

    Args:
        content: Section text.
        required_tables: Table tokens the section must contain.
        evidence: Evidence dict handed to
            ``_authoritative_block_for_required_table``.

    Returns:
        ``content`` unchanged when ``required_tables`` is empty, otherwise
        the repaired text, stripped.
    """
    if not required_tables:
        return content
    repaired = str(content or "")
    for token in required_tables:
        authoritative_block = _authoritative_block_for_required_table(token, evidence)
        if not authoritative_block:
            continue
        if _table_token_caption_line_re(token).search(repaired):
            seg = _segment_after_table_caption(repaired, token)
            # Caption with no table body at all.
            if not _segment_has_markdown_table_body(seg):
                repaired = _replace_caption_stub_with_authoritative_table(
                    repaired, token, authoritative_block
                )
                continue
            # Table 5-4 only: a body exists but is the simplified extract.
            if _table_token_matches_name(token, "表5-4"):
                m_seg = re.search(
                    r"(?m)(?:^\s*\|[^\n]+\|\s*\n){3,}", seg
                )
                if m_seg and _is_table54_simplified_extract_body(m_seg.group(0)):
                    repaired = _replace_caption_stub_with_authoritative_table(
                        repaired, token, authoritative_block
                    )
                    continue
        # Generic check: a caption line, optional blank/comment lines, then
        # the run of pipe rows that forms the current table body.
        token_plain = re.sub(r"\s+", "", str(token or ""))
        token_re = re.escape(token_plain).replace(r"\-", r"[--—–]")
        table_pat = re.compile(
            r"((?:^|\n)[^\n]*?" + token_re + r"[^\n]*\n(?:\n|[ \t]*<!--[^\n]*-->[ \t]*\n)*)"
            r"((?:[ \t]*\|[^\n]*\|[ \t]*\n)+)",
            flags=re.IGNORECASE,
        )
        m = table_pat.search(repaired)
        if m:
            cur_table = str(m.group(2) or "")
            need_replace = not _is_effective_markdown_table_block(cur_table)
            if _table_token_matches_name(token, "表5-4") and _is_table54_simplified_extract_body(
                cur_table
            ):
                need_replace = True
            if need_replace:
                # Replace caption + body in one go; the authoritative block
                # carries its own title line.
                repaired = (
                    repaired[:m.start()]
                    + "\n"
                    + authoritative_block
                    + "\n\n"
                    + repaired[m.end():].lstrip("\n")
                )
        elif not _table_token_exists(repaired, token):
            # The token is not in the text at all: append the block.
            repaired = repaired.rstrip() + "\n\n" + authoritative_block
    return repaired.strip()
def _collect_structured_tables(
    db: Session,
    project_uuid: str,
    required_tables: list[str],
    *,
    section_title: str,
    section_tokens: list[str],
) -> list[dict]:
    """
    Collect the structured tables offered as evidence for one report section.

    Per the original note, tables must come from the same source as element
    management (element_tables/element_cells) so the model does not invent
    them.

    Selection rules:
    - if the template declares required tables (e.g. 表2-1/附表8), element
      tables matching those tokens are preferred (at most 8 kept);
    - if none is declared, or none matches, up to 4 tables are chosen by
      relevance to the section title and tokens.

    Args:
        db: Database session used for all queries.
        project_uuid: Project whose element tables are read.
        required_tables: Table tokens declared by the template prompt.
        section_title: Title of the section being generated.
        section_tokens: Keywords of the section (only the first 20 are used).

    Returns:
        Up to 12 dicts with keys ``tableId``, ``tableName``, ``token`` and
        ``markdown``. At most one Table 5-4 entry is kept and it is placed
        last.
    """
    def _table_relevance_score(table_name: str) -> int:
        # Heuristic relevance of a table name to the current section.
        name = str(table_name or "").strip()
        if not name:
            return 0
        name_l = name.lower()
        score = 0
        # Strong bonus when the table name contains the section title.
        t = str(section_title or "").strip()
        if t and t in name:
            score += 10
        # Bonus for each section token found in the name.
        for tok in (section_tokens or [])[:20]:
            tt = str(tok or "").strip()
            if not tt:
                continue
            if tt.lower() in name_l:
                score += 2
        # Small bonus for common table-name keywords, to favour real tables.
        # NOTE(review): the first keyword is an empty string, which makes this
        # test true for every name — presumably meant "表"; confirm.
        if any(k in name for k in ("", "附表", "对比", "评价", "评分")):
            score += 1
        return score
    tables: list[ElementTable] = (
        db.query(ElementTable)
        .filter(ElementTable.project_id == project_uuid)
        .order_by(ElementTable.sort_order.asc(), ElementTable.updated_at.desc())
        .all()
    )
    if not tables:
        return []
    # Section 5.3.2 only needs 表5-5/表5-6; Appendix Table 8 belongs to the
    # report-wide appendix, so it is kept out of this section's evidence.
    if _extract_section_number(str(section_title or "")) == "5.3.2":
        tables = [
            t
            for t in tables
            if not (
                ("附表8" in str(t.table_name or ""))
                and ("可研报告和后评价参数对比表" in str(t.table_name or ""))
            )
        ]
        if not tables:
            return []
    # 1) Required tables: match element tables by token / table name.
    required_norm = [_norm_table_token(t) for t in (required_tables or []) if _norm_table_token(t)]
    required_hits: list[ElementTable] = []
    if required_norm:
        for t in tables:
            name_norm = _norm_table_token(t.table_name)
            if not name_norm:
                continue
            if any(req and _table_token_matches_name(req, name_norm, normalized=True) for req in required_norm):
                required_hits.append(t)
        if required_hits:
            req_hint_words: list[str] = []
            for req in required_norm:
                req_hint_words.extend(_TABLE_TOKEN_PREFERRED_NAME_HINTS.get(req, ()))
            def _required_hit_score(tb: ElementTable) -> tuple[int, int, int]:
                # Sort key: (5-4 collect score, preferred-name hint hits,
                # generic relevance).
                tb_name = str(tb.table_name or "").strip()
                hint_hit = 0
                if req_hint_words:
                    for hint in req_hint_words:
                        if hint and hint in tb_name:
                            hint_hit += 1
                collect = 0
                if any(_table_token_matches_name(req, "表5-4") for req in required_norm):
                    collect = _element_table_collect_score(db, tb, "表5-4")
                return (collect, hint_hit, _table_relevance_score(tb_name))
            required_hits.sort(key=_required_hit_score, reverse=True)
            # Keep only the top few, so that several historical tables with
            # the same token do not blow up the prompt.
            required_hits = required_hits[:8]
    # 2) Fallback: nothing declared, or nothing matched — pick by relevance.
    selected: list[ElementTable] = list(required_hits)
    if not selected:
        scored: list[tuple[int, ElementTable]] = []
        for t in tables:
            s = _table_relevance_score(t.table_name)
            if s > 0:
                scored.append((s, t))
        scored.sort(key=lambda x: x[0], reverse=True)
        selected = [t for _, t in scored[:4]]
    # True when a time-type Table 5-4 is selected; non-time 5-4 tables are
    # then skipped below.
    selected_has_time54 = any(
        _is_table54_operating_benefit(str(t.table_name or ""))
        and str(t.table_type or "").strip() == "time"
        for t in selected
    )
    out: list[dict] = []
    for table in selected:
        is_time = str(table.table_type or "").strip() == "time"
        if is_time:
            # Time tables render one Markdown table per year.
            ty_row = db.query(ElementTable.year).filter(ElementTable.id == table.id).first()
            tbl_y = (
                int(ty_row[0])
                if ty_row and ty_row[0] is not None and int(ty_row[0]) > 0
                else None
            )
            year_items = _build_time_table_markdowns_by_year(db, table.id, table.table_name)
            if _is_table54_operating_benefit(table.table_name):
                # For Table 5-4 only one year's table is kept, when one can
                # be picked.
                picked = _pick_table54_year_markdown(year_items, table_year=tbl_y)
                if picked:
                    year_items = [picked]
            for display_name, md in year_items:
                if not md:
                    continue
                token = _extract_table_short_token(display_name)
                out.append(
                    {
                        "tableId": table.id,
                        "tableName": display_name,
                        "token": token,
                        "markdown": md,
                    }
                )
                if len(out) >= 12:
                    break
        else:
            if (
                selected_has_time54
                and _is_table54_operating_benefit(table.table_name)
            ):
                continue
            md, common_unit = _build_structured_table_markdown(db, table.id, table.table_name)
            display_name = _merge_table_title_with_common_unit(str(table.table_name or "").strip(), common_unit)
            token = _extract_table_short_token(table.table_name)
            if not md:
                # No usable cells: fall back to the template skeleton.
                md = _skeleton_markdown_for_table_token(
                    token or display_name, table_name=display_name
                )
            if not md:
                continue
            hit = {
                "tableId": table.id,
                "tableName": display_name,
                "token": token,
                "markdown": md,
            }
            # A simplified-extract Table 5-4 body is never offered as evidence.
            if _is_table54_operating_benefit(table.table_name) and _is_table54_simplified_extract_body(md):
                continue
            out.append(hit)
        if len(out) >= 12:
            break
    # Keep at most one Table 5-4 entry (the first after de-duplication),
    # placed after all other tables.
    t54_norm = _norm_table_token("表5-4")
    t54_hits = [h for h in out if _norm_table_token(str(h.get("token") or "")) == t54_norm]
    if t54_hits:
        rest = [h for h in out if _norm_table_token(str(h.get("token") or "")) != t54_norm]
        ranked = _dedupe_structured_table_hits(t54_hits)
        out = rest + (ranked[:1] if ranked else [])
    return out[:12]
def _table_2_5_general_layout_comparison_name(table_name: str) -> bool:
    """Tell whether a table name denotes Table 2-5 (general layout comparison).

    Names containing 依托 are excluded. Per the original note, this mirrors
    the element-management quick-fill rule for Table 2-5.
    """
    name = str(table_name or "")
    markers = ("表2-5", "总图、储运、公用工程及辅助工程对比")
    return "依托" not in name and any(marker in name for marker in markers)
def _table_2_6_reliance_comparison_name(table_name: str) -> bool:
"""与要素管理 quick-fill 表2-6判定一致行展示去「依托·」等类别前缀、表头用依托项目名称"""
n = str(table_name or "")
return (
"表2-6" in n
or "储运、公用工程及辅助工程依托对比" in n
or "辅助工程依托对比" in n
)
def _table_3_1_contracting_units_name(table_name: str) -> bool:
"""与要素管理 quick-fill 表3-1判定一致行展示去「承包单元·」前缀、表头用单元名称"""
n = str(table_name or "")
return "表3-1" in n or "项目承包单位情况" in n
def _table_3_3_plantwide_design_change_name(table_name: str) -> bool:
"""与要素管理 quick-fill 表3-3 判定一致(表头用「单元名称」)。"""
n = str(table_name or "")
return ("表3-3" in n or "施工图设计变更情况" in n) and "全厂" in n
def _table_3_4_single_unit_design_change_name(table_name: str) -> bool:
"""与要素管理 quick-fill 表3-4 判定一致(表头用「专业」)。"""
n = str(table_name or "")
return ("表3-4" in n or "施工图设计变更情况" in n) and "单装置" in n
def _table_3_5_major_design_change_name(table_name: str) -> bool:
"""与要素管理 quick-fill 表3-5判定一致行展示去「重大变更·」前缀、表头用单元名称"""
n = str(table_name or "")
return "表3-5" in n or "影响投资或工期" in n
def _table_3_7_procurement_name(table_name: str) -> bool:
"""与要素管理 quick-fill 表3-7判定一致行展示去「采购物资·」前缀、表头用物资类别名称"""
n = str(table_name or "")
return "表3-7" in n or "采购工作情况" in n
def _table_4_2_alkylation_operation_analysis_name(table_name: str) -> bool:
"""烷基化装置运行分析表含历史误标为表4-1、用户改写考核日期后的表题"""
n = re.sub(r"\s+", "", str(table_name or ""))
return ("烷基化装置运行分析" in n) and ("考核时间" in n)
# Template title under which the Table 4-2 multi-column spec is registered.
_TABLE42_ANALYSIS_TEMPLATE_NAME = "表4-2 烷基化装置运行分析表(考核时间:×年×月×日)"


def _multi_column_global_spec_for_table(table_name: str):
    """Look up the multi-column template spec for *table_name*.

    An exact (stripped) title match in ``MULTI_COLUMN_GLOBAL_SPECS`` wins.
    Otherwise a renamed Table 4-2 slot falls back to the spec registered under
    the template title, so it keeps the standard column order. Returns ``None``
    when neither applies.
    """
    exact_name = str(table_name or "").strip()
    exact_spec = MULTI_COLUMN_GLOBAL_SPECS.get(exact_name)
    if exact_spec:
        return exact_spec
    if not _table_4_2_alkylation_operation_analysis_name(table_name):
        return None
    return MULTI_COLUMN_GLOBAL_SPECS.get(_TABLE42_ANALYSIS_TEMPLATE_NAME)
def _element_manage_row_label_after_first_dot(label: str) -> str:
"""与 quick-fill.js parseRowKeyForDisplay 一致:去掉行键第一个「…·」段(仅作展示)。"""
s = str(label or "").strip()
if "\u00b7" in s:
rest = "\u00b7".join(s.split("\u00b7", 1)[1:]).strip()
return rest if rest else s
return s
def _element_manage_table_row_display_label(table_name: str, label: str) -> str:
    """Return the row label as shown in element management.

    For Tables 2-6, 3-1, 3-5, 3-7 and 4-2 the category prefix before the first
    middle dot is removed for display (the stored row_key keeps it); every
    other table just gets the stripped label.
    """
    prefixed_row_tables = (
        _table_2_6_reliance_comparison_name,
        _table_3_1_contracting_units_name,
        _table_3_5_major_design_change_name,
        _table_3_7_procurement_name,
        _table_4_2_alkylation_operation_analysis_name,
    )
    if any(is_table(table_name) for is_table in prefixed_row_tables):
        return _element_manage_row_label_after_first_dot(label)
    return str(label or "").strip()
def _row_header_name_for_table(table_name: str) -> str:
    """Pick the header text of the row-label column for *table_name*.

    Rules are tried in order and the first match wins; tables matching no rule
    use the generic header 「项目」.
    """
    title = str(table_name or "")
    # (predicate, header) pairs; order matters because several titles overlap.
    rules = (
        (lambda t: "产品方案对比表" in t, "产品"),
        (lambda t: "原料数量及组成对比表" in t, "原料名称"),
        (lambda t: "原料)性质对比表" in t or "原料性质对比表" in t, "名称"),
        (_table_2_5_general_layout_comparison_name, "项目名称"),
        (_table_2_6_reliance_comparison_name, "依托项目名称"),
        (_table_3_3_plantwide_design_change_name, "单元名称"),
        (_table_3_4_single_unit_design_change_name, "专业"),
        (
            lambda t: _table_3_1_contracting_units_name(t)
            or _table_3_5_major_design_change_name(t),
            "单元名称",
        ),
        (_table_3_7_procurement_name, "物资(类别)名称"),
        (_table_4_2_alkylation_operation_analysis_name, "项目名称"),
        (
            lambda t: _table52_investment_change_name(t)
            or _table53_engineering_cost_change_name(t)
            or _appendix2_investment_structure_name(t),
            "工程或费用名称",
        ),
    )
    for applies, header in rules:
        if applies(title):
            return header
    return "项目"
# Leading stage/source names that may become the top row of a two-level column header.
_GROUP_HEADER_PREFIXES = [
    "可研报告", "可研", "初步设计", "实际生产", "实际运行", "实际实施", "后评价",
]
# Column names that must stay single-level. Includes the full time-slot group names so
# that 「后评价时点后预测值」 is not split into two header rows by the prefix 「后评价」.
_GROUP_HEADER_EXACT = {p for p in _GROUP_HEADER_PREFIXES} | {
    "后评价时点前实际值",
    "后评价时点后预测值",
}
def _group_column_headers(col_order: list[str]) -> tuple[list[str], list[str]] | None:
    """Detect two-level column headers.

    Each column key is split into a (top, sub) pair. A key such as
    "可研报告数量(万吨)" becomes top=可研报告 / sub=数量(万吨), whereas a stand-alone
    name ("可研报告", "初步设计", "实际实施") is left ungrouped to avoid bogus splits.

    Returns:
        ``(top_headers, sub_headers)`` aligned with *col_order* when at least
        one column triggered grouping, otherwise ``None``.
    """

    def split_label(label: str) -> tuple[str, str, bool]:
        """Return (top, sub, grouped) for one stripped column key."""
        if not label:
            return "", "", False
        # Typo keys 「…时点点后…」 must not be split on the 「后评价」 prefix;
        # the typo is corrected in the displayed sub header instead.
        if "时点点后" in label:
            return "", label.replace("时点点后", "时点后", 1), True
        # Piped keys (e.g. 「后评价时点后预测值|2021」) are handled before the
        # prefix rule, otherwise header and col_key-based values would misalign.
        if "|" in label:
            pipe = "|"
        elif "\uff5c" in label:
            pipe = "\uff5c"
        else:
            pipe = None
        if pipe is not None:
            head, tail = label.split(pipe, 1)
            head, tail = head.strip(), tail.strip()
            if head in _APPENDIX_TIME_SLOT_GROUPS and tail:
                return head, tail, True
            # Table 5-4 metric keys keep their whole text as a single cell.
            if head in _TABLE54_PIPE_METRIC_PREFIXES and tail:
                return "", label, True
        if label in _GROUP_HEADER_EXACT:
            return "", label, False
        for prefix in _GROUP_HEADER_PREFIXES:
            if label.startswith(prefix) and len(label) > len(prefix):
                remainder = label[len(prefix):].strip()
                if remainder:
                    return prefix, remainder, True
        if "·" in label:
            left, right = [part.strip() for part in label.split("·", 1)]
            return left, right, True
        return "", label, False

    tops: list[str] = []
    subs: list[str] = []
    grouped_any = False
    for col in col_order:
        top, sub, grouped = split_label(str(col or "").strip())
        tops.append(top)
        subs.append(sub)
        grouped_any = grouped_any or grouped
    if grouped_any:
        return tops, subs
    return None
def _table51_main_economic_indicators_name(table_name: str) -> bool:
n = str(table_name or "")
return "表5-1" in n and "主要经济指标对比" in n
def _table52_investment_change_name(table_name: str) -> bool:
n = str(table_name or "")
return "表5-2" in n and "投资变动情况表" in n
def _table53_engineering_cost_change_name(table_name: str) -> bool:
n = str(table_name or "")
return "表5-3" in n and "工程费用变动情况表" in n
def _appendix2_investment_structure_name(table_name: str) -> bool:
n = str(table_name or "")
return "附表2" in n and "项目竣工决算投资构成表" in n
def _appendix3_cashflow_name(table_name: str) -> bool:
n = str(table_name or "")
return "附表3" in n and "项目投资财务现金流量表" in n
def _appendix4_profit_name(table_name: str) -> bool:
n = str(table_name or "")
return "附表4" in n and "利润与利润分配计算表" in n
def _appendix5_revenue_tax_name(table_name: str) -> bool:
n = str(table_name or "")
return "附表5" in n and "营业收入与营业税金及附加计算表" in n
def _appendix6_cost_name(table_name: str) -> bool:
n = str(table_name or "")
return "附表6" in n and "总成本费用计算表" in n
def _appendix7_materials_name(table_name: str) -> bool:
n = str(table_name or "")
return "附表7" in n and "原材料、燃料及动力费用计算表" in n
def _appendix8_param_name(table_name: str) -> bool:
n = str(table_name or "")
return "附表8" in n and "可研报告和后评价参数对比表" in n
def _appendix_time_table_name(table_name: str) -> bool:
    """Tell whether *table_name* is one of Appendix Tables 3 through 7."""
    appendix_checks = (
        _appendix3_cashflow_name,
        _appendix4_profit_name,
        _appendix5_revenue_tax_name,
        _appendix6_cost_name,
        _appendix7_materials_name,
    )
    return any(check(table_name) for check in appendix_checks)
def _table_row_seq_name_split_display(table_name: str) -> bool:
    """Tell whether the row-label column should show only the name.

    True for the investment / appendix tables (5-1, 5-2, 5-3, Appendix 2-8),
    where the sequence number is rendered in its own column, matching element
    management.
    """
    split_display_checks = (
        _table51_main_economic_indicators_name,
        _table52_investment_change_name,
        _table53_engineering_cost_change_name,
        _appendix2_investment_structure_name,
        _appendix_time_table_name,
        _appendix8_param_name,
    )
    return any(check(table_name) for check in split_display_checks)
def _pick_row_key_with_legacy(
canon: str, row_set: set[str], legacy_map: dict[str, str]
) -> str | None:
"""在库内实际 row_key 中选取规范键或其旧版别名(优先规范键)。"""
if canon in row_set:
return canon
for legacy, normalized in legacy_map.items():
if normalized == canon and legacy in row_set:
return legacy
return None
def _order_rows_by_preferred(
row_order: list[str],
preferred: list[str],
*,
legacy_map: dict[str, str] | None = None,
) -> list[str]:
row_set = set(row_order)
ordered: list[str] = []
seen: set[str] = set()
for canon in preferred:
picked: str | None
if legacy_map:
picked = _pick_row_key_with_legacy(canon, row_set, legacy_map)
else:
picked = canon if canon in row_set else None
if picked and picked not in seen:
seen.add(picked)
ordered.append(picked)
extras = sorted(rk for rk in row_order if rk not in seen)
return ordered + extras
def _legacy_map_for_table(table_name: str) -> dict[str, str] | None:
    """Return the legacy-row-key map for Appendix Table 2 or 8, else ``None``."""
    if _appendix2_investment_structure_name(table_name):
        return APPENDIX2_LEGACY_ROW_KEY_MAP
    return APPENDIX8_LEGACY_ROW_KEY_MAP if _appendix8_param_name(table_name) else None
def _apply_global_table_standard_row_order(table_name: str, row_order: list[str]) -> list[str]:
"""表5-1/5-2/5-3、附表28与要素管理、标准模版一致的标准行序。"""
if not row_order:
return row_order
tn = str(table_name or "")
row_set = set(row_order)
preferred = canonical_row_order_for_table(tn)
if preferred is not None:
return _order_rows_by_preferred(row_order, preferred, legacy_map=_legacy_map_for_table(tn))
if _table53_engineering_cost_change_name(tn):
ordered: list[str] = []
seen: set[str] = set()
for alts in TABLE_5_3_ROW_KEY_ALTERNATES:
picked: str | None = None
for rk in alts:
if rk in row_set:
picked = rk
break
if picked and picked not in seen:
seen.add(picked)
ordered.append(picked)
extras = sorted(rk for rk in row_order if rk not in seen)
return ordered + extras
preferred: list[str] | None = None
if (
_table51_main_economic_indicators_name(tn)
or _table52_investment_change_name(tn)
):
preferred = global_table_row_keys(tn)
if not preferred:
return row_order
ordered = [rk for rk in preferred if rk in row_set]
seen = set(ordered)
extras = sorted(rk for rk in row_order if rk not in seen)
return ordered + extras
def _normalize_table_row_order(row_order: list[str], *, table_name: str = "") -> list[str]:
    """Apply the standard row order, then move total rows to the end.

    A row counts as a total when its text, with all whitespace removed, is
    exactly 「合计」 or 「总计」. Relative order within each group is kept.
    """
    ordered = _apply_global_table_standard_row_order(table_name, row_order)

    def is_total(row: str) -> bool:
        return re.sub(r"\s+", "", str(row or "").strip()) in {"合计", "总计"}

    body_rows = [row for row in ordered if not is_total(row)]
    total_rows = [row for row in ordered if is_total(row)]
    return body_rows + total_rows
_BARE_FOUR_DIGIT_YEAR_COL = re.compile(r"^\d{4}$")
_APPENDIX_YEAR_TAIL_NORM = re.compile(r"^(\d{4})年?$")
def _appendix_norm_year_tail(tail: str) -> str | None:
"""将列键尾部「2020」「2020年」规范为四位年字符串非日历年返回 None。"""
t = str(tail or "").strip()
m = _APPENDIX_YEAR_TAIL_NORM.fullmatch(t)
if not m:
return None
y = m.group(1)
try:
yi = int(y)
except ValueError:
return None
if 1900 <= yi <= 2100:
return y
return None
def _filter_redundant_bare_year_columns(col_order: list[str]) -> list[str]:
"""去掉与「组|年度」列重复的旧版纯年份列键(如模板历史同步遗留的 2019 / 2020"""
if not col_order:
return col_order
cols = [str(c).strip() for c in col_order if str(c or "").strip()]
if not cols:
return list(col_order)
years_from_piped: set[str] = set()
for c in cols:
if "|" not in c:
continue
tail = c.rsplit("|", 1)[-1].strip()
ny = _appendix_norm_year_tail(tail)
if ny:
years_from_piped.add(ny)
for tok in re.split(r"[\s,,、/-]+", tail):
t = tok.strip()
if _BARE_FOUR_DIGIT_YEAR_COL.fullmatch(t) and 1900 <= int(t) <= 2100:
years_from_piped.add(t)
if not years_from_piped:
return list(col_order)
out: list[str] = []
for c in col_order:
cs = str(c or "").strip()
ny_bare = _appendix_norm_year_tail(cs)
if ny_bare and ny_bare in years_from_piped:
continue
if _BARE_FOUR_DIGIT_YEAR_COL.fullmatch(cs) and cs in years_from_piped:
continue
out.append(c)
return out if out else list(col_order)
# Time-slot group names used as the left part of piped appendix column keys.
_APPENDIX_TIME_SLOT_GROUPS = frozenset({"建设期", "后评价时点前实际值", "后评价时点后预测值"})


def _appendix_time_slot_group_tail_is_real(tail: str) -> bool:
    """Tell whether a sub-column under a time-slot group is a real year (YYYY / YYYY年)."""
    if _appendix_norm_year_tail(tail):
        return True
    candidate = (tail or "").strip()
    if not _BARE_FOUR_DIGIT_YEAR_COL.fullmatch(candidate):
        return False
    try:
        year = int(candidate)
    except ValueError:
        return False
    return 1900 <= year <= 2100
def _bare_appendix_year_placeholder_col_key(s: str) -> bool:
"""无竖线列键末栏「××年」「xx年」等占位列兼容 x/×/全角拉丁混写)。"""
t = str(s or "").strip()
if not t:
return False
if t in ("", "..."):
return True
buf: list[str] = []
for ch in t:
if ch in "xX":
buf.append("×")
elif ch == "\u00d7":
buf.append("×")
else:
buf.append(ch)
u = "".join(buf)
return bool(re.fullmatch(r"×{2}年(?:#\d+)?", u))
def _filter_appendix_placeholder_slot_columns(col_order: list[str]) -> list[str]:
"""某组下已有真实年份列时,该组内只保留 YYYY / YYYY年 子列,并去掉裸组名列与裸「××年」占位列。"""
if not col_order:
return col_order
groups_with_real_year: set[str] = set()
for c in col_order:
cs = str(c or "").strip()
if "|" not in cs:
continue
group, tail = cs.split("|", 1)
group, tail = group.strip(), tail.strip()
if group not in _APPENDIX_TIME_SLOT_GROUPS:
continue
if _appendix_time_slot_group_tail_is_real(tail):
groups_with_real_year.add(group)
if not groups_with_real_year:
return list(col_order)
out: list[str] = []
for c in col_order:
cs = str(c or "").strip()
if "|" not in cs:
if cs in groups_with_real_year:
continue
if _bare_appendix_year_placeholder_col_key(cs):
continue
out.append(c)
continue
group, tail = cs.split("|", 1)
group, tail = group.strip(), tail.strip()
if group in groups_with_real_year:
if _appendix_time_slot_group_tail_is_real(tail):
out.append(c)
continue
out.append(c)
return out if out else list(col_order)
def _filter_appendix3_summary_duplicate_forecast_years(table_name: str, col_order: list[str]) -> list[str]:
"""附表3「建设期」「时点前」下与「后评价时点后预测值」同年栏重复时去掉避免表尾多出 2019/2020 等重复列。"""
tn = str(table_name or "").strip()
if "附表3" not in tn or "项目投资财务现金流量" not in tn:
return col_order
forecast_g = "后评价时点后预测值"
summary_gs = frozenset({"建设期", "后评价时点前实际值"})
fy: set[str] = set()
for c in col_order:
cs = str(c or "").strip()
if "|" not in cs:
continue
g, tail = cs.split("|", 1)
if g.strip() != forecast_g:
continue
ny = _appendix_norm_year_tail(tail.strip())
if ny:
fy.add(ny)
if not fy:
return col_order
drop: set[str] = set()
for c in col_order:
cs = str(c or "").strip()
if "|" not in cs:
continue
g, tail = cs.split("|", 1)
g, tail = g.strip(), tail.strip()
if g not in summary_gs:
continue
ny = _appendix_norm_year_tail(tail)
if ny and ny in fy:
drop.add(cs)
if not drop:
return col_order
out = [c for c in col_order if str(c).strip() not in drop]
return out if out else list(col_order)
def _filter_appendix3_placeholders_when_forecast_has_real_year(table_name: str, col_order: list[str]) -> list[str]:
"""附表3后评价时点后预测值已有 YYYY 列时,三组内所有「××年#n」占位列均剔除含建设期/时点前仅余占位的情况)。"""
tn = str(table_name or "").strip()
if "附表3" not in tn or "项目投资财务现金流量" not in tn:
return list(col_order)
forecast_g = "后评价时点后预测值"
has_forecast_real = False
for c in col_order:
cs = str(c or "").strip()
if "|" not in cs:
continue
g, tail = cs.split("|", 1)
if g.strip() != forecast_g:
continue
if _appendix_norm_year_tail(tail.strip()):
has_forecast_real = True
break
if not has_forecast_real:
return list(col_order)
out: list[str] = []
for c in col_order:
cs = str(c or "").strip()
if "|" not in cs:
out.append(c)
continue
g, tail = cs.split("|", 1)
g, tail = g.strip(), tail.strip()
if g in _APPENDIX_TIME_SLOT_GROUPS and not _appendix_time_slot_group_tail_is_real(tail):
continue
out.append(c)
return out if out else list(col_order)
def _filter_appendix5_orphan_price_unit_column(table_name: str, col_order: list[str]) -> list[str]:
"""附表5去掉与「价格元/t」重复的独立列键「元/t多为表头拆行误入数据列"""
tn = str(table_name or "")
if "附表5" not in tn or "营业收入与营业税金" not in tn:
return col_order
if not any("价格" in str(c) and "元/t" in str(c) for c in col_order):
return col_order
orphans = {"(元/t", "(元/t)"}
out = [c for c in col_order if str(c).strip() not in orphans]
return out if out else list(col_order)
# 表5-5仅按表号匹配不要求表名含「主要生产经营指标」避免要素表标题变体导致过滤未生效
_TABLE_55_TITLE_RX = re.compile(r"\s*5\s*[-\..·]\s*5")
_TABLE_55_FORECAST_GROUP = "后评价时点后预测值"
_TABLE_55_FORECAST_HYPHEN_YEAR = re.compile(
rf"^{re.escape(_TABLE_55_FORECAST_GROUP)}\s*[--—–]\s*(\d{{4}})(?:年)?$"
)
def _compact_zh_ident(s: str) -> str:
return re.sub(r"\s+", "", unicodedata.normalize("NFKC", str(s or "")))
def _split_group_year_col_key(col: str) -> tuple[str, str] | None:
"""解析「组|子列」;支持半角/全角竖线。"""
st = str(col or "").strip()
if not st:
return None
for sep in ("|", "\uff5c"): # U+FF5C 全角竖线
if sep in st:
a, b = st.split(sep, 1)
return a.strip(), b.strip()
return None
def _table55_has_forecast_year_slot_columns(col_order: list[str]) -> bool:
"""是否存在「后评价时点后预测值」下的分年列(|、| 或 后缀 -YYYY"""
for c in col_order:
parts = _split_group_year_col_key(str(c or ""))
if parts:
g, tail = parts
if g != _TABLE_55_FORECAST_GROUP:
continue
ts = tail.strip()
if _appendix_norm_year_tail(ts):
return True
if _BARE_FOUR_DIGIT_YEAR_COL.fullmatch(ts):
try:
if 1900 <= int(ts) <= 2100:
return True
except ValueError:
pass
continue
st = str(c or "").strip()
m = _TABLE_55_FORECAST_HYPHEN_YEAR.match(st)
if m:
try:
if 1900 <= int(m.group(1)) <= 2100:
return True
except ValueError:
pass
return False
def _table55_col_should_drop(col: str, *, has_forecast_year_slots: bool) -> bool:
    """Decide whether a Table 5-5 column key is redundant.

    The typo key 「…时点点后…」 is always dropped; the bare forecast group column
    is dropped only when per-year forecast columns exist.
    """
    compact = _compact_zh_ident(col)
    if "时点点后" in compact:
        return True
    return bool(
        has_forecast_year_slots
        and compact == _compact_zh_ident(_TABLE_55_FORECAST_GROUP)
    )
def _filter_table55_redundant_malformed_forecast_column(table_name: str, col_order: list[str]) -> list[str]:
"""表5-5去掉笔误列「…时点点后…」及在有分年预测列时多余的裸「后评价时点后预测值」列。"""
if not col_order:
return col_order
if not _TABLE_55_TITLE_RX.search(str(table_name or "")):
return list(col_order)
has_slots = _table55_has_forecast_year_slot_columns(col_order)
out = [c for c in col_order if not _table55_col_should_drop(str(c), has_forecast_year_slots=has_slots)]
return out if out else list(col_order)
_APPENDIX_TIME_GROUP_YEAR_HYPHEN_RE = re.compile(
r"^(建设期|后评价时点前实际值|后评价时点后预测值)\s*[--—–]\s*(.+)$"
)
def _appendix_time_col_group_and_tail(col: str) -> tuple[str | None, str | None]:
"""解析附表时间列键为 (组名, 子列);支持「组|年」「组-年」及裸组名列。"""
st = str(col or "").strip()
if not st:
return None, None
parts = _split_group_year_col_key(st)
if parts:
return parts[0], parts[1]
m = _APPENDIX_TIME_GROUP_YEAR_HYPHEN_RE.match(st)
if m:
return m.group(1).strip(), m.group(2).strip()
if st in _APPENDIX_TIME_SLOT_GROUPS or st == "价格(元/t":
return st, ""
return None, None
def _appendix_time_tail_sort_key(tail: str | None) -> tuple[int, int, str]:
"""组内子列排序:裸组名 < 分年列(年份升序) < 占位列 < 其它。"""
t = str(tail or "").strip()
if not t:
return (0, -1, "")
ny = _appendix_norm_year_tail(t)
if ny:
return (1, int(ny), "")
if _BARE_FOUR_DIGIT_YEAR_COL.fullmatch(t):
try:
yi = int(t)
if 1900 <= yi <= 2100:
return (1, yi, "")
except ValueError:
pass
buf: list[str] = []
for ch in t:
if ch in "xX":
buf.append("×")
elif ch == "\u00d7":
buf.append("×")
else:
buf.append(ch)
pm = re.fullmatch(r"×{2}年#(\d+)", "".join(buf))
if pm:
return (2, int(pm.group(1)), "")
return (3, 0, t)
def _reorder_appendix_time_col_order(table_name: str, col_order: list[str]) -> list[str]:
"""附表37按细则组序排列列组内年份从小到大。"""
if not col_order or not _appendix_time_table_name(table_name):
return list(col_order)
spec_groups = time_table_default_columns_for_name(table_name) or []
group_rank: dict[str, int] = {g: i for i, g in enumerate(spec_groups)}
by_group: dict[str, list[str]] = {}
ungrouped: list[str] = []
for col in col_order:
cs = str(col or "").strip()
if not cs:
continue
g, _ = _appendix_time_col_group_and_tail(cs)
if g == "价格(元/t" or g in _APPENDIX_TIME_SLOT_GROUPS:
by_group.setdefault(g, []).append(cs)
if g not in group_rank:
group_rank[g] = len(group_rank) + 100
else:
ungrouped.append(cs)
def _sort_group_cols(cols: list[str]) -> list[str]:
return sorted(
cols,
key=lambda c: _appendix_time_tail_sort_key(_appendix_time_col_group_and_tail(c)[1]),
)
ordered_groups = list(spec_groups)
for g in sorted(by_group.keys(), key=lambda x: group_rank.get(x, 999)):
if g not in ordered_groups:
ordered_groups.append(g)
out: list[str] = []
seen: set[str] = set()
for g in ordered_groups:
cols = by_group.get(g)
if not cols:
continue
for c in _sort_group_cols(cols):
if c not in seen:
out.append(c)
seen.add(c)
for c in ungrouped:
if c not in seen:
out.append(c)
seen.add(c)
return out if out else list(col_order)
def _build_structured_table_html(
    table_name: str,
    row_order: list[str],
    col_order: list[str],
    latest: dict[tuple[str, str], str],
) -> str:
    """Render a structured table as an HTML ``<table>`` string.

    The header has two rows when ``_group_column_headers`` reports grouped
    columns (consecutive columns with the same top header share one
    ``colspan`` cell), otherwise one row. The body is capped at 120 rows for
    the Table 5-4 operating-benefit table and 24 rows for every other table;
    missing or empty cells are shown as 「待补充」.

    NOTE(review): cell values and headers are inserted without HTML escaping —
    confirm that upstream data is trusted or already escaped.
    """
    row_order = _normalize_table_row_order(row_order, table_name=table_name)
    row_header = _row_header_name_for_table(table_name)
    grouped = _group_column_headers(col_order)

    html: list[str] = ["<table>", " <thead>"]
    if grouped:
        tops, subs = grouped
        html.append(" <tr>")
        html.append(' <th rowspan="2">序号</th>')
        html.append(f' <th rowspan="2">{row_header}</th>')
        total = len(tops)
        pos = 0
        while pos < total:
            top = tops[pos]
            if not top:
                # Ungrouped column: one cell spanning both header rows.
                html.append(f' <th rowspan="2">{subs[pos]}</th>')
                pos += 1
                continue
            width = 1
            while pos + width < total and tops[pos + width] == top:
                width += 1
            html.append(f' <th colspan="{width}">{top}</th>')
            pos += width
        html.append(" </tr>")
        html.append(" <tr>")
        html.extend(f" <th>{sub}</th>" for top, sub in zip(tops, subs) if top)
        html.append(" </tr>")
    else:
        html.append(" <tr>")
        html.append(" <th>序号</th>")
        html.append(f" <th>{row_header}</th>")
        html.extend(f" <th>{col}</th>" for col in col_order)
        html.append(" </tr>")
    html.append(" </thead>")
    html.append(" <tbody>")

    row_cap = 120 if _is_table54_operating_benefit(table_name) else 24
    max_rows = min(row_cap, len(row_order))
    serial_col = _table_row_outline_serial_column(row_order, max_rows=max_rows)
    for number, rk in enumerate(row_order[:max_rows], start=1):
        display_rk = _project_column_row_label(
            table_name, rk, latest, serial_col=serial_col
        )
        serial_cell = _serial_cell_for_report_table(
            table_name, rk, number, serial_col, serial_idx=number - 1
        )
        html.append(" <tr>")
        html.append(f" <td>{serial_cell}</td>")
        html.append(f" <td>{display_rk}</td>")
        for ck in col_order:
            value = latest.get((rk, ck), "待补充") or "待补充"
            html.append(f" <td>{value}</td>")
        html.append(" </tr>")
    html.append(" </tbody>")
    html.append("</table>")
    return "\n".join(html)
# Header label of the form "<name> (<unit>)": group 1 is the name, group 2 the
# trailing parenthesised unit (parentheses included).
_RE_MD_HEADER_NAME_UNIT = re.compile(r"^(.+?)\s*([(][^)]+[)])$")
def _strip_md_bold_markup(text: str) -> str:
"""去掉 Markdown 加粗标记 **,保留其余内容。"""
s = str(text or "")
while True:
new = re.sub(r"\*\*([^*]+?)\*\*", r"\1", s)
if new == s:
break
s = new
return s
def _markdown_table_header_cell_display(col_label: str, *, plain: bool = False) -> str:
    """Format one Markdown table header cell.

    Bold markers and pipe characters are removed. A trailing parenthesised unit
    is moved below the name inside the same cell (joined with ``<br>``). Labels
    that already contain ``<br>`` are only cleaned segment by segment. Header
    cells are never bold.
    """
    del plain  # kept for backward-compatible calls; headers are never wrapped in **
    label = _strip_md_bold_markup(str(col_label or "").strip()).replace("|", "")
    if not label:
        return ""
    br_pattern = r"<br\s*/?>"
    if re.search(br_pattern, label, re.I):
        segments = (
            _strip_md_bold_markup(piece).strip()
            for piece in re.split(br_pattern, label, flags=re.I)
        )
        non_empty = [segment for segment in segments if segment]
        return "<br>".join(non_empty) if non_empty else label
    name_unit = _RE_MD_HEADER_NAME_UNIT.match(label)
    if name_unit:
        name = name_unit.group(1).strip()
        unit = name_unit.group(2).strip()
        if name:
            return f"{name}<br>{unit}" if unit else name
    return label
def _common_trailing_parenthetical_unit_from_flat_labels(
flat_cols: list[str],
) -> tuple[str | None, list[str]]:
"""当合并后的列表头列名末尾「(单位)」在各列一致时,返回该单位及去掉单位后的表头文案。"""
stripped: list[str] = []
units: list[str | None] = []
for lab in flat_cols:
s = str(lab or "").strip()
m = _RE_MD_HEADER_NAME_UNIT.match(s)
if m:
stripped.append(m.group(1).strip())
units.append(m.group(2).strip())
else:
stripped.append(s)
units.append(None)
present = [u for u in units if u]
if not present:
return None, list(flat_cols)
u0 = present[0]
if any(units[i] is not None and units[i] != u0 for i in range(len(units))):
return None, list(flat_cols)
return u0, stripped
# 表号与表名之间空两格:采用两个全角空格(与公文「空两格」习惯一致)
_TABLE_CAPTION_NUMBER_NAME_GAP = "\u3000\u3000"
_TABLE52_INVESTMENT_CHANGE_CAPTION = (
f"表5-2{_TABLE_CAPTION_NUMBER_NAME_GAP}投资变动情况表(单位:万元、万美元)"
)
_RE_TABLE_CAPTION_LEADING_TOKEN = re.compile(
r"^(附表\s*\d+(?:\s*[.\-]\s*\d+)*|表\s*\d+(?:\s*[.\-]\s*\d+)*)\s*(.*)$",
re.DOTALL,
)
def _fix_521_table52_wrong_caption(content: str) -> str:
"""5.2.1 若表题误用「表5-2 同类烷基化…」等,改回标准投资变动情况表表题。"""
text = str(content or "")
if not text.strip():
return text
caption_re = re.compile(
r"^(\s*(?:#{1,6}\s+)?)(表\s*5\s*[-]\s*2\s*(.*))$",
re.IGNORECASE,
)
out: list[str] = []
for line in text.split("\n"):
m = caption_re.match(line)
if m:
tail = (m.group(3) or "").strip()
if "投资变动情况表" not in tail:
out.append(f"{m.group(1)}{_TABLE52_INVESTMENT_CHANGE_CAPTION}")
continue
out.append(line)
return "\n".join(out)
def _normalize_table_caption_number_name_gap(title: str) -> str:
"""将「表2-4xxx」「表 2 - 4 xxx」规范为「表2-4」+ 两全角空格 + 表名。"""
s = str(title or "").strip()
if not s:
return s
m = _RE_TABLE_CAPTION_LEADING_TOKEN.match(s)
if not m:
return s
token_compact = re.sub(r"\s+", "", (m.group(1) or "").strip())
rest = (m.group(2) or "").strip()
if not rest:
return token_compact
return f"{token_compact}{_TABLE_CAPTION_NUMBER_NAME_GAP}{rest}"
def _rewrite_table_caption_line_for_number_name_gap(line: str) -> str:
"""修正独立表题行(非表格管道行)中表号与表名间距。"""
if "|" in line or not line.strip():
return line
if line.strip().startswith("```"):
return line
m = re.match(r"^(\s*)(.*)$", line)
if not m:
return line
indent, rest = m.group(1), m.group(2)
h = ""
m2 = re.match(r"^(#{1,6}\s+)(.*)$", rest)
if m2:
h, rest = m2.group(1), m2.group(2)
rest_st = rest.strip()
if not rest_st:
return line
if rest_st.startswith("**") and rest_st.endswith("**") and len(rest_st) >= 4:
inner = _strip_md_bold_markup(rest_st[2:-2]).strip()
n = _normalize_table_caption_number_name_gap(inner)
return f"{indent}{h}{n}"
n2 = _normalize_table_caption_number_name_gap(_strip_md_bold_markup(rest_st))
if n2 != rest_st:
return f"{indent}{h}{n2}"
return line
def _debold_md_table_row(line: str) -> str:
if "|" not in line:
return line
return "|".join(_strip_md_bold_markup(part) for part in line.split("|"))
def _debold_markdown_table_blocks_in_content(content: str) -> str:
    """Remove ``**`` bold markers from the header rows of Markdown pipe tables.

    A table is a run of pipe rows followed by a separator line; all rows before
    the separator count as header rows (so two-row headers are covered). Body
    rows and pipe rows that are not followed by a separator are copied
    verbatim.
    """
    source = str(content or "").split("\n")
    if not source:
        return str(content or "")

    def is_plain_row(text: str) -> bool:
        """A pipe-table row that is not the separator line."""
        return _is_pipe_markdown_table_row_line(text) and not _is_pipe_markdown_table_separator_line(text)

    result: list[str] = []
    total = len(source)
    pos = 0
    while pos < total:
        if not is_plain_row(source[pos]):
            result.append(source[pos])
            pos += 1
            continue
        end = pos
        while end < total and is_plain_row(source[end]):
            end += 1
        candidate_headers = source[pos:end]
        if end < total and _is_pipe_markdown_table_separator_line(source[end]):
            result.extend(_debold_md_table_row(row) for row in candidate_headers)
            result.append(source[end])
            end += 1
            # Copy the table body (and any further separator lines) unchanged.
            while end < total and (
                _is_pipe_markdown_table_row_line(source[end])
                or _is_pipe_markdown_table_separator_line(source[end])
            ):
                result.append(source[end])
                end += 1
        else:
            result.extend(candidate_headers)
        pos = end
    return "\n".join(result)
def _normalize_table_captions_in_markdown(content: str) -> str:
if not str(content or "").strip():
return str(content or "")
lines = str(content).split("\n")
text = "\n".join(_rewrite_table_caption_line_for_number_name_gap(ln) for ln in lines)
return _debold_markdown_table_blocks_in_content(text)
def _merge_table_title_with_common_unit(base_title: str, unit: str | None) -> str:
    """Append the unit shared by all columns to the table title.

    The unit is appended after a single space unless it is blank or the title
    (whitespace ignored) already ends with it. The result always goes through
    caption number/name gap normalisation.
    """
    caption = str(base_title or "").strip()
    unit_text = str(unit).strip() if unit else ""
    if unit_text:
        compact_unit = re.sub(r"\s+", "", unit_text)
        compact_caption = re.sub(r"\s+", "", caption)
        already_present = bool(compact_unit) and compact_caption.endswith(compact_unit)
        if not already_present:
            caption = f"{caption} {unit_text}"
    return _normalize_table_caption_number_name_gap(caption)
def _render_table_7_1_markdown(
    row_order: list[str],
    col_order: list[str],
    latest: dict[tuple[str, str], str],
) -> str:
    """Render Table 7-1 as a Markdown pipe table with 「指标」 and 「要素」 columns.

    Row keys are either 「指标·要素」 or 「综合得分」; they are split at the first
    middle dot into the two leading columns. Template columns and rows come
    first (in template order), followed by any extra ones in input order. Empty
    cells are shown as 「待补充」.
    """

    def cell(value: str) -> str:
        return str(value or "").replace("|", "")

    data_cols: list[str] = [key for key in TABLE_7_1_COLUMN_KEYS if key in col_order]
    for col in col_order:
        if col not in data_cols:
            data_cols.append(col)

    template_rows = [row_key for row_key, _ in TABLE_7_1_ROW_CELL_DEFAULTS]
    template_row_set = set(template_rows)
    rows = [row_key for row_key in template_rows if row_key in row_order]
    rows.extend(row_key for row_key in row_order if row_key not in template_row_set)

    header_cells = [
        _markdown_table_header_cell_display(label)
        for label in ["指标", "要素", *data_cols]
    ]
    output = [
        "| " + " | ".join(header_cells) + " |",
        "| " + " | ".join(["---"] * (2 + len(data_cols))) + " |",
    ]
    for row_key in rows:
        key = str(row_key or "").strip()
        # 「综合得分」 has no middle dot, so it lands in the indicator column alone.
        indicator, _, element = key.partition("·")
        indicator, element = indicator.strip(), element.strip()
        values = []
        for col in data_cols:
            raw = str(latest.get((key, col), "") or "").strip()
            values.append(cell(raw if raw else "待补充"))
        output.append("| " + " | ".join([cell(indicator), cell(element)] + values) + " |")
    return "\n".join(output) + "\n"
def _build_structured_table_markdown(db: Session, table_id: str, table_name: str = "") -> tuple[str, str | None]:
    """Build the Markdown rendering of one element table from its stored cells.

    Non-empty cells are read newest first, so the first value seen for a
    (row_key, col_key) pair wins. Rows are put into the standard order; columns
    follow the multi-column template when one exists and then pass through the
    appendix / Table 5-5 column filters and the appendix time reordering.

    Returns:
        ``("", None)`` when the table has no usable cells. For the Table 7-1
        scoring table, the dedicated rendering plus ``None``. Otherwise
        whatever ``_render_markdown_table`` returns.
    """
    cell_query = db.query(ElementCell).filter(
        ElementCell.table_id == table_id,
        ElementCell.value.isnot(None),
        ElementCell.value != "",
    )
    cells = cell_query.order_by(ElementCell.updated_at.desc()).all()
    if not cells:
        return "", None

    latest: dict[tuple[str, str], str] = {}
    # dicts are used as insertion-ordered sets of first-seen keys
    rows_seen: dict[str, None] = {}
    cols_seen: dict[str, None] = {}
    for cell in cells:
        row_key = str(cell.row_key or "").strip()
        col_key = str(cell.col_key or "内容").strip() or "内容"
        if not row_key:
            continue
        if (row_key, col_key) not in latest:
            latest[(row_key, col_key)] = str(cell.value or "").strip()
        rows_seen.setdefault(row_key)
        cols_seen.setdefault(col_key)
    row_order = list(rows_seen)
    col_order = list(cols_seen)
    if not row_order:
        return "", None

    row_order = _normalize_table_row_order(row_order, table_name=table_name)

    spec = _multi_column_global_spec_for_table(table_name)
    if spec:
        template_cols = [str(col).strip() for col in (spec[0] or []) if str(col).strip()]
        in_template = [col for col in template_cols if col in col_order]
        remaining = [col for col in col_order if col not in in_template]
        col_order = in_template + remaining

    col_order = _filter_redundant_bare_year_columns(col_order)
    col_order = _filter_appendix_placeholder_slot_columns(col_order)
    for table_aware_step in (
        _filter_appendix3_summary_duplicate_forecast_years,
        _filter_appendix3_placeholders_when_forecast_has_real_year,
        _filter_appendix5_orphan_price_unit_column,
        _filter_table55_redundant_malformed_forecast_column,
        _reorder_appendix_time_col_order,
    ):
        col_order = table_aware_step(table_name, col_order)

    stripped_name = str(table_name or "").strip()
    inferred_ty: int | None = None
    if _is_table54_operating_benefit(stripped_name):
        ty_row = db.query(ElementTable.year).filter(ElementTable.id == table_id).first()
        if ty_row and ty_row[0] is not None and int(ty_row[0]) > 0:
            tbl_y = int(ty_row[0])
        else:
            tbl_y = None
        inferred_ty = _infer_time_column_year_for_table54(col_order, cells, tbl_y)

    if stripped_name == TABLE_7_1_SCORING_TABLE_NAME:
        return _render_table_7_1_markdown(row_order, col_order, latest), None
    return _render_markdown_table(
        table_name, row_order, col_order, latest, time_column_year=inferred_ty
    )
_PLACEHOLDER_ROW_PREFIX = re.compile(r"^(产品名称|原料名称|项目名称|名称|产品|项目)\s*[·.]\s*")
def _display_row_key(table_name: str, rk: str, latest: dict[tuple[str, str], str]) -> str:
"""将模板占位行名替换为真实名称(优先使用该行单元格中的项目/产品名称)。"""
text = str(rk or "").strip()
m = _PLACEHOLDER_ROW_PREFIX.match(text)
if not m:
return text
preferred_cols: list[str] = []
tn = str(table_name or "")
if "表2-4" in tn or "产品流向" in tn:
preferred_cols.extend(["项目名称", "产品名称", "规格"])
preferred_cols.extend(["项目名称", "产品名称", "名称", "规格"])
for col in preferred_cols:
v = str(latest.get((rk, col), "") or "").strip()
if v and v != "待补充":
return v
suffix = text[m.end():].strip()
return f"产品{suffix}" if suffix else text
# Matches the outline numbering used in the body text; row_key looks like
# 「1.1 建设投资」, 「1.2.3 工艺」 or 「3原料」. Group 1 is the dotted number.
_ROWKEY_OUTLINE_PREFIX = re.compile(
    r"^\s*(\d+(?:\.\d+)*)(?:\s*[、..]?\s+(?=\S)|(?=[\u4e00-\u9fffA-Za-z(]))"
)
# Chinese-numeral outline prefix followed by whitespace and the row name.
_CN_OUTLINE_ROWKEY_PREFIX = re.compile(r"^\s*([一二三四五六七八九十百千]+)\s+(.+)$")
# Appendix Table 5 product detail row: "<number> <product>·<metric>".
_APPENDIX5_PRODUCT_TRIPLE = re.compile(
    r"^(\d+(?:\.\d+)*)\s+(.+?)·(销量|营业收入|销项税)$"
)
# Appendix Table 7 detail row: "<number> <material>" with an optional "·<metric>" suffix.
_APPENDIX7_DETAIL_ROW = re.compile(
    r"^(\d+(?:\.\d+)*)\s+([^·]+?)(?:·(单价|数量|进项税额|……))?$"
)
# Table 5-3 legacy row keys mapped to the text used for display / number parsing.
_TABLE_53_LEGACY_ROW_DISPLAY: dict[str, str] = {
    "工程费用变动·批准单位": "批准单位",
    "工程费用变动·批准文号": "批准文号",
    "工程费用变动·工程费用合计": "工程费用合计",
    "工程费用变动·工艺生产装置": "1 工艺生产装置",
    "工程费用变动·装置·设备购置费": "1.1.1 设备购置费",
    "工程费用变动·装置·安装工程费": "1.1.2 安装工程费",
    "工程费用变动·装置·建筑工程费": "1.1.3 建筑工程费",
    "工程费用变动·总图运输": "2 总图运输",
    "工程费用变动·储运工程": "3 储运工程",
    "工程费用变动·其它分项(可增删)": "其它分项(可增删)",
}
def _strip_table_prefix_from_row_key(rk: str) -> str:
    """Drop everything up to and including the first "·" (U+00B7) of a row key.

    Keys without a "·" are returned stripped of surrounding whitespace; falsy input yields "".
    """
    cleaned = str(rk or "").strip()
    _prefix, separator, remainder = cleaned.partition("\u00b7")
    return remainder.strip() if separator else cleaned
def _parse_row_key_seq_and_name(rk: str, *, table_name: str = "") -> tuple[str, str]:
    """Split a row key into ``(serial, name)``.

    The original note says this mirrors ``parseRowKeyForDisplay`` in quick-fill.js.
    Legacy row keys are first mapped to their current form, then the leading
    "<prefix>·" part is dropped, and finally an Arabic or Chinese-numeral outline
    prefix is separated from the rest. When no outline prefix is found the serial is "".
    """
    key = str(rk or "").strip()
    if not key:
        return "", ""

    legacy_names = _legacy_map_for_table(table_name)
    if legacy_names:
        key = legacy_names.get(key, key)
    if _table53_engineering_cost_change_name(table_name):
        key = _TABLE_53_LEGACY_ROW_DISPLAY.get(key, key)

    # Both the appendix branch and the generic "·" branch remove the text before the first "·".
    is_appendix = _appendix_time_table_name(table_name) or _appendix8_param_name(table_name)
    if is_appendix or "\u00b7" in key:
        key = _strip_table_prefix_from_row_key(key)

    arabic = _ROWKEY_OUTLINE_PREFIX.match(key)
    if arabic:
        remainder = key[arabic.end():].strip()
        # A key that is nothing but the number keeps the whole key as its name.
        return arabic.group(1), (remainder or key)

    chinese = _CN_OUTLINE_ROWKEY_PREFIX.match(key)
    if chinese:
        return chinese.group(1), chinese.group(2).strip()
    return "", key
def _row_display_name_for_table(table_name: str, rk: str) -> str:
    """Return the text shown in the project/name column for a row key.

    Appendix-5 and appendix-7 detail rows are handled first (metric/detail label, or the
    item name); tables using the split serial/name display get the name part of the key;
    every other table shows the stripped key unchanged.
    """
    raw = str(rk or "").strip()
    if not raw:
        return ""

    if _appendix5_revenue_tax_name(table_name):
        triple = _APPENDIX5_PRODUCT_TRIPLE.match(_strip_table_prefix_from_row_key(raw))
        if triple is not None:
            return triple.group(3)

    if _appendix7_materials_name(table_name):
        detail = _APPENDIX7_DETAIL_ROW.match(_strip_table_prefix_from_row_key(raw))
        if detail is not None:
            # Prefer the detail label after "·"; otherwise fall back to the item name.
            return detail.group(3) or detail.group(2).strip()

    if not _table_row_seq_name_split_display(table_name):
        return raw
    _serial, name = _parse_row_key_seq_and_name(raw, table_name=table_name)
    return name or raw
def _serial_cell_for_report_table(
    table_name: str,
    rk: str,
    idx: int,
    serial_col: list[str] | None,
    *,
    serial_idx: int,
) -> str:
    """Pick the text of the serial-number cell for one rendered row.

    * Table 5-1: the outline number parsed from the row key, else the running index.
    * Other tables with the split serial/name display: the running index ``idx``.
    * Remaining tables: ``serial_col[serial_idx]`` when an outline serial column was
      computed, else the running index.
    """
    running_index = str(idx)
    split_display = _table_row_seq_name_split_display(table_name)
    is_table51 = _table51_main_economic_indicators_name(table_name)

    if is_table51:
        outline_no, _name = _parse_row_key_seq_and_name(rk, table_name=table_name)
        return outline_no or running_index
    if split_display or serial_col is None:
        return running_index
    return serial_col[serial_idx]
def _project_column_row_label(
    table_name: str,
    rk: str,
    latest: dict[tuple[str, str], str],
    *,
    serial_col: list[str] | None,
) -> str:
    """Build the label shown in the row-header column for ``rk``.

    The base label depends on the table: the split-display name, the key without its
    outline number (when a serial column exists), or the raw key. It is then passed
    through the placeholder-name substitution and the element-management display mapping.
    """
    if _table_row_seq_name_split_display(table_name):
        base_label = _row_display_name_for_table(table_name, rk)
    elif serial_col is None:
        base_label = rk
    else:
        without_outline = _strip_row_key_leading_outline_for_display(rk)
        # Never show an empty name: fall back to the untouched key.
        base_label = without_outline if str(without_outline or "").strip() else rk

    resolved = _display_row_key(table_name, base_label, latest)
    return _element_manage_table_row_display_label(table_name, resolved)
def _outline_serial_from_row_key(rk: str) -> str | None:
    """Return the leading Arabic outline number of a row key, or ``None``.

    ``None`` is returned for empty keys, for total rows ("合计"/"总计"), for keys without
    an outline prefix, and when the prefix is a four-digit number in 1900..2100 (treated
    as a year rather than a serial).
    """
    key = str(rk or "").strip()
    if not key:
        return None
    if re.sub(r"\s+", "", key) in ("合计", "总计"):
        return None

    prefix = _ROWKEY_OUTLINE_PREFIX.match(key)
    if prefix is None:
        return None

    serial = prefix.group(1)
    if re.fullmatch(r"\d{4}", serial) is not None:
        try:
            as_year = int(serial)
        except ValueError:
            return None
        if 1900 <= as_year <= 2100:
            return None
    return serial
def _table_row_outline_serial_column(row_order: list[str], *, max_rows: int) -> list[str] | None:
    """Compute the serial column from outline numbers embedded in the row keys.

    Only the first ``max_rows`` keys are considered. Total rows ("合计"/"总计") get an
    empty serial. If any other row has no outline number the whole column is rejected
    and ``None`` is returned; an empty row list also yields ``None``.
    """
    visible_rows = row_order[:max_rows]
    if not visible_rows:
        return None

    total_labels = ("合计", "总计")
    column: list[str] = []
    for key in visible_rows:
        text = str(key or "")
        if re.sub(r"\s+", "", text) in total_labels or text.strip() in total_labels:
            column.append("")
            continue
        serial = _outline_serial_from_row_key(str(key) or "")
        if serial is None:
            return None
        column.append(serial)
    return column
def _strip_row_key_leading_outline_for_display(rk: str) -> str:
    """Remove the leading outline number from a row key so it is not repeated next to the serial column.

    Keys without an outline prefix, or consisting of nothing but the prefix, are returned
    stripped and otherwise unchanged.
    """
    key = str(rk or "").strip()
    prefix = _ROWKEY_OUTLINE_PREFIX.match(key)
    remainder = key[prefix.end():].strip() if prefix else ""
    return remainder or key
def _render_markdown_table(
    table_name: str,
    row_order: list[str],
    col_order: list[str],
    latest: dict[tuple[str, str], str],
    *,
    time_column_year: int | None = None,
) -> tuple[str, str | None]:
    """Render one element table as a Markdown table.

    The header row is "序号", a row-header column, then one column per entry of the
    (possibly reordered) ``col_order``. At most 24 data rows are emitted, or 120 when the
    table-5-4 predicate holds. Cells missing from ``latest`` are rendered as "待补充".

    Args:
        table_name: Table name; selects table-specific behaviour via the name predicates.
        row_order: Row keys in display order.
        col_order: Column keys in display order (copied, the caller's list is not modified).
        latest: Cell values keyed by ``(row_key, col_key)``. For the table-5-4 case it is
            passed to two remap/rekey helpers first — presumably mutated in place; confirm
            against those helpers.
        time_column_year: Forwarded to the table-5-4 header-label helper only.

    Returns:
        ``(markdown, common_unit)``; ``common_unit`` is always ``None`` for the table-5-4 case.
    """
    col_order = list(col_order)
    tn = str(table_name or "").strip()
    table54 = _is_table54_operating_benefit(tn)
    if table54:
        _table54_remap_indicator_unit_latest(latest)
        _table54_rekey_latest_col_keys(latest)
        col_order = _reorder_table54_col_order(col_order)
    grouped = _group_column_headers(col_order)
    if grouped:
        # Flatten two-level headers into "top-sub" labels (or whichever part is present).
        top_headers, sub_headers = grouped
        flat_cols: list[str] = []
        for top, sub in zip(top_headers, sub_headers):
            if top and sub:
                flat_cols.append(f"{top}-{sub}")
            elif top:
                flat_cols.append(top)
            else:
                flat_cols.append(sub)
    else:
        flat_cols = list(col_order)
    if table54:
        # Table 5-4 ignores the grouped labels and uses its own header-label helper.
        flat_cols = list(col_order)
        flat_header_labels = _table54_markdown_header_labels(
            col_order, time_column_year=time_column_year
        )
        common_unit = None
    else:
        common_unit, flat_header_labels = _common_trailing_parenthetical_unit_from_flat_labels(flat_cols)
        if common_unit is None:
            flat_header_labels = flat_cols

    def _esc_pipe(v: str) -> str:
        # Keeps cell text from breaking the Markdown row by replacing "|".
        # NOTE(review): the replacement string is empty in this view; it may originally have
        # been a look-alike character lost in extraction — verify against the original file.
        return str(v or "").replace("|", "")

    row_header = _row_header_name_for_table(table_name)
    if table54:
        row_header = "项目"
    header = (
        "| "
        + " | ".join(
            [
                _markdown_table_header_cell_display("序号"),
                _markdown_table_header_cell_display(row_header),
            ]
            + [_markdown_table_header_cell_display(c) for c in flat_header_labels]
        )
        + " |"
    )
    # Separator row: two fixed columns plus one per data column.
    split = "| --- | --- | " + " | ".join(["---"] * len(flat_cols)) + " |"
    lines = [header, split]
    max_rows = min(120, len(row_order)) if table54 else min(24, len(row_order))
    serial_col = _table_row_outline_serial_column(row_order, max_rows=max_rows)
    for idx, rk in enumerate(row_order[:max_rows], start=1):
        vals = [_esc_pipe(latest.get((rk, ck), "待补充") or "待补充") for ck in col_order]
        display_rk = _project_column_row_label(
            table_name, rk, latest, serial_col=serial_col
        )
        serial_cell = _serial_cell_for_report_table(
            table_name, rk, idx, serial_col, serial_idx=idx - 1
        )
        lines.append("| " + serial_cell + " | " + _esc_pipe(display_rk) + " | " + " | ".join(vals) + " |")
    return "\n".join(lines) + "\n", common_unit
def _build_time_table_markdowns_by_year(
    db: Session, table_id: str, table_name: str = "",
) -> list[tuple[str, str]]:
    """Split a time-series element table by year and return ``[(display_table_name, markdown), ...]``.

    ElementCell rows of a time table carry a ``year`` field that separates the data of
    different years (per the original note, the front end renders multi-level headers
    from ``col_key + "|" + year``). This function groups the non-empty cells by year,
    renders one Markdown table per year and substitutes the "××年" placeholder in the
    table name with that year. When no cell has a year, it falls back to the
    non-split structured-table renderer.
    """
    # Newest cells first, so the first value seen for a (row, col) pair is the latest one.
    cells = (
        db.query(ElementCell)
        .filter(
            ElementCell.table_id == table_id,
            ElementCell.value.isnot(None),
            ElementCell.value != "",
        )
        .order_by(ElementCell.updated_at.desc())
        .all()
    )
    if not cells:
        return []
    from collections import defaultdict
    year_cells: dict[int | None, list[ElementCell]] = defaultdict(list)
    for cell in cells:
        year_cells[cell.year].append(cell)
    # Table-level year; only a positive value is used.
    ty_row = db.query(ElementTable.year).filter(ElementTable.id == table_id).first()
    tbl_y = (
        int(ty_row[0])
        if ty_row and ty_row[0] is not None and int(ty_row[0]) > 0
        else None
    )
    base_name = str(table_name or "").strip()
    if _is_table54_operating_benefit(base_name):
        year_cells, real_years = _table54_merge_year_cells_for_table_year(
            year_cells, table_year=tbl_y
        )
    else:
        real_years = sorted(y for y in year_cells if y is not None)
    if not real_years:
        # No per-year data at all: render the table once without splitting.
        md, common_unit = _build_structured_table_markdown(db, table_id, table_name)
        if not md:
            return []
        disp = _merge_table_title_with_common_unit(str(table_name or "").strip(), common_unit)
        return [(disp, md)]
    results: list[tuple[str, str]] = []
    for year in real_years:
        year_cell_list = year_cells[year]
        latest: dict[tuple[str, str], str] = {}
        row_order: list[str] = []
        col_order: list[str] = []
        for cell in year_cell_list:
            row_key = str(cell.row_key or "").strip()
            col_key = str(cell.col_key or "内容").strip() or "内容"
            if not row_key:
                continue
            key = (row_key, col_key)
            if key not in latest:
                latest[key] = str(cell.value or "").strip()
            if row_key not in row_order:
                row_order.append(row_key)
            if col_key not in col_order:
                col_order.append(col_key)
        if not row_order:
            continue
        if _is_table54_operating_benefit(base_name):
            _table54_coalesce_legacy_bare_metric_cols(latest, row_order)
        row_order = _normalize_table_row_order(row_order, table_name=base_name)
        time_spec_cols = time_table_default_columns_for_name(base_name)
        if time_spec_cols and _is_table54_operating_benefit(base_name):
            # Table 5-4 uses the default column spec outright, with "单位" first.
            col_order = ["单位"] + [c for c in time_spec_cols if c != "单位"]
        elif time_spec_cols:
            # Default columns that actually occur come first, unknown columns keep their order after them.
            ordered = [col for col in time_spec_cols if col in col_order]
            extras = [col for col in col_order if col not in ordered]
            col_order = ordered + extras
        col_order = _filter_redundant_bare_year_columns(col_order)
        col_order = _filter_appendix_placeholder_slot_columns(col_order)
        col_order = _filter_appendix3_summary_duplicate_forecast_years(base_name, col_order)
        col_order = _filter_appendix3_placeholders_when_forecast_has_real_year(base_name, col_order)
        col_order = _filter_appendix5_orphan_price_unit_column(base_name, col_order)
        col_order = _filter_table55_redundant_malformed_forecast_column(base_name, col_order)
        col_order = _reorder_appendix_time_col_order(base_name, col_order)
        # NOTE(review): the replacement is the bare year, so the "年" of the "××年"
        # placeholder is dropped from the display name — confirm this is intended.
        display_name = re.sub(r"××年", f"{year}", base_name)
        md, common_unit = _render_markdown_table(
            display_name, row_order, col_order, latest, time_column_year=year
        )
        if md:
            results.append((_merge_table_title_with_common_unit(display_name, common_unit), md))
    if not results and None in year_cells:
        # Every per-year table came out empty but year-less cells exist: use the non-split renderer.
        md, common_unit = _build_structured_table_markdown(db, table_id, table_name)
        if md:
            results.append((_merge_table_title_with_common_unit(str(table_name or "").strip(), common_unit), md))
    return results
def _extract_table_short_token(table_name: str) -> str:
    """Extract the first table number ("表5-2", "附表8", ...) from a name, without whitespace.

    Returns "" when the text contains no table number.
    """
    found = re.search(
        r"(附表\s*\d+(?:\s*[.\-]\s*\d+)*|表\s*\d+(?:\s*[.\-]\s*\d+)*)",
        str(table_name or ""),
    )
    if found is None:
        return ""
    return re.sub(r"\s+", "", found.group(1))
def _norm_table_token(token: str) -> str:
    """Normalize a table token for comparison: remove all whitespace, lower-case, then unify dashes to "-".

    NOTE(review): in this view the three ``replace`` search strings are empty, which would
    insert "-" around every character. They were presumably dash look-alike characters
    lost during extraction — restore them from the original file before relying on this.
    """
    text = re.sub(r"\s+", "", str(token or "")).lower()
    return text.replace("", "-").replace("", "-").replace("", "-")
def _table_token_matches_name(token: str, name: str, *, normalized: bool = False) -> bool:
    """Match a table number against a table name without letting "表1" hit "表10".

    Args:
        token: Table number to look for.
        name: Table name to search in.
        normalized: When true, ``token`` and ``name`` are already ``_norm_table_token`` results.

    Returns:
        True when the normalized values are equal, when the legacy 表4-1/表4-2 alias rule
        applies, or when the token occurs in the name and is not directly followed by a
        run of one or two digits (a following four-digit year is allowed).
    """
    if normalized:
        tok, target = token, name
    else:
        tok, target = _norm_table_token(token), _norm_table_token(name)
    if not tok or not target:
        return False
    if tok == target:
        return True

    # Legacy projects may still store the alkylation-unit analysis table of 4.3.3 as
    # 表4-1; it is treated as a synonym candidate of 表4-2.
    if tok == _norm_table_token("表4-2"):
        compact_name = re.sub(r"\s+", "", str(name or ""))
        is_alkylation_table = "烷基化装置运行分析" in compact_name and "考核时间" in compact_name
        if is_alkylation_table and (
            "表4-1" in compact_name
            or "表4-2" in compact_name
            or "表41" in _norm_table_token(compact_name)
        ):
            return True

    # Reject a token followed by exactly one or two more digits (表1 -> 表10, 表2-4 -> 表2-40),
    # but accept a following four-digit year or any non-digit character.
    boundary = re.compile(rf"{re.escape(tok)}(?!\d{{1,2}}(?!\d))")
    return boundary.search(target) is not None
def _table_token_caption_line_re(token: str) -> re.Pattern[str]:
    """Compile a case-insensitive pattern matching a whole line that contains ``token``.

    Whitespace inside the token is ignored. Group 1 is the caption line without its
    newline; the line must be terminated by ``\\n`` to match.
    """
    compact_token = re.sub(r"\s+", "", str(token or ""))
    # NOTE(review): as written, "[--—–]" is a character range from "-" up to the em dash,
    # which is much wider than a set of dash variants — check the class against the original file.
    token_source = re.escape(compact_token).replace(r"\-", r"[--—–]")
    return re.compile(rf"(?:^|\n)([^\n]*?{token_source}[^\n]*)\n", flags=re.IGNORECASE)
def _segment_after_table_caption(content: str, token: str) -> str:
    """Return the text between this table's caption line and the next table caption.

    Args:
        content: Section text to search.
        token: Table number whose caption line marks the start of the segment.

    Returns:
        The text after the caption line up to (not including) the next line that looks
        like a "表 x-x <title>" caption, or up to the end of the text. "" when the
        caption line is not found.
    """
    text = str(content or "")
    cap = _table_token_caption_line_re(token).search(text)
    if not cap:
        return ""
    rest = text[cap.end():]
    # ``rest`` starts at the beginning of a line because the caption match consumes its
    # trailing newline. "(?:^|\n)" therefore also recognises a caption on the very next
    # line; requiring a leading "\n" missed it, and the segment then borrowed the
    # following table's body.
    next_cap = re.search(
        r"(?:^|\n)[^\n]*?表\s*\d+(?:\s*[-]\s*\d+)*\s+[\u4e00-\u9fff]",
        rest,
        flags=re.IGNORECASE,
    )
    if next_cap:
        return rest[: next_cap.start()]
    return rest
def _segment_has_markdown_table_body(segment: str) -> bool:
    """Tell whether a text segment contains a table body.

    A body is either a literal ``<table>`` tag (any case) or a Markdown row that starts
    on a new line and is followed by a separator row made of ``-``, ``:``, ``|`` and whitespace.
    """
    candidate = str(segment or "")
    if candidate.strip() == "":
        return False
    table_body = r"(?:<table>|(?:\n[ \t]*\|[^\n]+\|[ \t]*\n[ \t]*\|[-:\s|]+\|))"
    return re.search(table_body, candidate, flags=re.IGNORECASE) is not None
def _replace_caption_stub_with_authoritative_table(
    content: str, token: str, authoritative_block: str
) -> str:
    """Replace a caption-only placeholder region with the authoritative table block.

    The region runs from the caption line of ``token`` up to the next table caption
    (or the end of the text). The original note cites 3.3.4 / 表3-4 as an example.

    Args:
        content: Section text.
        token: Table number whose caption marks the region.
        authoritative_block: Complete replacement block (caption plus table).

    Returns:
        ``content`` unchanged when the block is empty; the block appended to the text
        when no caption line is found; otherwise the text with the region replaced.
    """
    text = str(content or "")
    block = str(authoritative_block or "").strip()
    if not block:
        return text
    cap = _table_token_caption_line_re(token).search(text)
    if not cap:
        return text.rstrip() + "\n\n" + block

    region_start = cap.start()
    # The caption pattern may have consumed the newline that ends the previous line.
    if region_start > 0 and text[region_start] == "\n":
        region_start += 1

    rest = text[cap.end():]
    # ``rest`` starts at the beginning of a line. "(?:^|\n)" also detects a caption on the
    # very next line; requiring a leading "\n" missed it, so the replaced region swallowed
    # the following table's caption and body.
    next_cap = re.search(
        r"(?:^|\n)[^\n]*?表\s*\d+(?:\s*[-]\s*\d+)*\s+[\u4e00-\u9fff]",
        rest,
        flags=re.IGNORECASE,
    )
    region_end = cap.end() + (next_cap.start() if next_cap else len(rest))

    head = text[:region_start].rstrip("\n")
    tail = text[region_end:].lstrip("\n")
    # Join whichever parts are present with blank lines between them.
    return "\n\n".join(part for part in (head, block, tail) if part).strip()
def _table_token_exists(content: str, token: str) -> bool:
    """Tell whether ``content`` already holds the table ``token`` with an actual table body.

    A standalone caption line is required (in-text references such as "见表3-3~表3-5" do
    not count), and the body must sit between this caption and the next table caption,
    so a later table's Markdown block is not borrowed.
    """
    body = str(content or "")
    normalized_token = _norm_table_token(token)
    if not body or not normalized_token:
        return False
    if _table_token_caption_line_re(token).search(body) is None:
        return False
    own_segment = _segment_after_table_caption(body, token)
    return _segment_has_markdown_table_body(own_segment)
def _basic_warnings(section_title: str, content: str) -> list[str]:
    """Collect rule-based warnings for one generated section.

    Checks, in order: overall length, the fixed structure of section 1.2, the template
    tables and forbidden content of section 2.1.1, a missing Markdown table, and a short
    text dominated by "待补充" placeholders.
    """
    found: list[str] = []
    body_length = len(content.strip())
    if body_length < 80:
        found.append("章节内容过短,建议补充证据后重试")

    compact_title = re.sub(r"\s+", "", str(section_title or ""))

    if "1.2项目决策要点" in compact_title and not (
        "1.2.1项目背景" in content and "1.2.2预期目标" in content
    ):
        found.append("1.2 未按固定结构输出缺少“1.2.1项目背景/1.2.2预期目标”小节)")

    if "2.1.1资源与原料评价" in compact_title:
        if "原料数量及组成对比表" not in content:
            found.append("2.1.1 缺少模版规定的「原料数量及组成对比表」标题")
        if not ("原料性质对比表(醚后碳四)" in content or "原料性质对比表" in content):
            found.append("2.1.1 缺少模版规定的「原料性质对比表(醚后碳四)」标题")
        safety_table_leak = (
            "表2.6-1" in content
            and "原料数量及组成对比" not in content
            and "原料选择加氢" in content
        )
        if "原料选择加氢工艺技术对比" in content or safety_table_leak:
            found.append("2.1.1 不应出现安评类「表2.6-1 原料选择加氢工艺技术对比」等内容,本节仅允许模版主表")
        if "附录:原料预处理工艺方案比选" in content or "(非模版主表)" in content:
            found.append("2.1.1 不应出现附录或“非模版主表”字样,请仅保留模版两张主表")

    # NOTE(review): the title marker is an empty string in this view, so the test is true
    # for every title; the keyword was presumably lost in extraction — verify.
    if "" in section_title and "|" not in content:
        found.append("章节标题疑似要求表格,但输出未包含 Markdown 表格")
    if "待补充" in content and body_length < 140:
        found.append("缺失信息较多,建议补充材料后重跑")
    return found
def _check_consistency(report: str, project_name: str) -> list[str]:
    """Run coarse whole-report consistency heuristics and return the resulting hints.

    Covers: the project name is mentioned, money amounts, unit declarations, the span of
    "20xx年" years, the number of "待补充" placeholders, and opposite wording pairs.
    """
    hints: list[str] = []

    if project_name and project_name not in report:
        hints.append("正文未显式出现项目名称,请检查第一章基本信息。")

    amount_keys = [
        f"{value}-{unit}"
        for value, unit in re.findall(r"(\d+(?:\.\d+)?)\s*(亿元|万元|万)", report)
    ]
    # Only the first ten amounts are inspected for how many distinct values they hold.
    if len(amount_keys) >= 4 and len(set(amount_keys[:10])) >= 6:
        hints.append("金额口径较分散,建议统一投资/决算/效益统计口径。")

    declared_units = re.findall(r"(?:单位|计量单位)\s*[:]\s*([^\n]{1,40})", report)
    if len(set(declared_units)) > 1:
        hints.append("检测到多个计量单位定义,建议统一单位说明(如万元、吨/年)。")

    mentioned_years = [int(year) for year in re.findall(r"(20\d{2})年", report)]
    if mentioned_years and max(mentioned_years) - min(mentioned_years) >= 12:
        hints.append("年份跨度较大,建议复核建设期与运营期时间线是否混写。")

    missing_count = report.count("待补充")
    if missing_count >= 10:
        hints.append(f"全篇“待补充”出现 {missing_count} 次,建议补充关键材料后重跑。")

    if _has_conflict_terms(report):
        hints.append("发现同一指标存在“增加/下降”等相反表述,建议人工复核结论口径。")
    return hints
def _append_report_appendices(db: Session, project_uuid: str, report_text: str) -> str:
    """Append the figure and table appendices to the final report text.

    Figures are placed before tables. Per the original notes, figures come from images
    embedded in the project's knowledge-base .docx files (omitted when none is found)
    and tables come from the structured element tables whose name contains "附表".
    Empty parts are left out; the parts are joined with blank lines.
    """
    body = (report_text or "").strip()
    tables_md = _build_appendix_tables_markdown(db, project_uuid)
    figures_md = _build_appendix_figures_markdown(db, project_uuid)

    ordered_parts = (body, figures_md, tables_md)
    kept = [part for part in ordered_parts if part and str(part).strip()]
    return "\n\n".join(kept).strip()
def _build_appendix_tables_markdown(db: Session, project_uuid: str) -> str:
    """Render the "## 附表" chapter from the project's element tables.

    Only tables whose name contains "附表" are used, ordered by name and then by most
    recent update. A table without renderable cells falls back to a fixed template when
    one exists and is skipped otherwise. At most 30 tables are emitted; "" is returned
    when nothing could be rendered.
    """
    project_tables = (
        db.query(ElementTable)
        .filter(ElementTable.project_id == project_uuid)
        .order_by(ElementTable.table_name.asc(), ElementTable.updated_at.desc())
        .all()
    )
    appendix_tables = [table for table in project_tables if "附表" in (table.table_name or "")]
    if not appendix_tables:
        return ""

    sections: list[str] = []
    for table in appendix_tables:
        if len(sections) >= 30:
            break
        markdown, common_unit = _build_structured_table_markdown(db, table.id, table.table_name)
        heading = str(table.table_name or "").strip() or f"附表({table.id}"
        heading = _merge_table_title_with_common_unit(heading, common_unit)
        markdown = markdown or _build_appendix_table_fallback_markdown(heading)
        if not markdown:
            continue
        sections.append(f"### {heading}\n\n{markdown}")

    if not sections:
        return ""
    return "\n\n".join(["## 附表", *sections]).strip()
def _build_appendix_table_fallback_markdown(table_name: str) -> str:
    """Return a fixed placeholder table for an appendix that has no usable cell data.

    Only appendix 8 (feasibility-study vs. post-evaluation parameter comparison) has a
    template; every other name yields "".
    """
    compact_name = str(table_name or "").replace(" ", "")
    required_fragments = ("附表8", "可研报告和后评价参数对比表")
    if all(fragment in compact_name for fragment in required_fragments):
        return APPENDIX8_PARAMETER_COMPARISON_TABLE
    return ""
def _resolve_appendix_figure_blobs_from_kb(db: Session, project_uuid: str) -> dict[int, tuple[bytes, str, str]]:
    """Extract appendix-figure images from the project's knowledge-base .docx files.

    Returns a mapping ``slot -> (blob, content_type, source_filename)``. Documents are
    visited newest upload first; non-.docx entries, missing files and documents whose
    extraction fails (logged as a warning) are skipped.
    """
    # NOTE(review): the setting is spelled ``DOC_PAT`` here — confirm it is not meant to be DOC_PATH.
    doc_root = Path(settings.DOC_PAT).resolve()
    documents = (
        db.query(KbDocument)
        .filter(KbDocument.project_id == project_uuid)
        .order_by(KbDocument.uploaded_at.desc())
        .all()
    )

    candidates_per_doc: list[tuple[str, dict[int, list[tuple[int, bytes, str]]]]] = []
    for document in documents:
        filename = str(document.name or "")
        if not filename.lower().endswith(".docx"):
            continue
        file_path = _kb_doc_absolute_file_path_for_model(doc_root, document)
        if not file_path.is_file():
            continue
        try:
            extracted = extract_appendix_figure_candidates_from_docx(file_path)
        except Exception as exc:
            logger.warning("appendix figure extraction failed %s: %s", file_path, exc)
        else:
            candidates_per_doc.append((filename, extracted))
    return merge_best_appendix_figures(candidates_per_doc)
def _build_appendix_figures_markdown(db: Session, project_uuid: str) -> str:
    """Render the "## 附图" chapter.

    Per the original notes there are two fixed figures (plant-wide material balance and
    alkylation-unit material balance). Only real images extracted from knowledge-base
    .docx files are shown; a figure that cannot be resolved is omitted without a
    placeholder, and "" is returned when no figure is available.
    """
    targets = APPENDIX_FIGURE_TARGETS
    resolved = _resolve_appendix_figure_blobs_from_kb(db, project_uuid)
    markdown_by_slot = appendix_figure_markdown_images(resolved, label_title=list(targets))

    # Slots are numbered from 1 in target order.
    slot_markdown = (markdown_by_slot.get(slot) for slot in range(1, len(targets) + 1))
    figures = [str(md).strip() for md in slot_markdown if md and str(md).strip()]
    if not figures:
        return ""
    return "## 附图\n\n" + "\n\n".join(figures)
def _update_chapter_status(
    db: Session,
    job: ReportGenerationJob,
    chapter: ReportGenerationChapter,
    status: str,
    error_message: Optional[str],
) -> None:
    """Record a chapter's new status and mark it as the job's current section, then commit.

    Chapter and job receive the same ``updated_at`` timestamp.
    """
    stamp = datetime.now()
    chapter.status, chapter.error_message, chapter.updated_at = status, error_message, stamp
    job.current_section_key, job.updated_at = chapter.section_key, stamp
    db.commit()
def _resolve_template(db: Session, template_id: Optional[str]) -> ReportTemplate:
    """Load the requested template, falling back to the active default template.

    Raises:
        HTTPException: 404 when neither the requested nor a default template exists.
    """
    requested = None
    if template_id:
        requested = db.query(ReportTemplate).filter(ReportTemplate.id == template_id).first()
    if requested:
        return requested

    default_template = (
        db.query(ReportTemplate)
        .filter(ReportTemplate.is_default == True, ReportTemplate.is_active == True)  # noqa: E712
        .first()
    )
    if default_template:
        return default_template
    raise HTTPException(status_code=404, detail="未找到可用模板")
def _list_template_sections(db: Session, template_id: str) -> list[ReportTemplateSection]:
    """Return all sections of a template ordered by ascending ``section_order``."""
    sections_of_template = db.query(ReportTemplateSection).filter(
        ReportTemplateSection.template_id == template_id
    )
    in_order = sections_of_template.order_by(ReportTemplateSection.section_order.asc())
    return in_order.all()
def _sections_for_generation(sections: list[ReportTemplateSection]) -> list[ReportTemplateSection]:
    """Keep only the sections that should be generated: leaf sections.

    A section is dropped when another section's number starts with its own number plus
    ".". Sections whose number cannot be parsed are kept for compatibility. The input
    order is preserved.
    """
    candidates = list(sections or [])
    if not candidates:
        return []

    numbered = [
        (section, _extract_section_number(section.section_title or ""))
        for section in candidates
    ]
    known_numbers = {number for _section, number in numbered if number}

    leaves: list[ReportTemplateSection] = []
    for section, number in numbered:
        if not number:
            leaves.append(section)
            continue
        child_prefix = f"{number}."
        if not any(other.startswith(child_prefix) for other in known_numbers):
            leaves.append(section)
    return leaves
def _extract_tokens(text: str) -> list[str]:
    """Extract unique keyword tokens from ``text``, keeping first-seen order.

    Tokens are runs of 2-8 CJK characters followed by lower-cased runs of 3-20 ASCII letters.
    """
    source = str(text or "")
    chinese_runs = re.findall(r"[\u4e00-\u9fa5]{2,8}", source)
    latin_runs = re.findall(r"[A-Za-z]{3,20}", source.lower())
    # dict keys keep insertion order, which gives an order-preserving de-duplication.
    return list(dict.fromkeys(chinese_runs + latin_runs))
def _fmt_dt(dt: Optional[datetime]) -> Optional[str]:
    """Format a datetime as ``YYYY-MM-DD HH:MM:SS``; a missing value stays ``None``."""
    return dt.strftime("%Y-%m-%d %H:%M:%S") if dt else None
def _select_chapter_example(section_title: str, raw_examples: Optional[str], evidence: dict) -> str:
    """Pick the chapter example that best fits the section, or "" when none should be used.

    Examples are scored by simple keyword heuristics against the section title, plus a
    small bonus for length; the first example with the highest score wins.
    """
    # Sections that deliberately get no example (reasons per the original notes):
    #   1.2   - sensitive to "narrative + evidence anchors"; examples skew the emphasis.
    #   2.1.1 - must follow the template's two raw-material tables; example tables interfere.
    #   5.1   - Table 5-1 is emitted directly; the chapter-5 sample table causes duplicates.
    #   5.2.1 - Tables 5-2/5-3 are emitted directly; the shared sample mislabels the caption.
    #   5.3.1 - only the directly emitted Table 5-4 is allowed.
    #   5.4   - only Table 5-7 is allowed.
    section_no = _extract_section_number(section_title or "")
    if section_no in ("1.2", "2.1.1", "5.1", "5.2.1", "5.3.1", "5.4"):
        return ""

    samples = _parse_examples(raw_examples)
    if not samples:
        return ""

    title = str(section_title or "")
    # NOTE(review): the markers tested with `"" in ...` are empty strings in this view and
    # therefore always match; they were presumably lost in extraction — verify.
    has_table_need = ("" in title) or _evidence_has_table_signal(evidence)
    wants_finance = "投资" in title or "财务" in title
    wants_impact = "影响" in title or "持续" in title
    wants_conclusion = "结论" in title or "综合" in title

    def _score(sample: str) -> int:
        lowered = sample.lower()
        points = 0
        if has_table_need and ("|" in sample or "" in sample):
            points += 4
        if wants_finance and any(k in lowered for k in ("万元", "亿元", "投资", "收益")):
            points += 3
        if wants_impact and any(k in lowered for k in ("环保", "安全", "排放")):
            points += 3
        if wants_conclusion and any(k in lowered for k in ("结论", "建议", "经验")):
            points += 3
        # Longer examples get up to two extra points.
        return points + min(len(sample) // 300, 2)

    # max() returns the first of several equally scored examples.
    return max(samples, key=_score)
def _parse_examples(raw_examples: Optional[str]) -> list[str]:
    """Split the stored example text into at most three examples.

    Supported layouts, tried in order: a JSON array of strings, the separators
    ``---EXAMPLE---`` or ``====`` on their own line, and "示例1/示例2/示例3" headed
    paragraphs. Anything else is returned as a single example.
    """
    text = str(raw_examples or "").strip()
    if not text:
        return []

    # JSON array form: ["example 1", "example 2", "example 3"]
    if text[0] == "[" and text[-1] == "]":
        try:
            decoded = json.loads(text)
            if isinstance(decoded, list):
                stripped_items = (str(item).strip() for item in decoded)
                return [item for item in stripped_items if item][:3]
        except Exception:
            # Not valid JSON: fall through to the plain-text layouts.
            pass

    separator = next(
        (sep for sep in ("\n---EXAMPLE---\n", "\n====\n") if sep in text),
        None,
    )
    if separator is not None:
        return [chunk.strip() for chunk in text.split(separator) if chunk.strip()][:3]

    # "示例1 / 示例2 / 示例3" headed paragraphs.
    paragraphs = [
        part.strip()
        for part in re.split(r"\n\s*示例\s*[1-3][:]\s*", "\n" + text)
        if part.strip()
    ]
    return paragraphs[:3] if len(paragraphs) >= 2 else [text]
def _evidence_has_table_signal(evidence: dict) -> bool:
    """Tell whether any of the first eight ``chapterDocs`` entries looks like it contains a table.

    Non-dict evidence, a non-list ``chapterDocs`` value and non-dict entries count as no signal.
    """
    docs = evidence.get("chapterDocs") if isinstance(evidence, dict) else []
    if not isinstance(docs, list):
        return False

    def _looks_tabular(doc: object) -> bool:
        if not isinstance(doc, dict):
            return False
        content = str(doc.get("content") or "")
        # NOTE(review): the second marker is an empty string in this view (always found);
        # it was presumably lost in extraction — verify against the original file.
        return "|" in content or "" in content[:200]

    return any(_looks_tabular(doc) for doc in docs[:8])
def _has_conflict_terms(report: str) -> bool:
    """Tell whether the report uses both words of any opposite-meaning pair.

    This is a coarse, whole-text heuristic: the two words only need to occur somewhere
    in ``report``, not in the same sentence.

    Args:
        report: Full report text.

    Returns:
        True when both terms of at least one pair occur in the text.
    """
    pairs = (
        ("增加", "下降"),
        ("达标", "未达标"),
        ("盈利", "亏损"),
        ("改善", "恶化"),
    )
    for positive, negative in pairs:
        if negative not in report:
            continue
        # "达标" is contained in "未达标": mask the negative term first so that a text
        # mentioning only "未达标" is not reported as a contradiction. The NUL
        # replacement keeps the neighbouring characters from merging into a new match.
        remainder = report.replace(negative, "\x00") if positive in negative else report
        if positive in remainder:
            return True
    return False
def _resolve_project(db: Session, project_id: str) -> Optional[Project]:
    """Look a project up by UUID first, then by numeric primary key.

    Returns ``None`` for an empty id, and when the id matches no UUID and cannot be
    converted to an integer.
    """
    if not project_id:
        return None

    by_uuid = db.query(Project).filter(Project.uuid == project_id).first()
    if by_uuid:
        return by_uuid

    try:
        numeric_id = int(project_id)
    except Exception:
        return None
    return db.query(Project).filter(Project.id == numeric_id).first()