""" services/reference_service.py 参考范文加载服务:报告生成时按需加载对应章节参考范文 """ from __future__ import annotations import json import logging import re from typing import Optional from sqlalchemy.orm import Session from database.models import ReportSectionReference from services.llm_client import chat_completions_json logger = logging.getLogger(__name__) _DESENSITIZE_SYSTEM_PROMPT = """你是一个文档脱敏助手。你的任务是对后评价报告范文进行脱敏处理,只保留报告的结构骨架。 ## 脱敏规则 ### 必须保留的结构 1. Markdown 标题层级(## 1.1、## 1.2、### 1.2.1 等) 2. 表格的表头行、分隔行(|--|--|) 3. 段落/章节的组织顺序和逻辑关系 4. 文字的叙述逻辑(先写什么、再写什么) 5. 表格的行数、列数、表头字段名(如"序号""项目名称""可研报告""实际值") ### 必须替换为 xxx 的内容 1. 所有具体数字:金额、年份、百分比、数量、面积、产能、投资额等 2. 项目名称、公司名称、单位名称等专有名词(书名号/引号内的内容) 3. 表格中的数据单元格内容(保留表头) 4. 具体的日期、时间节点 5. 财务指标的具体数值(IRR、NPV、回收期等) ### 特别注意 - 不要随意增删段落或改变段落顺序 - 不要删除整个表格,只替换表格中的数据单元格 - 保持原 Markdown 格式不变 - "待补充"、"详见附表"等 固定用语 不脱敏 - 书名号《》中的内容如果是不知名的规范/标准名称(如《石油化工标准》),保留书名号但内容替换为 xxx""" _DESENSITIZE_USER_PROMPT_TEMPLATE = """请对以下后评价报告章节进行脱敏处理,只保留结构骨架,所有具体数据替换为 xxx: ``` {content} ``` 请严格按照脱敏规则处理,直接输出脱敏后的完整 Markdown 内容,不要输出任何额外说明。""" def _desensitize_via_llm(content: str) -> str: """ 调用大模型对参考范文进行脱敏处理。 传入完整内容,返回仅保留结构骨架、具体数据替换为 xxx 的 Markdown。 若 LLM 调用失败,退回原始内容(不脱敏优于拒绝服务)。 """ if not content or not content.strip(): return content user_prompt = _DESENSITIZE_USER_PROMPT_TEMPLATE.format(content=content[:12000]) logger.info("参考范文脱敏 start | content_len=%s", len(content)) try: result = chat_completions_json( system_prompt=_DESENSITIZE_SYSTEM_PROMPT, user_prompt=user_prompt, temperature=0.0, max_tokens=16384, timeout_sec=120, ) raw = result.get("content") or "" if isinstance(raw, str) and raw.strip(): # 去掉可能的 ```markdown / ``` 包裹 cleaned = re.sub(r"^```(?:markdown)?\s*", "", raw.strip(), flags=re.IGNORECASE) cleaned = re.sub(r"\s*```$", "", cleaned) logger.info("参考范文脱敏 done | original_len=%s | desensitized_len=%s", len(content), len(cleaned)) return cleaned.strip() except Exception as e: logger.warning("LLM 脱敏失败,退回原文: %s", e) return content def load_section_reference( db: Session, section_key: str, source_file: Optional[str] = None, *, max_chars: int = 8000, ) -> str: """ 加载指定章节的参考范文内容。 Args: db: 数据库会话 section_key: 章节标识(如 "1.1", "2.1.1") source_file: 来源文件名(可选,不指定时取该章节最新的一条) max_chars: 最大字符数,超出截断 Returns: 参考范文 Markdown 文本,未找到时返回空字符串 """ query = db.query(ReportSectionReference).filter( ReportSectionReference.section_key == section_key ) if source_file: query = query.filter(ReportSectionReference.source_file == source_file) ref = ( query .order_by(ReportSectionReference.updated_at.desc()) .first() ) if not ref or not ref.content: return "" content = ref.content.strip() if not content: return "" content = _desensitize_via_llm(content) if len(content) > max_chars: logger.info("参考范文 %s 超出 %d 字符限制,已截断", section_key, max_chars) content = content[:max_chars] + "\n\n(参考范文超出长度限制,已截断)" return content def load_section_reference_by_title( db: Session, section_title: str, source_file: Optional[str] = None, *, max_chars: int = 8000, ) -> str: """ 按标题关键字匹配加载参考范文(不精确匹配 section_key 时的兜底方案)。 """ refs = db.query(ReportSectionReference) if source_file: refs = refs.filter(ReportSectionReference.source_file == source_file) # 尝试精确匹配 section_key(从标题中提取编号) import re m = re.match(r"(\d+(?:\.\d+)*)", section_title.strip()) if m: key = m.group(1) exact = ( refs.filter(ReportSectionReference.section_key == key) .order_by(ReportSectionReference.updated_at.desc()) .first() ) if exact and exact.content: content = exact.content.strip() content = _desensitize_via_llm(content) if len(content) > max_chars: content = content[:max_chars] + "\n\n(参考范文超出长度限制,已截断)" return content # 按标题模糊匹配 ref = ( refs.filter(ReportSectionReference.section_title.contains(section_title[:20])) .order_by(ReportSectionReference.updated_at.desc()) .first() ) if not ref or not ref.content: return "" content = ref.content.strip() if not content: return "" content = _desensitize_via_llm(content) if len(content) > max_chars: content = content[:max_chars] + "\n\n(参考范文超出长度限制,已截断)" return content def load_section_reference_raw( db: Session, section_key: str, template_id: Optional[str] = None, *, max_chars: int = 8000, ) -> str: """ 加载指定章节存储在数据库中的原始参考范文内容(不做 LLM 脱敏)。 与 load_section_reference 的区别:直接返回 report_section_references.content 原文, 仅保留长度截断保护,不再调用 _desensitize_via_llm。 template_id: 选中模板的 ID。传入后只注入与该模板关联的参考范文,实现“按模板过滤”; 为空则不做模板过滤(取最新一条)。 """ query = db.query(ReportSectionReference).filter( ReportSectionReference.section_key == section_key ) if template_id: query = query.filter(ReportSectionReference.template_id == template_id) ref = ( query .order_by(ReportSectionReference.updated_at.desc()) .first() ) if not ref or not ref.content: return "" content = ref.content.strip() if not content: return "" if len(content) > max_chars: logger.info("参考范文 %s 超出 %d 字符限制,已截断", section_key, max_chars) content = content[:max_chars] + "\n\n(参考范文超出长度限制,已截断)" return content def load_section_reference_raw_by_title( db: Session, section_title: str, template_id: Optional[str] = None, *, max_chars: int = 8000, ) -> str: """按标题匹配加载原始参考范文内容(不做 LLM 脱敏),用于 section_key 未命中时的兜底。""" refs = db.query(ReportSectionReference) if template_id: refs = refs.filter(ReportSectionReference.template_id == template_id) import re m = re.match(r"(\d+(?:\.\d+)*)", section_title.strip()) if m: key = m.group(1) exact = ( refs.filter(ReportSectionReference.section_key == key) .order_by(ReportSectionReference.updated_at.desc()) .first() ) if exact and exact.content: content = exact.content.strip() if len(content) > max_chars: content = content[:max_chars] + "\n\n(参考范文超出长度限制,已截断)" return content ref = ( refs.filter(ReportSectionReference.section_title.contains(section_title[:20])) .order_by(ReportSectionReference.updated_at.desc()) .first() ) if not ref or not ref.content: return "" content = ref.content.strip() if not content: return "" if len(content) > max_chars: content = content[:max_chars] + "\n\n(参考范文超出长度限制,已截断)" return content def list_available_source_files(db: Session) -> list[str]: """列出所有已上传的参考范文来源文件列表。""" results = ( db.query(ReportSectionReference.source_file) .distinct() .order_by(ReportSectionReference.source_file) .all() ) return [r[0] for r in results if r[0]]