report_generation/services/template_prompt_mapper.py
xxy 43f3e0b746 Initial commit
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 18:41:06 +08:00

1240 lines
44 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import logging
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any
from config import settings
from prompts.report_generation.template_prompt_rules import DEFAULT_SECTION_PROMPT
from services.template_service import (
_clean_section_title,
_core_title,
_extract_number_prefix,
_guideline_prompt_for,
_normalize_section_identity,
_section_key_to_number,
_title_match_score,
build_default_template_catalog,
default_section_output_contract,
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Minimum _title_match_score for a default section to count as a fuzzy title match.
_FUZZY_MATCH_THRESHOLD = 40
# Minimum confidence an LLM-proposed match must report before it is accepted.
_LLM_MATCH_CONFIDENCE = 0.55
def resolve_uploaded_template_prompts(
    uploaded_sections: list[dict[str, str]],
    *,
    use_llm: bool | None = None,
) -> list[dict[str, str]]:
    """
    Resolve sectionPrompt / sectionOutputContract for every uploaded template section.

    Stages, tried in order:
      1. Titles equal the default catalog's titles in the same order ->
         copy the default bundles by position.
      2. Same numbering sequence, different titles -> match each uploaded
         section to a default section by title under the same-number constraint.
      3. Local fuzzy title matching -- currently disabled (commented out below).
      4. Still unresolved -> LLM matching / generation, when enabled and configured.
      5. Anything left -> generic fallback built from the uploaded title.

    Args:
        uploaded_sections: Section dicts; ``sectionTitle`` and ``sectionKey``
            are read from each.
        use_llm: Force the LLM stage on/off. ``None`` defers to
            ``settings.TEMPLATE_UPLOAD_LLM_PROMPT_MAPPING`` (default True).

    Returns:
        One ``{"sectionPrompt", "sectionOutputContract"}`` dict per uploaded
        section, in input order. Empty list for empty input.
    """
    if not uploaded_sections:
        return []
    catalog = build_default_template_catalog()
    default_by_key = {row["sectionKey"]: row for row in catalog}
    # Only referenced by the disabled stage 3 below.
    default_by_title = {_normalize_section_identity(row["sectionTitle"]): row for row in catalog}
    uploaded_meta = [_section_meta(s, i) for i, s in enumerate(uploaded_sections)]
    default_meta = [_catalog_meta(row, i) for i, row in enumerate(catalog)]
    # Parallel, index-aligned accumulators; a None result means "not resolved yet".
    results: list[dict[str, str] | None] = [None] * len(uploaded_meta)
    match_sources: list[str] = [""] * len(uploaded_meta)
    matched_default_numbers: list[str | None] = [None] * len(uploaded_meta)
    used_default_keys: set[str] = set()
    # 1) Titles identical and in the same order -> apply defaults by index.
    if _titles_equal_in_order(uploaded_meta, default_meta):
        for i, dm in enumerate(default_meta):
            if i >= len(uploaded_meta):
                break
            results[i] = _prompt_bundle(dm)
            match_sources[i] = "exact_title_order"
        return _finalize_results(
            uploaded_meta, results, match_sources, matched_default_numbers
        )
    # 2) Same numbering sequence, only titles differ -> match by title under the
    #    same-number constraint instead of blindly applying by list position.
    if _numbers_equal_in_order(uploaded_meta, default_meta):
        for um in uploaded_meta:
            matched = _match_default_by_title_and_number(
                um,
                default_meta,
                used_default_keys=used_default_keys,
                same_number_is_enough=True,
            )
            if matched:
                bundle, src_num = _bundle_from_match_with_inline(
                    um, matched, uploaded_meta, default_meta
                )
                results[um["index"]] = bundle
                match_sources[um["index"]] = "same_number_title_match"
                matched_default_numbers[um["index"]] = src_num
                _mark_default_used(um, matched, used_default_keys)
        # NOTE(review): source indentation was lost; this early return is placed
        # inside the stage-2 branch. Placing it one level out is equivalent,
        # because nothing is resolved yet when stage 2 is skipped.
        if all(r is not None for r in results):
            return _finalize_results(
                uploaded_meta, results, match_sources, matched_default_numbers
            )
    # 3) Local fuzzy matching (title + number) -- disabled; kept for reference.
    # for um in uploaded_meta:
    #     if results[um["index"]] is not None:
    #         continue
    #     matched = _local_match_default(
    #         um,
    #         default_meta,
    #         default_by_title,
    #         uploaded_meta,
    #         used_default_keys=used_default_keys,
    #     )
    #     if matched:
    #         bundle, src_num = _bundle_from_match_with_inline(
    #             um, matched, uploaded_meta, default_meta
    #         )
    #         results[um["index"]] = bundle
    #         match_sources[um["index"]] = "fuzzy_title"
    #         matched_default_numbers[um["index"]] = src_num
    #         _mark_default_used(um, matched, used_default_keys)
    # 4) LLM matching / generation for whatever is still unresolved.
    unresolved = [um for um in uploaded_meta if results[um["index"]] is None]
    llm_enabled = use_llm if use_llm is not None else bool(
        getattr(settings, "TEMPLATE_UPLOAD_LLM_PROMPT_MAPPING", True)
    )
    if unresolved and llm_enabled and _llm_configured():
        _apply_llm_mapping(
            unresolved,
            default_meta,
            default_by_key,
            results,
            match_sources,
            matched_default_numbers,
            uploaded_meta,
        )
    # 5) Fallback: use the guideline prompt when the title hits one, otherwise a
    #    generic prompt built from the uploaded title (avoids wrongly applying a
    #    default prompt just because the chapter number matches).
    for um in uploaded_meta:
        if results[um["index"]] is not None:
            continue
        title = um["title"]
        key = um["key"]
        results[um["index"]] = {
            "sectionPrompt": _fallback_prompt_for_unmatched(title, key),
            "sectionOutputContract": _fallback_contract_for_unmatched(title, key),
        }
        match_sources[um["index"]] = "fallback_generic"
    return _finalize_results(
        uploaded_meta, results, match_sources, matched_default_numbers
    )
def _finalize_results(
    uploaded_meta: list[dict[str, Any]],
    results: list[dict[str, str] | None],
    match_sources: list[str],
    matched_default_numbers: list[str | None] | None = None,
) -> list[dict[str, str]]:
    """Turn the per-section bundles into the final output list.

    For each uploaded section: substitute a default bundle when none was
    resolved, renumber section/table references when the bundle came from a
    default section with a different number, then adapt the text to the
    uploaded structure. Logs a per-source match summary.
    """
    source_numbers = matched_default_numbers or [None] * len(uploaded_meta)
    finalized: list[dict[str, str]] = []
    for section, src_number in zip(uploaded_meta, source_numbers):
        bundle = results[section["index"]]
        if not bundle:
            bundle = {
                "sectionPrompt": DEFAULT_SECTION_PROMPT,
                "sectionOutputContract": _fallback_contract_for_unmatched(
                    section["title"], section["key"]
                ),
            }
        prompt_text = bundle.get("sectionPrompt") or DEFAULT_SECTION_PROMPT
        contract_text = bundle.get("sectionOutputContract") or ""
        dst_number = section.get("number") or ""
        if src_number and dst_number and src_number != dst_number:
            # The leaf decision is taken on the contract before it is rewritten.
            as_leaf = _use_leaf_number_rewrite(
                contract_text, src_number, section, uploaded_meta
            )
            prompt_text = _rewrite_numbers_and_tables(
                prompt_text, src_number, dst_number, leaf_slice=as_leaf
            )
            contract_text = _rewrite_numbers_and_tables(
                contract_text, src_number, dst_number, leaf_slice=as_leaf
            )
        prompt_text = _adapt_prompt_to_uploaded_structure(
            prompt_text, section, uploaded_meta
        )
        contract_text = _adapt_prompt_to_uploaded_structure(
            contract_text, section, uploaded_meta
        )
        finalized.append(
            {"sectionPrompt": prompt_text, "sectionOutputContract": contract_text}
        )
    matched_count = sum(
        1 for source in match_sources if source and not source.startswith("fallback")
    )
    logger.info(
        "template_prompt_mapper: sections=%s matched=%s sources=%s",
        len(uploaded_meta),
        matched_count,
        {s: match_sources.count(s) for s in set(match_sources) if s},
    )
    return finalized
def _section_meta(section: dict[str, str], index: int) -> dict[str, Any]:
    """Build the matching metadata record for one uploaded section.

    The number comes from the title prefix, falling back to the section key.
    """
    title = str(section.get("sectionTitle") or "").strip()
    key = str(section.get("sectionKey") or "").strip()
    meta: dict[str, Any] = {"index": index, "key": key, "title": title}
    meta["number"] = _extract_number_prefix(title) or _section_key_to_number(key)
    meta["norm_title"] = _normalize_section_identity(title)
    meta["core_title"] = _core_title(_clean_section_title(title) or title)
    return meta
def _catalog_meta(row: dict[str, str], index: int) -> dict[str, Any]:
    """Build the matching metadata record for one default-catalog row.

    Same shape as the uploaded-section metadata, plus the row's
    ``sectionPrompt`` and ``sectionOutputContract``. An explicit
    ``sectionNumber`` on the row wins over a number derived from title or key.
    """
    title = row["sectionTitle"]
    key = row["sectionKey"]
    number = (
        row.get("sectionNumber")
        or _extract_number_prefix(title)
        or _section_key_to_number(key)
    )
    meta: dict[str, Any] = {
        "index": index,
        "key": key,
        "title": title,
        "number": number,
    }
    meta["norm_title"] = _normalize_section_identity(title)
    meta["core_title"] = _core_title(_clean_section_title(title) or title)
    meta["sectionPrompt"] = row["sectionPrompt"]
    meta["sectionOutputContract"] = row["sectionOutputContract"]
    return meta
def _prompt_bundle(dm: dict[str, Any]) -> dict[str, str]:
return {
"sectionPrompt": str(dm.get("sectionPrompt") or ""),
"sectionOutputContract": str(dm.get("sectionOutputContract") or ""),
}
def _build_section_remap(src: str, dst: str) -> dict[str, str]:
"""单节编号替换(含子编号后缀,如 6.1.1 -> 4.1 则 6.1.1.1 -> 4.1.1)。"""
if not src or not dst or src == dst:
return {}
return {src: dst}
def _build_chapter_remap(src: str, dst: str) -> dict[str, str]:
"""章级编号替换,用于 表5-1 -> 表3-1 这类表号。"""
if not src or not dst:
return {}
src_ch = src.split(".", 1)[0]
dst_ch = dst.split(".", 1)[0]
if not src_ch.isdigit() or not dst_ch.isdigit() or src_ch == dst_ch:
return {}
return {src_ch: dst_ch}
_TABLE_NUM_RE = re.compile(r"表(\d+)-(\d+)")
def _rewrite_table_numbers_in_text(text: str, chapter_remap: dict[str, str]) -> str:
if not text or not chapter_remap:
return text
def _sub(match: re.Match[str]) -> str:
ch, seq = match.group(1), match.group(2)
new_ch = chapter_remap.get(ch)
if new_ch:
return f"{new_ch}-{seq}"
return match.group(0)
return _TABLE_NUM_RE.sub(_sub, text)
def _rewrite_numbers_and_tables(text: str, src: str, dst: str, *, leaf_slice: bool = False) -> str:
if not text or not src or not dst or src == dst:
return text
if leaf_slice:
text = _rewrite_leaf_subsection_numbers(text, src, dst)
else:
text = _rewrite_section_numbers_in_text(text, _build_section_remap(src, dst))
chapter_remap = _build_chapter_remap(src, dst)
return _rewrite_table_numbers_in_text(text, chapter_remap)
def _top_chapter_number(section_number: str | None) -> int | None:
m = re.match(r"^(\d+)", str(section_number or "").strip())
return int(m.group(1)) if m else None
def _section_number_tuple(section_number: str) -> tuple[int, ...]:
parts = []
for p in str(section_number or "").strip().split("."):
if p.isdigit():
parts.append(int(p))
else:
return tuple()
return tuple(parts)
def _direct_child_sections(
all_uploaded: list[dict[str, Any]], parent_number: str
) -> list[dict[str, Any]]:
parent = str(parent_number or "").strip()
if not parent:
return []
prefix = parent + "."
out: list[dict[str, Any]] = []
for um in all_uploaded:
num = str(um.get("number") or "").strip()
if not num.startswith(prefix) or num == parent:
continue
suffix = num[len(prefix) :]
if suffix and "." not in suffix:
out.append(um)
out.sort(key=lambda u: _section_number_tuple(str(u.get("number") or "")))
return out
def _preceding_chapters_label(
    all_uploaded: list[dict[str, Any]], current_number: str | None
) -> tuple[str, int]:
    """Describe the top-level chapters that precede ``current_number``.

    Returns ``(label, count)``. The label is "前序章节" with count 0 when the
    current chapter number is unknown or nothing precedes it.
    """
    current_top = _top_chapter_number(current_number)
    if current_top is None:
        return "前序章节", 0
    seen_tops: set[int] = set()
    for um in all_uploaded:
        top = _top_chapter_number(um.get("number"))
        if top is not None:
            seen_tops.add(top)
    earlier = sorted(top for top in seen_tops if top < current_top)
    if not earlier:
        return "前序章节", 0
    total = len(earlier)
    first, last = earlier[0], earlier[-1]
    # NOTE(review): both label formats below carry no literal text around the
    # numbers (first/last are simply concatenated). This looks like characters
    # were lost from the source; confirm the intended format.
    if total >= 2 and last - first + 1 == total:
        return f"{first}{last}", total
    return "".join(f"{top}" for top in earlier), total
_CN_COUNT = ("", "", "", "", "", "", "", "", "", "", "")
def _cn_count(n: int) -> str:
if 0 < n < len(_CN_COUNT):
return _CN_COUNT[n]
return str(n)
_CHILDREN_COVER_RE = re.compile(r"(并按顺序完整覆盖下级小节[:])\s*[^。\n]+")
_PRECEDING_RANGE_RE = re.compile(r"\d+\d+章")
def _rewrite_preceding_chapter_refs(text: str, range_label: str, chapter_count: int) -> str:
if not text or not range_label:
return text
text = text.replace("【前序章节正文第16章", f"【前序章节正文({range_label})】")
text = _PRECEDING_RANGE_RE.sub(range_label, text)
if chapter_count > 0:
cn = _cn_count(chapter_count)
text = re.sub(r"前[一二三四五六七八九十]+章", f"{chapter_count}", text)
text = text.replace("前六章", f"{chapter_count}")
text = text.replace(f"{cn}", f"{chapter_count}")
text = text.replace("是对前六章内容的总结", f"是对{range_label}内容的总结")
text = text.replace("是对前6章内容的总结", f"是对{range_label}内容的总结")
return text
def _rewrite_children_cover_clause(text: str, child_numbers: list[str]) -> str:
if not text or not child_numbers:
return text
listing = "".join(child_numbers)
def _repl(m: re.Match[str]) -> str:
return f"{m.group(1)}{listing}"
return _CHILDREN_COVER_RE.sub(_repl, text, count=1)
def _rewrite_children_outline_block(
text: str, parent_number: str, children: list[dict[str, Any]]
) -> str:
"""将提示词里枚举的下级小节列表改为上传模版中的实际子节。"""
if not text or not parent_number or len(children) < 2:
return text
parent = re.escape(parent_number)
line_re = re.compile(rf"^(\s*)(\d+)\s*{parent}\.\d+\s+.+$")
lines = text.splitlines()
out: list[str] = []
i = 0
replaced = False
while i < len(lines):
if line_re.match(lines[i].strip()) or (
lines[i].strip() and re.match(rf"^\d+\s*{parent}\.\d+", lines[i].strip())
):
if not replaced:
for j, um in enumerate(children, 1):
num = str(um.get("number") or "").strip()
label = _clean_section_title(um.get("title") or "") or um.get(
"core_title", ""
)
out.append(f" {j}{num} {label}")
replaced = True
while i < len(lines) and (
line_re.match(lines[i].strip())
or re.match(rf"^\d+\s*{parent}\.\d+", lines[i].strip())
):
i += 1
continue
out.append(lines[i])
i += 1
return "\n".join(out)
def _contract_has_inline_child_list(contract: str, parent_num: str) -> bool:
"""默认合同把子条写在父节内(如 4.1.1、4.1.2 列表),而非独立章节。"""
if not contract or not parent_num:
return False
if "按顺序固定输出以下" not in contract and "小标题并分别展开" not in contract:
return False
return bool(re.search(rf"{re.escape(parent_num)}\.\d+", contract))
def _parse_inline_child_entries(contract: str, parent_num: str) -> list[dict[str, str]]:
entries: list[dict[str, str]] = []
for line in str(contract or "").splitlines():
stripped = line.strip()
m = re.match(rf"^{re.escape(parent_num)}\.(\d+)\s*(.+)$", stripped)
if m:
entries.append(
{
"suffix": m.group(1),
"default_num": f"{parent_num}.{m.group(1)}",
"label": m.group(2).strip(),
}
)
return entries
def _extract_inline_child_guidance(contract: str, child_default_num: str) -> str:
needle = f"{child_default_num}"
lines = str(contract or "").splitlines()
for i, line in enumerate(lines):
compact = line.replace(" ", "")
if needle not in compact:
continue
chunk = re.sub(r"^\d+\s*", "", line.strip()).strip()
for j in range(i + 1, len(lines)):
nxt = lines[j].strip()
if re.match(r"^\d+", nxt):
break
if nxt:
chunk += nxt
return chunk
return ""
def _uploaded_parent_number(
uploaded_num: str | None, all_uploaded: list[dict[str, Any]]
) -> str | None:
parts = str(uploaded_num or "").strip().split(".")
if len(parts) < 2:
return None
parent = ".".join(parts[:-1])
if any(str(o.get("number") or "").strip() == parent for o in all_uploaded):
return parent
return None
def _should_skip_whole_parent_match(
uploaded: dict[str, Any], dm: dict[str, Any], all_uploaded: list[dict[str, Any]]
) -> bool:
"""上传模版已拆出子节时,不要把整段父节合同套到叶节上。"""
u_num = str(uploaded.get("number") or "").strip()
d_num = str(dm.get("number") or "").strip()
if not u_num or not d_num or u_num.count(".") <= d_num.count("."):
return False
if not _uploaded_parent_number(u_num, all_uploaded):
return False
return _contract_has_inline_child_list(
str(dm.get("sectionOutputContract") or ""), d_num
)
def _try_inline_child_match(
    uploaded: dict[str, Any],
    default_meta: list[dict[str, Any]],
    all_uploaded: list[dict[str, Any]],
) -> tuple[dict[str, Any], dict[str, str]] | None:
    """Match an uploaded leaf section to one inline item of a default parent contract.

    Example from the original comment: an uploaded leaf corresponding to the
    "4.1.1" item listed inside a default parent contract.

    Returns:
        ``(default_parent_meta, inline_entry)`` on success, else None. None is
        also returned when the uploaded section's parent is not part of the
        upload or its last number component is not a plain integer.
    """
    parent_num = _uploaded_parent_number(uploaded.get("number"), all_uploaded)
    if not parent_num:
        return None
    u_num = str(uploaded.get("number") or "").strip()
    # Last component of the uploaded number, i.e. the child's ordinal.
    child_suffix = u_num[len(parent_num) + 1 :]
    if not child_suffix or "." in child_suffix or not child_suffix.isdigit():
        return None
    parent_um = next(
        (o for o in all_uploaded if str(o.get("number") or "").strip() == parent_num),
        None,
    )
    u_core = uploaded["core_title"]
    # 1) Align on uploaded parent title + child ordinal (e.g. 2.3.1.1 <-> 4.1.1).
    #    The first qualifying (parent, entry) pair wins.
    if parent_um:
        for dm in default_meta:
            pnum = str(dm.get("number") or "").strip()
            contract = str(dm.get("sectionOutputContract") or "")
            if not _contract_has_inline_child_list(contract, pnum):
                continue
            parent_score = _title_match_score(parent_um["core_title"], dm["core_title"])
            if parent_score < 35:
                continue
            for entry in _parse_inline_child_entries(contract, pnum):
                if entry["suffix"] != child_suffix:
                    continue
                child_score = _title_match_score(u_core, _core_title(entry["label"]))
                # A strong parent match is accepted even with a weak child title match.
                if child_score >= 12 or parent_score >= 50:
                    return dm, entry
    # 2) Fuzzy match on the child item title across all default parents
    #    (threshold relaxed for slightly different wording); best score wins.
    best_dm: dict[str, Any] | None = None
    best_entry: dict[str, str] | None = None
    best_score = -1
    for dm in default_meta:
        pnum = str(dm.get("number") or "").strip()
        contract = str(dm.get("sectionOutputContract") or "")
        if not _contract_has_inline_child_list(contract, pnum):
            continue
        for entry in _parse_inline_child_entries(contract, pnum):
            score = _title_match_score(u_core, _core_title(entry["label"]))
            # Bonus when the ordinal also lines up.
            if entry["suffix"] == child_suffix:
                score += 15
            if score > best_score:
                best_score = score
                best_dm = dm
                best_entry = entry
    if best_score < 20 or not best_dm or not best_entry:
        return None
    return best_dm, best_entry
def _build_inline_child_contract(
    uploaded: dict[str, Any],
    parent_dm: dict[str, Any],
    entry: dict[str, str],
) -> str:
    """Build a stand-alone contract for an uploaded leaf from an inline parent item.

    The guidance is taken from the default parent contract for the matched
    entry; a generic instruction is used when none can be extracted.
    """
    uploaded_num = str(uploaded.get("number") or "").strip()
    label = _clean_section_title(uploaded.get("title") or "") or uploaded["core_title"]
    parent_contract = str(parent_dm.get("sectionOutputContract") or "")
    guidance = _extract_inline_child_guidance(parent_contract, entry["default_num"])
    if guidance:
        body = guidance
    else:
        body = f"围绕「{label}」撰写本段内容,依据证据材料,缺失写「待补充」,禁止编造。"
    parts = [
        "必须严格按以下格式与顺序输出,不得缺项、不得改名:",
        f'1首行固定输出标题"{uploaded_num} {label}"',
        f"2{body}",
        "【写作约束】",
        "不得新增无关小标题;不得写入同级其他小条目的内容;证据不足处写「待补充」,禁止编造。",
    ]
    return "\n".join(parts)
def _adapt_prompt_to_uploaded_structure(
text: str,
section: dict[str, Any],
all_uploaded: list[dict[str, Any]],
) -> str:
if not text:
return text
num = str(section.get("number") or "").strip()
# 叶节合同不应再展开父节内嵌子条列表
if _is_leaf_subsection_contract(text):
return text
children = _direct_child_sections(all_uploaded, num)
if children and "按顺序固定输出以下" in text:
child_nums = [str(c.get("number") or "") for c in children]
text = _rewrite_children_cover_clause(text, child_nums)
text = _rewrite_children_outline_block(text, num, children)
range_label, count = _preceding_chapters_label(all_uploaded, num)
if count > 0 and _top_chapter_number(num) is not None:
text = _rewrite_preceding_chapter_refs(text, range_label, count)
return text
def _bundle_from_match_with_inline(
    uploaded: dict[str, Any],
    matched: dict[str, Any],
    all_uploaded: list[dict[str, Any]],
    default_meta: list[dict[str, Any]],
) -> tuple[dict[str, str], str]:
    """Build the bundle for a match, preferring an inline-child slice when one exists.

    When the uploaded section maps onto an inline item of some default parent,
    that parent and item are used instead of ``matched``.
    """
    hit = _try_inline_child_match(uploaded, default_meta, all_uploaded)
    if hit:
        source_default, entry = hit
    else:
        source_default, entry = matched, None
    return _bundle_from_default_match(
        uploaded, source_default, all_uploaded, inline_entry=entry
    )
def _mark_default_used(
uploaded: dict[str, Any],
default: dict[str, Any],
used_default_keys: set[str],
) -> None:
"""同一默认父节可被多个上传子节切片复用,仅整节独占时标记已用。"""
u_core = uploaded.get("core_title") or ""
d_core = default.get("core_title") or ""
if u_core == d_core or _title_match_score(u_core, d_core) >= 58:
used_default_keys.add(default["key"])
def _bundle_from_default_match(
    uploaded: dict[str, Any],
    default: dict[str, Any],
    all_uploaded: list[dict[str, Any]],
    *,
    inline_entry: dict[str, str] | None = None,
) -> tuple[dict[str, str], str]:
    """Build the prompt bundle for an uploaded section from a matched default section.

    Child sections are sliced out of the parent contract; parents drop the
    parts that the upload already has as separate sections.

    Args:
        uploaded: Metadata of the uploaded section.
        default: Metadata of the matched default section.
        all_uploaded: Metadata of all uploaded sections.
        inline_entry: When given, the inline parent item to build the
            contract from; the same text is then used as prompt and contract.

    Returns:
        ``(bundle, source_number)`` where ``source_number`` is the default
        number the text was written for (the inline item's number when
        ``inline_entry`` is used).
    """
    src_num = str(default.get("number") or "")
    if inline_entry:
        contract = _build_inline_child_contract(uploaded, default, inline_entry)
        child_src = inline_entry["default_num"]
        return (
            {
                "sectionPrompt": contract,
                "sectionOutputContract": contract,
            },
            child_src,
        )
    prompt = str(default.get("sectionPrompt") or "")
    contract = str(default.get("sectionOutputContract") or "")
    u_core = uploaded["core_title"]
    d_core = default["core_title"]
    title_score = _title_match_score(u_core, d_core) if u_core and d_core else 0
    subsection = _extract_subsection_from_contract(contract, u_core)
    # The uploaded section is a sub-topic of the default one: use only its slice.
    if subsection and u_core != d_core and title_score < 58:
        label = _clean_section_title(uploaded["title"]) or u_core
        num = uploaded.get("number") or ""
        heading = f"{num} {label}".strip() if num else label
        body = _strip_redundant_subsection_heading(subsection, u_core)
        contract = (
            "必须严格按以下格式与顺序输出,不得缺项、不得改名:\n"
            f'1首行固定输出标题"{heading}"\n'
            f"{body}"
        )
        # NOTE(review): source indentation was lost; this fallback is assumed
        # to belong to the slice branch -- confirm.
        if not prompt.strip() or len(prompt) < 80:
            prompt = contract
    if _has_uploaded_children(uploaded, all_uploaded):
        contract = _trim_parent_contract_for_children(contract, src_num)
        # NOTE(review): assumed to be nested under the children check -- confirm.
        # An untouched default prompt is replaced by the trimmed contract.
        if prompt == str(default.get("sectionPrompt") or ""):
            prompt = contract
    return (
        {
            "sectionPrompt": prompt or DEFAULT_SECTION_PROMPT,
            "sectionOutputContract": contract,
        },
        src_num,
    )
def _has_uploaded_children(section: dict[str, Any], all_uploaded: list[dict[str, Any]]) -> bool:
prefix = str(section.get("number") or "").strip()
if not prefix:
return False
child_prefix = prefix + "."
for other in all_uploaded:
num = str(other.get("number") or "")
if num.startswith(child_prefix) and num != prefix:
return True
return False
def _extract_subsection_from_contract(contract: str, core_title: str) -> str | None:
if not contract or not core_title:
return None
core = str(core_title).strip()
if core not in contract:
return None
blocks = re.split(r"(?=\d+)固定输出小节标题)", contract)
matched: list[str] = []
for block in blocks:
if core in block and "固定输出小节标题" in block:
matched.append(block.strip())
if not matched:
return None
if len(matched) == 1:
return matched[0]
# 多个同名子节(如「效果及影响」)取与环境/监测更相关的一块
for block in matched:
if any(k in block for k in ("废气", "废水", "噪声监测", "环保措施")):
return block
return matched[0]
def _strip_redundant_subsection_heading(subsection: str, core_title: str) -> str:
"""
去掉切片里与节标题重复的「固定输出小节标题」行,正文从 2起编号。
3固定输出小节标题"x.x.x 环保措施",并在该小节下… → 2并在该小节下…
"""
if not subsection:
return ""
core = str(core_title).strip()
out_lines: list[str] = []
for line in subsection.splitlines():
stripped = line.strip()
if not stripped:
if out_lines:
out_lines.append(line)
continue
if "固定输出小节标题" in stripped and (not core or core in stripped):
m = re.search(
r'固定输出小节标题\s*[:]\s*["\u201c][^"\u201d]+["\u201d]\s*[,]?\s*(.*)$',
stripped,
)
tail = (m.group(1) if m else "").strip()
if tail:
out_lines.append(f"2{tail}")
continue
if re.match(r"^\d+)固定输出小节标题", stripped):
continue
out_lines.append(line)
body = "\n".join(out_lines).strip()
body = _trim_parent_tail_from_subsection(body)
if body and not re.match(r"^\d+", body):
body = f"2{body}"
return body
def _trim_parent_tail_from_subsection(body: str) -> str:
"""去掉误带入的父节收尾条款(如整节「后评价认为」结论)。"""
if not body:
return ""
kept: list[str] = []
for line in body.splitlines():
stripped = line.strip()
if re.match(r"^5末尾必须以", stripped):
break
if stripped.startswith("【写作约束】"):
break
kept.append(line)
return "\n".join(kept).strip()
def _is_leaf_subsection_contract(contract: str) -> bool:
"""叶节合同:已有首行标题,且不再以「固定输出小节标题」开头。"""
text = (contract or "").lstrip()
if not text.startswith("必须严格"):
return False
if "按顺序固定输出以下" in text and "小标题并分别展开" in text:
return False
return "首行固定输出标题" in text[:200] and not re.search(
r"^2固定输出小节标题", text, re.MULTILINE
)
def _use_leaf_number_rewrite(
    contract: str,
    src: str,
    uploaded: dict[str, Any],
    all_uploaded: list[dict[str, Any]],
) -> bool:
    """Decide whether numbers should be collapsed to the uploaded leaf number.

    Only true leaf slices qualify: parents with an inline child list or with
    uploaded children never do.
    """
    parent_like = _contract_has_inline_child_list(
        contract, src
    ) or _has_uploaded_children(uploaded, all_uploaded)
    if parent_like:
        return False
    return _is_leaf_subsection_contract(contract)
def _rewrite_leaf_subsection_numbers(text: str, src_root: str, dst_leaf: str) -> str:
"""子节切片将默认父节下所有编号6.1.1.x统一替换为上传叶节编号4.1.1)。"""
if not text or not src_root or not dst_leaf:
return text
def _sub(match: re.Match[str]) -> str:
num = match.group(1)
if num == src_root or num.startswith(src_root + "."):
return dst_leaf
return match.group(0)
return _SECTION_NUM_IN_TEXT_RE.sub(_sub, text)
def _trim_parent_contract_for_children(
contract: str, parent_num: str | None = None
) -> str:
m = re.search(r"\d+)固定输出小节标题", contract)
if m:
trimmed = contract[: m.start()].rstrip()
return trimmed if trimmed else contract
if parent_num and _contract_has_inline_child_list(contract, parent_num):
return _trim_inline_parent_contract_for_children(contract, parent_num)
return contract
def _trim_inline_parent_contract_for_children(contract: str, parent_num: str) -> str:
"""父节内嵌子条已单独成节时,去掉各子条撰写细则,保留总述与总结。"""
lines = str(contract or "").splitlines()
kept: list[str] = []
for line in lines:
stripped = line.strip()
if re.match(r"^[3-7]", stripped) and f"{parent_num}." in stripped.replace(" ", ""):
continue
if re.match(r"^[3-7]", stripped) and f"{parent_num}" in stripped.replace(" ", ""):
continue
kept.append(line)
return "\n".join(kept).strip() or contract
def _remap_single_number(num: str, remap: dict[str, str]) -> str:
if not num or not remap:
return num
if num in remap:
return remap[num]
parts = num.split(".")
for end in range(len(parts) - 1, 0, -1):
prefix = ".".join(parts[:end])
if prefix in remap:
return remap[prefix] + num[len(prefix) :]
return num
_SECTION_NUM_IN_TEXT_RE = re.compile(r"(?<![\d.])(\d+(?:\.\d+)*)(?![\d.])")
def _rewrite_section_numbers_in_text(text: str, remap: dict[str, str]) -> str:
if not text or not remap:
return text
def _sub(match: re.Match[str]) -> str:
num = match.group(1)
new_num = _remap_single_number(num, remap)
return new_num if new_num != num else match.group(0)
return _SECTION_NUM_IN_TEXT_RE.sub(_sub, text)
def _titles_equal_in_order(
uploaded: list[dict[str, Any]],
default: list[dict[str, Any]],
) -> bool:
if len(uploaded) != len(default):
return False
for u, d in zip(uploaded, default):
if u["norm_title"] != d["norm_title"]:
return False
return True
def _numbers_equal_in_order(
uploaded: list[dict[str, Any]],
default: list[dict[str, Any]],
) -> bool:
if len(uploaded) != len(default):
return False
for u, d in zip(uploaded, default):
if (u["number"] or "") != (d["number"] or ""):
return False
return True
def _match_default_by_title_and_number(
uploaded: dict[str, Any],
default_meta: list[dict[str, Any]],
*,
used_default_keys: set[str] | None = None,
same_number_is_enough: bool = False,
) -> dict[str, Any] | None:
"""
按上传标题中的章节编号定位默认目录中的对应节,再按标题语义择优。
same_number_is_enough编号序列已与默认一致时同编号唯一默认节直接套用标题仅措辞不同
"""
used = used_default_keys or set()
u_num = uploaded["number"]
u_core = uploaded["core_title"]
candidates = [
dm
for dm in default_meta
if dm["key"] not in used and (not u_num or dm["number"] == u_num)
]
if not candidates:
return None
if len(candidates) == 1 and u_num and candidates[0]["number"] == u_num:
d_core = candidates[0]["core_title"]
if same_number_is_enough:
if _titles_topic_compatible(u_core, d_core):
return candidates[0]
return None
title_score = _title_match_score(u_core, d_core)
if title_score >= 20:
return candidates[0]
return None
best: dict[str, Any] | None = None
best_score = -1
for dm in candidates:
title_score = _title_match_score(u_core, dm["core_title"])
if title_score < _FUZZY_MATCH_THRESHOLD:
continue
score = title_score + (20 if u_num and dm["number"] == u_num else 0)
if score > best_score:
best_score = score
best = dm
return best
def _local_match_default(
    uploaded: dict[str, Any],
    default_meta: list[dict[str, Any]],
    default_by_title: dict[str, dict[str, str]],
    all_uploaded: list[dict[str, Any]],
    *,
    used_default_keys: set[str] | None = None,
) -> dict[str, Any] | None:
    """Find a default section for ``uploaded`` without calling the LLM.

    Tried in order: exact normalized-title hit (unless that default is already
    used), inline child of a default parent, sub-section slice of a default
    parent, semantic title match.
    """
    norm_title = uploaded["norm_title"]
    if norm_title in default_by_title:
        exact = _catalog_meta(default_by_title[norm_title], -1)
        if not used_default_keys or exact["key"] not in used_default_keys:
            return exact
    inline_hit = _try_inline_child_match(uploaded, default_meta, all_uploaded)
    if inline_hit:
        return inline_hit[0]
    return _find_subsection_parent(
        uploaded, default_meta, used_default_keys
    ) or _match_default_by_title_semantic(
        uploaded, default_meta, all_uploaded, used_default_keys=used_default_keys
    )
def _find_subsection_parent(
    uploaded: dict[str, Any],
    default_meta: list[dict[str, Any]],
    used_default_keys: set[str] | None,
) -> dict[str, Any] | None:
    """Find the default parent whose contract contains a slice for the uploaded sub-title.

    The uploaded section is a sub-heading (e.g. 环保措施); a default parent is
    a candidate when a slice for that title can be extracted from its
    contract. The same parent may be reused for several uploaded sections --
    ``used_default_keys`` is accepted but not consulted here.

    Returns:
        The best-scoring default parent, or None when the uploaded core title
        is empty or no parent contract contains a slice.
    """
    u_core = uploaded["core_title"]
    if not u_core:
        return None
    best: dict[str, Any] | None = None
    best_score = -1
    for dm in default_meta:
        contract = str(dm.get("sectionOutputContract") or "")
        subsection = _extract_subsection_from_contract(contract, u_core)
        if not subsection:
            continue
        # A near-identical title is a whole-section match, not a sub-section slice.
        if _title_match_score(u_core, dm["core_title"]) >= 58:
            continue
        score = _title_match_score(u_core, dm["core_title"])
        # Topic heuristics: favour environment parents for environment titles.
        # NOTE(review): source indentation was lost; the nesting of the
        # following conditions is reconstructed -- confirm.
        if "环境" in u_core or "环保" in u_core:
            if "环境" in dm["core_title"] or "环保" in dm["core_title"]:
                score += 30
            if "安全" in dm["core_title"]:
                score -= 25
        if "监测" in subsection or "废气" in subsection:
            if "环境" in dm["core_title"]:
                score += 15
        if score > best_score:
            best_score = score
            best = dm
    return best
def _match_default_by_title_semantic(
uploaded: dict[str, Any],
default_meta: list[dict[str, Any]],
all_uploaded: list[dict[str, Any]],
*,
used_default_keys: set[str] | None = None,
) -> dict[str, Any] | None:
used = used_default_keys or set()
u_core = uploaded["core_title"]
if not u_core:
return None
u_num = str(uploaded.get("number") or "")
min_score = 55 if u_num and "." not in u_num else _FUZZY_MATCH_THRESHOLD
best: dict[str, Any] | None = None
best_score = -1
for dm in default_meta:
if dm["key"] in used:
continue
d_core = dm["core_title"]
title_score = _title_match_score(u_core, d_core)
if u_core == d_core:
return dm
if _should_skip_whole_parent_match(uploaded, dm, all_uploaded):
continue
if title_score < 45 and not _titles_topic_compatible(u_core, d_core):
continue
if title_score < 45:
continue
if title_score > best_score:
best_score = title_score
best = dm
return best if best_score >= min_score else None
_GENERIC_TITLE_FRAGS = frozenset(
{
"评价",
"分析",
"结论",
"建议",
"概况",
"情况",
"说明",
"管理",
"工作",
}
)
def _titles_topic_compatible(uploaded_core: str, default_core: str) -> bool:
"""判断两节标题是否同一主题(措辞略异为真,换题为假)。"""
if not uploaded_core or not default_core:
return False
if _title_match_score(uploaded_core, default_core) >= 12:
return True
tks_u = set(re.findall(r"[\u4e00-\u9fa5]{2,8}", uploaded_core)) - _GENERIC_TITLE_FRAGS
tks_d = set(re.findall(r"[\u4e00-\u9fa5]{2,8}", default_core)) - _GENERIC_TITLE_FRAGS
if tks_u & tks_d:
return True
for n in (4, 3, 2):
for i in range(len(uploaded_core) - n + 1):
frag = uploaded_core[i : i + n]
if frag in _GENERIC_TITLE_FRAGS:
continue
if frag in default_core:
return True
return False
def _fallback_prompt_for_unmatched(title: str, section_key: str | None) -> str:
    """Prompt for a section nothing matched.

    Uses the guideline prompt when one exists for the title/key, otherwise the
    generic fallback contract.
    """
    return _guideline_prompt_for(title, section_key) or _fallback_contract_for_unmatched(
        title, section_key
    )
def _fallback_contract_for_unmatched(title: str, section_key: str | None) -> str:
    """Build the generic output contract for a section nothing matched.

    The heading is the section number (from the title, else from the key)
    followed by the cleaned title; "本章节" stands in for an empty title.
    """
    label = _clean_section_title(title) or str(title or "").strip() or "本章节"
    number = _extract_number_prefix(title) or _section_key_to_number(section_key)
    if number:
        heading = f"{number} {label}".strip()
    else:
        heading = label
    parts = [
        "必须严格按以下要求输出:",
        f'1首行固定输出标题"{heading}"',
        f"2正文围绕「{label}」撰写,结构须与本节标题一致,先事实后评价。",
        "3依据证据材料缺失写「待补充」禁止编造。",
    ]
    return "\n".join(parts)
def _llm_configured() -> bool:
    """True when LLM base URL, API key and model name are all set to non-blank values."""
    for setting_name in ("LLM_API_BASE", "LLM_API_KEY", "LLM_MODEL_NAME"):
        # Settings are read lazily, one after the other; the first blank one decides.
        if not (getattr(settings, setting_name) or "").strip():
            return False
    return True
_LLM_MAPPING_SYSTEM_PROMPT = (
"你是炼油化工建设项目后评价报告模版专家。"
"任务:判断上传模版章节能否复用系统默认章节的撰写提示词,并为无法复用的章节生成简短提示词。"
"只输出 JSON object不要解释。"
)
def _build_llm_mapping_user_prompt(
default_meta: list[dict[str, Any]],
default_lines: list[str],
batch: list[dict[str, Any]],
) -> str:
upload_lines = [
f'- index={um["index"]} number={um["number"]} title={um["title"]} core={um["core_title"]}'
for um in batch
]
return f"""系统默认模版共 {len(default_meta)} 节(节选提示词预览):
{chr(10).join(default_lines[:120])}
待处理的上传章节index 为上传列表下标):
{chr(10).join(upload_lines)}
请返回 JSON
{{
"structure_compatible": true/false,
"matches": [
{{"upload_index": 0, "default_key": "3-1", "confidence": 0.0-1.0}}
],
"generated": [
{{
"upload_index": 5,
"section_prompt": "200字以内的章节撰写要求面向后评价报告缺失写待补充禁止编造",
"section_output_contract": "可选100字以内的输出结构约束不需要可空字符串"
}}
]
}}
规则:
1. structure_compatible上传模版与默认模版目录层级、编号体系一致且仅标题措辞略异时为 true。
2. matches语义与默认某节相同或高度相近时填写 default_key必须来自默认列表的 keyconfidence>=0.55 才有效。
3. generated无法对应默认章节时根据上传标题写 section_promptcontract 可简述需含表格/小节等。
4. 同一 upload_index 只出现在 matches 或 generated 之一;不要重复。
5. 禁止编造与标题无关的细则内容。"""
def _apply_llm_mapping(
    unresolved: list[dict[str, Any]],
    default_meta: list[dict[str, Any]],
    default_by_key: dict[str, dict[str, str]],
    results: list[dict[str, str] | None],
    match_sources: list[str],
    matched_default_numbers: list[str | None],
    all_uploaded_meta: list[dict[str, Any]],
) -> None:
    """Resolve the unmatched sections through the LLM, in parallel batches.

    The unresolved sections are split into batches, each batch is sent as its
    own request (in a thread pool when there is more than one), and the
    responses are merged into ``results`` / ``match_sources`` /
    ``matched_default_numbers`` in place.

    Best effort: a failing import of the LLM client or a failing batch call is
    logged and skipped; nothing is raised to the caller.

    Batch size, worker count, max tokens and HTTP timeout come from
    ``settings`` (TEMPLATE_UPLOAD_LLM_BATCH_SIZE, TEMPLATE_UPLOAD_LLM_MAX_WORKERS,
    TEMPLATE_UPLOAD_LLM_MAX_TOKENS, LLM_HTTP_TIMEOUT_SEC) with defaults 8, 4,
    4096 and 120.
    """
    try:
        from services.llm_client import chat_completions_json
    except Exception as e:  # noqa: BLE001 - the LLM stage is optional
        logger.warning("template_prompt_mapper: llm import failed: %s", e)
        return
    if not unresolved:
        return
    default_lines = []
    for dm in default_meta:
        prompt_preview = re.sub(r"\s+", " ", str(dm.get("sectionPrompt") or ""))[:240]
        default_lines.append(
            f'- key={dm["key"]} number={dm["number"]} title={dm["title"]} '
            f'prompt_preview="{prompt_preview}"'
        )
    batch_size = max(int(getattr(settings, "TEMPLATE_UPLOAD_LLM_BATCH_SIZE", 8) or 8), 1)
    max_workers = max(int(getattr(settings, "TEMPLATE_UPLOAD_LLM_MAX_WORKERS", 4) or 4), 1)
    max_tokens = int(getattr(settings, "TEMPLATE_UPLOAD_LLM_MAX_TOKENS", 4096) or 4096)
    timeout_sec = int(getattr(settings, "LLM_HTTP_TIMEOUT_SEC", 120) or 120)
    batches = [unresolved[i : i + batch_size] for i in range(0, len(unresolved), batch_size)]

    def _run_batch(batch: list[dict[str, Any]]) -> dict:
        """Send one batch; return the parsed response, or {} on failure."""
        user_prompt = _build_llm_mapping_user_prompt(default_meta, default_lines, batch)
        try:
            return chat_completions_json(
                system_prompt=_LLM_MAPPING_SYSTEM_PROMPT,
                user_prompt=user_prompt,
                temperature=0.1,
                max_tokens=max_tokens,
                timeout_sec=timeout_sec,
            )
        except Exception as e:  # noqa: BLE001
            logger.warning("template_prompt_mapper: llm batch call failed: %s", e)
            return {}

    if len(batches) <= 1:
        collected = [_run_batch(b) for b in batches]
    else:
        workers = min(max_workers, len(batches))
        with ThreadPoolExecutor(max_workers=workers) as executor:
            # Results are taken in submission order, not completion order.
            # Merging is first-come-first-served per upload index, so
            # completion order would make the outcome depend on network
            # timing whenever two responses claim the same index.
            collected = list(executor.map(_run_batch, batches))
        logger.info(
            "template_prompt_mapper: llm 并行匹配 | 待处理=%s | 批数=%s | 线程=%s",
            len(unresolved), len(batches), workers,
        )
    for data in collected:
        if isinstance(data, dict):
            _merge_llm_mapping_response(
                data,
                unresolved,
                default_by_key,
                results,
                match_sources,
                matched_default_numbers,
                all_uploaded_meta,
                default_meta,
            )
def _merge_llm_mapping_response(
data: dict,
unresolved: list[dict[str, Any]],
default_by_key: dict[str, dict[str, str]],
results: list[dict[str, str] | None],
match_sources: list[str],
matched_default_numbers: list[str | None],
all_uploaded_meta: list[dict[str, Any]],
default_meta: list[dict[str, Any]],
) -> None:
for item in data.get("matches") or []:
if not isinstance(item, dict):
continue
try:
idx = int(item.get("upload_index"))
except (TypeError, ValueError):
continue
if idx < 0 or idx >= len(results) or results[idx] is not None:
continue
try:
conf = float(item.get("confidence") or 0)
except (TypeError, ValueError):
conf = 0.0
if conf < _LLM_MATCH_CONFIDENCE:
continue
default_key = str(item.get("default_key") or "").strip()
row = default_by_key.get(default_key)
if not row:
continue
dm = _catalog_meta(row, -1)
um = next((u for u in unresolved if u["index"] == idx), None)
if um:
bundle, src_num = _bundle_from_match_with_inline(
um, dm, all_uploaded_meta, default_meta
)
results[idx] = bundle
matched_default_numbers[idx] = src_num
else:
results[idx] = _prompt_bundle(dm)
matched_default_numbers[idx] = dm.get("number") or ""
match_sources[idx] = "llm_match"
for item in data.get("generated") or []:
if not isinstance(item, dict):
continue
try:
idx = int(item.get("upload_index"))
except (TypeError, ValueError):
continue
if idx < 0 or idx >= len(results) or results[idx] is not None:
continue
prompt = str(item.get("section_prompt") or "").strip()
contract = str(item.get("section_output_contract") or "").strip()
if not prompt:
continue
um = next((u for u in unresolved if u["index"] == idx), None)
title = um["title"] if um else ""
key = um["key"] if um else ""
results[idx] = {
"sectionPrompt": prompt,
"sectionOutputContract": contract or default_section_output_contract(title, key),
}
match_sources[idx] = "llm_generated"