section_reference_block/services/appendix_figure_extraction.py
xxy aa98ea2623 @
Initial commit

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@
2026-06-05 18:45:29 +08:00

200 lines
6.4 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
从项目知识库 Word.docx中提取「附图1/附图2」嵌入图用于报告附录。
细则常见版式:附图标题段落与图在同一节或相邻段落;解析时合并前/当前/后段文字做关键词匹配。
"""
from __future__ import annotations

import base64
import logging
import re
from pathlib import Path
from typing import Optional

from docx import Document
from docx.oxml.ns import qn
from docx.table import Table
from docx.text.paragraph import Paragraph
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Filter out small decorative images (logos etc.): embedded blobs smaller than
# this many bytes are not treated as figures.
_MIN_FIGURE_BYTES = 6000
# Fully qualified name of the "embed" attribute read from each blip element; its
# value is the relationship id used to look up the embedded image part.
R_EMBED = "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}embed"
# Prefix -> namespace URI map passed to the ".//a:blip" element lookups.
_NS = {
    "w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main",
    "a": "http://schemas.openxmlformats.org/drawingml/2006/main",
}
def _compact(s: str) -> str:
    """Return *s* with every whitespace character removed.

    Falsy input (``None``, ``""``, ``0`` ...) yields the empty string; any
    other value is converted with ``str()`` before the whitespace is dropped.
    """
    if not s:
        return ""
    pieces = str(s).split()
    return "".join(pieces)
def _classify_slot(ctx: str) -> Optional[int]:
    """Map the text around an embedded image to an appendix figure slot.

    Whitespace is removed first (via ``_compact``), so a caption written as
    "附图 1" is matched the same way as "附图1".

    Args:
        ctx: Caption/context text surrounding the image.

    Returns:
        ``1`` for the plant-wide (全厂) material-balance figure, ``2`` for the
        unit-level (装置, e.g. 烷基化) material-balance figure, or ``None``
        when the text matches neither.
    """
    t = _compact(ctx)
    if not t:
        return None
    # Explicit figure numbers take precedence over keywords. Slot 2 is tested
    # first so that text mentioning both numbers (e.g. a list of figures in the
    # same paragraph) resolves to 2. The negative lookahead stops multi-digit
    # numbers such as "附图10" or "附图21" from being read as figure 1 / 2.
    if re.search(r"附图2(?!\d)", t):
        return 2
    if re.search(r"附图1(?!\d)", t):
        return 1
    # Keyword fallback: every remaining rule requires "物料平衡".
    if "物料平衡" not in t:
        return None
    if "全厂" in t:
        return 1
    if "烷基化" in t or "装置" in t:
        return 2
    return None
def _content_type_to_md_subtype(content_type: str) -> str:
    """Return the ``image/<subtype>`` subtype to use in a data URI.

    Args:
        content_type: Content type of the image part (e.g. ``"image/png"``);
            matching is case-insensitive and ``None``/empty is accepted.

    Returns:
        ``"jpeg"``, ``"png"``, ``"gif"``, ``"x-emf"`` or ``"x-wmf"`` for the
        recognised formats; for any other well-formed ``image/*`` type the
        declared subtype itself (e.g. ``"bmp"``, ``"tiff"``, ``"svg+xml"``);
        ``"png"`` when nothing usable is declared.
    """
    ct = (content_type or "").lower()
    if "jpeg" in ct or ct.endswith("jpg"):
        return "jpeg"
    for keyword, subtype in (
        ("png", "png"),
        ("gif", "gif"),
        ("emf", "x-emf"),
        ("wmf", "x-wmf"),
    ):
        if keyword in ct:
            return subtype
    # Other image formats: keep the declared subtype instead of mislabelling the
    # payload as PNG. Parameters after ";" are dropped.
    main_type, sep, rest = ct.partition("/")
    if sep and main_type.strip() == "image":
        declared = rest.split(";", 1)[0].strip()
        if declared == "jpg":
            return "jpeg"
        # Only accept plain ASCII token characters: the value is interpolated
        # into a Markdown link target, so anything else falls back to the default.
        if declared and declared.isascii() and all(
            ch.isalnum() or ch in ".+-" for ch in declared
        ):
            return declared
    return "png"
def _blob_to_data_uri(blob: bytes, content_type: str) -> str:
    """Encode *blob* as a base64 ``data:image/<subtype>;base64,...`` URI.

    The subtype is derived from *content_type* by
    ``_content_type_to_md_subtype``.
    """
    subtype = _content_type_to_md_subtype(content_type)
    payload = base64.b64encode(blob).decode("ascii")
    return "data:image/" + subtype + ";base64," + payload
def _iter_paragraphs_deep(doc: Document):
    """Yield the document's paragraphs in body order, including table cells.

    Direct ``w:p`` children of the body are yielded as paragraphs. For each
    direct ``w:tbl`` child, the paragraphs of every cell are yielded row by
    row. Only tables that are direct children of the body are expanded here;
    other element kinds are skipped.
    """
    for child in doc.element.body:
        tag = child.tag
        if tag == qn("w:p"):
            yield Paragraph(child, doc._body)
            continue
        if tag != qn("w:tbl"):
            continue
        for row in Table(child, doc._body).rows:
            for cell in row.cells:
                yield from cell.paragraphs
def _paragraph_figure_blobs(paragraph: Paragraph) -> list[tuple[bytes, str]]:
    """Return ``(blob, content_type)`` for each figure-sized image embedded in *paragraph*."""
    figures: list[tuple[bytes, str]] = []
    for blip in paragraph._element.findall(".//a:blip", _NS):
        rel_id = blip.get(R_EMBED)
        if not rel_id:
            continue
        try:
            image_part = paragraph.part.related_parts[rel_id]
        except KeyError:
            # Relationship id with no matching part: nothing to extract.
            continue
        blob = getattr(image_part, "blob", None)
        content_type = getattr(image_part, "content_type", "") or "image/png"
        # Blobs below the threshold are treated as decorative and ignored.
        if blob and len(blob) >= _MIN_FIGURE_BYTES:
            figures.append((blob, content_type))
    return figures


def extract_appendix_figure_candidates_from_docx(path: Path) -> dict[int, list[tuple[int, bytes, str]]]:
    """Collect candidate appendix figures from a single .docx file.

    Every paragraph that embeds at least one figure-sized image is classified
    with ``_classify_slot`` using the text of the previous, current and next
    paragraph. Images whose context matches no slot are kept as ordered
    "orphans"; if a slot ends up without candidates, the first orphan not
    already used (in document order) is assigned to it, slot 1 before slot 2.

    Args:
        path: Location of the .docx file.

    Returns:
        ``slot -> [(size, blob, content_type), ...]`` for slots 1 and 2, where
        ``content_type`` comes from the image part and is later used to build
        the data URI. Both lists are empty when the file cannot be opened; the
        failure is logged as a warning instead of being raised.
    """
    candidates: dict[int, list[tuple[int, bytes, str]]] = {1: [], 2: []}
    orphans: list[tuple[bytes, str]] = []
    try:
        doc = Document(str(path))
    except Exception as exc:
        # Best effort: one unreadable document must not abort the whole run.
        logger.warning("appendix figure: open docx failed %s: %s", path, exc)
        return candidates

    paras = list(_iter_paragraphs_deep(doc))
    texts = [p.text or "" for p in paras]
    last_index = len(texts) - 1
    for i, p in enumerate(paras):
        figures = _paragraph_figure_blobs(p)
        if not figures:
            continue
        prev_t = texts[i - 1] if i > 0 else ""
        next_t = texts[i + 1] if i < last_index else ""
        slot = _classify_slot(f"{prev_t}\n{texts[i]}\n{next_t}")
        if slot is None:
            orphans.extend(figures)
        else:
            candidates[slot].extend((len(blob), blob, ct) for blob, ct in figures)

    # Fallback for slots that found no captioned figure. Images are compared by
    # content (bytes hash and compare by value) rather than by object identity,
    # so the same picture stored as several parts is never handed out twice.
    claimed: set[bytes] = {
        blob for entries in candidates.values() for _size, blob, _ct in entries
    }
    for slot in (1, 2):
        if candidates[slot]:
            continue
        for blob, ct in orphans:
            if blob in claimed:
                continue
            claimed.add(blob)
            candidates[slot].append((len(blob), blob, ct))
            break
    return candidates
def merge_best_appendix_figures(
    per_doc: list[tuple[str, dict[int, list[tuple[int, bytes, str]]]]],
) -> dict[int, tuple[bytes, str, str]]:
    """Merge per-document candidates, keeping the largest image for each slot.

    The biggest blob is preferred because it is more likely to be the main
    flow diagram than a small icon. On equal size the candidate seen first
    wins.

    Args:
        per_doc: ``(doc_name, candidates)`` pairs, where ``candidates`` maps a
            slot to ``(size, blob, content_type)`` tuples.

    Returns:
        ``slot -> (blob, content_type, source_doc_name)`` for every slot (1 or
        2) that had at least one candidate.
    """
    winners: dict[int, tuple[int, bytes, str, str]] = {}
    for source_name, by_slot in per_doc:
        for slot in (1, 2):
            entries = by_slot.get(slot)
            if not entries:
                continue
            for size, blob, ct in entries:
                current = winners.get(slot)
                if current is not None and current[0] >= size:
                    continue
                winners[slot] = (size, blob, ct, source_name)
    merged: dict[int, tuple[bytes, str, str]] = {}
    for slot, (_size, blob, ct, source_name) in winners.items():
        merged[slot] = (blob, ct, source_name)
    return merged
def appendix_figure_markdown_images(
    resolved: dict[int, tuple[bytes, str, str]],
    *,
    label_title: list[tuple[str, str]],
) -> dict[int, str]:
    """Render each resolved figure as a Markdown fragment.

    Args:
        resolved: ``slot -> (blob, content_type, source_doc_name)``.
        label_title: ``(label, title)`` pairs; the pair at index ``i`` captions
            slot ``i + 1``.

    Returns:
        ``slot -> markdown`` where each fragment is a ``### <label> <title>``
        heading, the image inlined as a ``data:`` URI and, when a source name
        is present, an italic note naming the source document. Slots without
        a matching ``label_title`` entry are omitted.
    """
    out: dict[int, str] = {}
    slot_to_title = dict(enumerate(label_title, start=1))
    for slot, (blob, ct, src) in resolved.items():
        if slot not in slot_to_title:
            continue
        label, title = slot_to_title[slot]
        uri = _blob_to_data_uri(blob, ct)
        cap = f"{label} {title}"
        # Italic provenance note; the parentheses must be balanced so the
        # rendered text reads as one parenthesised remark.
        src_note = f"\n\n*(嵌入来源:{src})*" if src else ""
        out[slot] = f"### {cap}\n\n![{cap}]({uri}){src_note}"
    return out