200 lines
6.4 KiB
Python
200 lines
6.4 KiB
Python
"""
|
||
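Extract the embedded "附图1/附图2" (Appendix Figure 1/2) images from the project knowledge-base Word (.docx) files, for use in the report appendix.

Common layout in the detailed-rules (细则) documents: the figure caption paragraph and the image sit in the same section or in adjacent paragraphs, so parsing merges the previous/current/next paragraph text before keyword matching.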
|
||
"""
|
||
|
||
from __future__ import annotations

import base64
import logging
import re
from pathlib import Path
from typing import Optional

from docx import Document
from docx.oxml.ns import qn
from docx.table import Table
from docx.text.paragraph import Paragraph
|
||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Minimum blob size (in bytes) for an embedded image to count as a figure;
# smaller images are treated as decorative (logos and the like) and skipped.
_MIN_FIGURE_BYTES = 6000

# Clark-notation name of the ``embed`` attribute read from ``a:blip`` elements;
# its value is the relationship id used to look up the image part.
R_EMBED = "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}embed"
# Prefix -> namespace URI map passed to ``findall`` when searching for blips.
_NS = {
    "w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main",
    "a": "http://schemas.openxmlformats.org/drawingml/2006/main",
}
|
||
|
||
|
||
def _compact(s: str) -> str:
|
||
return "".join(str(s or "").split())
|
||
|
||
|
||
def _classify_slot(ctx: str) -> Optional[int]:
|
||
"""
|
||
返回 1=全厂物料平衡图,2=装置(如烷基化)物料平衡图。
|
||
"""
|
||
t = _compact(ctx)
|
||
if not t:
|
||
return None
|
||
# 附图编号(先判 2,避免同段目录同时出现两个编号时误判)
|
||
if "附图2" in t:
|
||
return 2
|
||
if "附图1" in t:
|
||
return 1
|
||
if "全厂" in t and "物料平衡" in t:
|
||
return 1
|
||
if "烷基化" in t and "物料平衡" in t:
|
||
return 2
|
||
if "装置" in t and "物料平衡" in t and "全厂" not in t:
|
||
return 2
|
||
return None
|
||
|
||
|
||
def _content_type_to_md_subtype(content_type: str) -> str:
|
||
ct = (content_type or "").lower()
|
||
if "jpeg" in ct or ct.endswith("jpg"):
|
||
return "jpeg"
|
||
if "png" in ct:
|
||
return "png"
|
||
if "gif" in ct:
|
||
return "gif"
|
||
if "emf" in ct:
|
||
return "x-emf"
|
||
if "wmf" in ct:
|
||
return "x-wmf"
|
||
return "png"
|
||
|
||
|
||
def _blob_to_data_uri(blob: bytes, content_type: str) -> str:
    """Encode ``blob`` as a ``data:image/<subtype>;base64,...`` URI.

    The subtype is derived from ``content_type`` by
    ``_content_type_to_md_subtype``; the payload is standard Base64.
    """
    subtype = _content_type_to_md_subtype(content_type)
    payload = base64.b64encode(blob).decode("ascii")
    return "data:image/" + subtype + ";base64," + payload
|
||
|
||
|
||
def _iter_paragraphs_deep(doc: Document):
    """Yield the document's paragraphs in body order, descending into tables.

    Direct ``w:p`` children of the body are yielded as they appear. For each
    direct ``w:tbl`` child, the paragraphs of every cell are yielded row by
    row. Tables nested inside a cell are not visited, and other body children
    are skipped.

    NOTE(review): python-docx may report a merged cell once per grid column,
    in which case its paragraphs would be yielded more than once — confirm.
    """
    paragraph_tag = qn("w:p")
    table_tag = qn("w:tbl")
    container = doc._body
    for child in doc.element.body:
        tag = child.tag
        if tag == paragraph_tag:
            yield Paragraph(child, container)
        elif tag == table_tag:
            for row in Table(child, container).rows:
                for cell in row.cells:
                    yield from cell.paragraphs
|
||
|
||
|
||
def _paragraph_image_blobs(paragraph) -> list[tuple[bytes, str]]:
    """Return ``(blob, content_type)`` for each large-enough image embedded in a paragraph.

    ``paragraph`` is a python-docx ``Paragraph``. Blips without an embed id,
    blips whose relationship cannot be resolved, and images smaller than
    ``_MIN_FIGURE_BYTES`` are skipped.
    """
    found: list[tuple[bytes, str]] = []
    for blip in paragraph._element.findall(".//a:blip", _NS):
        embed = blip.get(R_EMBED)
        if not embed:
            continue
        try:
            rel = paragraph.part.related_parts[embed]
        except KeyError:
            continue
        blob = getattr(rel, "blob", None)
        # Fall back to PNG when the part does not report a content type.
        ct = getattr(rel, "content_type", "") or "image/png"
        if blob and len(blob) >= _MIN_FIGURE_BYTES:
            found.append((blob, ct))
    return found


def extract_appendix_figure_candidates_from_docx(path: Path) -> dict[int, list[tuple[int, bytes, str]]]:
    """Collect candidate appendix figures from a single .docx file.

    Each paragraph that embeds at least one large-enough image is classified
    with ``_classify_slot`` using the text of the previous, current and next
    paragraph. Images in unclassifiable paragraphs are kept as "orphans" and
    used, in document order, to fill whichever of slot 1 and slot 2 is still
    empty at the end.

    Args:
        path: Location of the .docx file.

    Returns:
        ``{1: [...], 2: [...]}`` where each entry is
        ``(size_in_bytes, blob, content_type)``; ``content_type`` comes from
        the image part and is later used to build the data URI. If the
        document cannot be opened, a warning is logged and both lists are
        empty.
    """
    candidates: dict[int, list[tuple[int, bytes, str]]] = {1: [], 2: []}
    try:
        doc = Document(str(path))
    except Exception as exc:
        # Deliberate best-effort: an unreadable document contributes nothing.
        logger.warning("appendix figure: open docx failed %s: %s", path, exc)
        return candidates

    paras = list(_iter_paragraphs_deep(doc))
    texts = [p.text or "" for p in paras]
    last_index = len(texts) - 1

    orphans: list[tuple[bytes, str]] = []
    # Blobs already assigned to a slot. Keyed by content rather than id(), so
    # the check does not depend on the library returning the same bytes object.
    claimed: set[bytes] = set()

    for i, p in enumerate(paras):
        images = _paragraph_image_blobs(p)
        if not images:
            continue

        prev_t = texts[i - 1] if i > 0 else ""
        next_t = texts[i + 1] if i < last_index else ""
        # Captions may sit before or after the image, so both neighbours are
        # included in the text that gets classified.
        slot = _classify_slot(f"{prev_t}\n{texts[i]}\n{next_t}")
        if slot is None:
            orphans.extend(images)
            continue

        for blob, ct in images:
            candidates[slot].append((len(blob), blob, ct))
            claimed.add(blob)

    # Keep the first occurrence of each orphan image, and drop any that were
    # also classified into a slot elsewhere in the document.
    seen: set[bytes] = set(claimed)
    unique_orphans: list[tuple[bytes, str]] = []
    for blob, ct in orphans:
        if blob in seen:
            continue
        seen.add(blob)
        unique_orphans.append((blob, ct))

    # Fill empty slots from the remaining orphans, slot 1 first.
    remaining = iter(unique_orphans)
    for slot in (1, 2):
        if candidates[slot]:
            continue
        fallback = next(remaining, None)
        if fallback is None:
            break
        blob, ct = fallback
        candidates[slot].append((len(blob), blob, ct))

    return candidates
|
||
|
||
|
||
def merge_best_appendix_figures(
    per_doc: list[tuple[str, dict[int, list[tuple[int, bytes, str]]]]],
) -> dict[int, tuple[bytes, str, str]]:
    """Merge candidates from several documents, keeping the largest image per slot.

    For slots 1 and 2 only the candidate with the greatest byte size is kept
    (a larger image is more likely the main flow diagram than a small icon).
    On equal sizes the candidate seen first wins.

    Args:
        per_doc: ``(document_name, candidates)`` pairs, where ``candidates``
            maps a slot to a list of ``(size, blob, content_type)``.

    Returns:
        ``slot -> (blob, content_type, source_document_name)`` for every slot
        that had at least one candidate.
    """
    largest: dict[int, tuple[int, bytes, str, str]] = {}
    for source_name, by_slot in per_doc:
        for slot in (1, 2):
            entries = by_slot.get(slot)
            if not entries:
                continue
            for size, blob, ct in entries:
                # Only a strictly larger image replaces the current holder.
                if slot in largest and largest[slot][0] >= size:
                    continue
                largest[slot] = (size, blob, ct, source_name)

    merged: dict[int, tuple[bytes, str, str]] = {}
    for slot, (_size, blob, ct, source_name) in largest.items():
        merged[slot] = (blob, ct, source_name)
    return merged
|
||
|
||
|
||
def appendix_figure_markdown_images(
    resolved: dict[int, tuple[bytes, str, str]],
    *,
    label_title: list[tuple[str, str]],
) -> dict[int, str]:
    """Render each resolved figure as a Markdown fragment.

    Each fragment consists of a ``###`` heading, the image embedded as a data
    URI, and, when a source document name is present, an italic source note.

    Args:
        resolved: ``slot -> (blob, content_type, source_document_name)``.
        label_title: ``(label, title)`` pairs; the pair at position ``i``
            (0-based) captions slot ``i + 1``.

    Returns:
        ``slot -> markdown fragment`` for every slot in ``resolved`` that has
        a matching entry in ``label_title``; other slots are omitted.
    """
    slot_to_title = dict(enumerate(label_title, start=1))
    out: dict[int, str] = {}
    for slot, (blob, ct, src) in resolved.items():
        if slot not in slot_to_title:
            continue
        label, title = slot_to_title[slot]
        cap = f"{label} {title}"
        uri = _blob_to_data_uri(blob, ct)
        src_note = f"\n\n*(嵌入来源:{src})*" if src else ""
        # The image itself must be emitted: previously ``uri`` was computed
        # but never used, so the fragment held only the heading and the note.
        out[slot] = f"### {cap}\n\n![{cap}]({uri}){src_note}"
    return out
|