""" 从项目知识库 Word(.docx)中提取「附图1/附图2」嵌入图,用于报告附录。 细则常见版式:附图标题段落与图在同一节或相邻段落;解析时合并前/当前/后段文字做关键词匹配。 """ from __future__ import annotations import base64 import logging from pathlib import Path from typing import Optional from docx import Document from docx.oxml.ns import qn from docx.table import Table from docx.text.paragraph import Paragraph logger = logging.getLogger(__name__) # 过滤装饰性小图(logo 等) _MIN_FIGURE_BYTES = 6000 R_EMBED = "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}embed" _NS = { "w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main", "a": "http://schemas.openxmlformats.org/drawingml/2006/main", } def _compact(s: str) -> str: return "".join(str(s or "").split()) def _classify_slot(ctx: str) -> Optional[int]: """ 返回 1=全厂物料平衡图,2=装置(如烷基化)物料平衡图。 """ t = _compact(ctx) if not t: return None # 附图编号(先判 2,避免同段目录同时出现两个编号时误判) if "附图2" in t: return 2 if "附图1" in t: return 1 if "全厂" in t and "物料平衡" in t: return 1 if "烷基化" in t and "物料平衡" in t: return 2 if "装置" in t and "物料平衡" in t and "全厂" not in t: return 2 return None def _content_type_to_md_subtype(content_type: str) -> str: ct = (content_type or "").lower() if "jpeg" in ct or ct.endswith("jpg"): return "jpeg" if "png" in ct: return "png" if "gif" in ct: return "gif" if "emf" in ct: return "x-emf" if "wmf" in ct: return "x-wmf" return "png" def _blob_to_data_uri(blob: bytes, content_type: str) -> str: sub = _content_type_to_md_subtype(content_type) b64 = base64.standard_b64encode(blob).decode("ascii") return f"data:image/{sub};base64,{b64}" def _iter_paragraphs_deep(doc: Document): body_el = doc.element.body for el in body_el: if el.tag == qn("w:p"): yield Paragraph(el, doc._body) elif el.tag == qn("w:tbl"): table = Table(el, doc._body) for row in table.rows: for cell in row.cells: for p in cell.paragraphs: yield p def extract_appendix_figure_candidates_from_docx(path: Path) -> dict[int, list[tuple[int, bytes, str]]]: """ 从单个 docx 收集候选图:slot -> [(size, blob, content_type), ...] content_type 来自 OPC part,用于拼 data URI。 """ candidates: dict[int, list[tuple[int, bytes, str]]] = {1: [], 2: []} orphans_ordered: list[tuple[bytes, str]] = [] try: doc = Document(str(path)) except Exception as exc: logger.warning("appendix figure: open docx failed %s: %s", path, exc) return candidates paras = list(_iter_paragraphs_deep(doc)) texts = [p.text or "" for p in paras] for i, p in enumerate(paras): blobs_with_type: list[tuple[bytes, str]] = [] for blip in p._element.findall(".//a:blip", _NS): embed = blip.get(R_EMBED) if not embed: continue try: rel = p.part.related_parts[embed] except KeyError: continue blob = getattr(rel, "blob", None) ct = getattr(rel, "content_type", "") or "image/png" if blob and len(blob) >= _MIN_FIGURE_BYTES: blobs_with_type.append((blob, ct)) if not blobs_with_type: continue prev_t = texts[i - 1] if i > 0 else "" cur_t = texts[i] next_t = texts[i + 1] if i + 1 < len(texts) else "" ctx = f"{prev_t}\n{cur_t}\n{next_t}" slot = _classify_slot(ctx) if slot is None: for blob, ct in blobs_with_type: orphans_ordered.append((blob, ct)) continue for blob, ct in blobs_with_type: candidates[slot].append((len(blob), blob, ct)) def _dedupe_preserve_order(pairs: list[tuple[bytes, str]]) -> list[tuple[bytes, str]]: seen: set[int] = set() out: list[tuple[bytes, str]] = [] for blob, ct in pairs: bid = id(blob) if bid in seen: continue seen.add(bid) out.append((blob, ct)) return out orphans_ordered = _dedupe_preserve_order(orphans_ordered) used_ids: set[int] = set() for lst in candidates.values(): for _sz, blob, _ct in lst: used_ids.add(id(blob)) orphans_ordered = [(b, c) for b, c in orphans_ordered if id(b) not in used_ids] if not candidates[1] and orphans_ordered: b, c = orphans_ordered.pop(0) candidates[1].append((len(b), b, c)) if not candidates[2] and orphans_ordered: b, c = orphans_ordered.pop(0) candidates[2].append((len(b), b, c)) return candidates def merge_best_appendix_figures( per_doc: list[tuple[str, dict[int, list[tuple[int, bytes, str]]]]], ) -> dict[int, tuple[bytes, str, str]]: """ 多文档合并:每个 slot 只保留字节最大的一张(更可能是主流程图而非小图标)。 返回 slot -> (blob, content_type, source_doc_name) """ best: dict[int, tuple[int, bytes, str, str]] = {} for doc_name, cand in per_doc: for slot in (1, 2): for size, blob, ct in cand.get(slot) or []: prev = best.get(slot) if prev is None or size > prev[0]: best[slot] = (size, blob, ct, doc_name) return {k: (v[1], v[2], v[3]) for k, v in best.items()} def appendix_figure_markdown_images( resolved: dict[int, tuple[bytes, str, str]], *, label_title: list[tuple[str, str]], ) -> dict[int, str]: """slot -> markdown 片段(含 ### 标题与 ![](data:...))""" out: dict[int, str] = {} slot_to_title = {i + 1: lt for i, lt in enumerate(label_title)} for slot, (blob, ct, src) in resolved.items(): if slot not in slot_to_title: continue label, title = slot_to_title[slot] uri = _blob_to_data_uri(blob, ct) cap = f"{label} {title}" src_note = f"\n\n*(嵌入来源:{src})*" if src else "" out[slot] = f"### {cap}\n\n![{cap}]({uri}){src_note}" return out