section_reference_block/services/file_parse_client.py
xxy 43f3e0b746 Initial commit
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 18:41:06 +08:00

195 lines
6.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
services/file_parse_client.py
调用远程解析服务(默认 http://192.168.4.194:8000/convert
上传文件multipart文件字段默认 "file",并附带 engine=auto 表单字段)→ 返回 Markdown。
响应解析JSON 中按 results / md_content / mdcontent / markdown / content
逐层提取;若响应非 JSON 则整体作为 Markdown 返回。
"""
from __future__ import annotations
import json
import logging
import mimetypes
import time
import uuid
from pathlib import Path
from typing import Any
from urllib import error as urlerror
from urllib import request as urlrequest
from config import settings
logger = logging.getLogger(__name__)
MD_CONTENT_KEYS = ("md_content", "mdcontent", "markdown", "content")
class FileParseApiError(RuntimeError):
def __init__(self, message: str, *, status_code: int | None = None, api_url: str = "") -> None:
super().__init__(message)
self.status_code = status_code
self.api_url = api_url
def _build_multipart_body(
file_path: Path,
field_name: str,
extra_fields: dict[str, str] | None = None,
) -> tuple[bytes, str]:
boundary = uuid.uuid4().hex
mime_type = mimetypes.guess_type(file_path.name)[0] or "application/octet-stream"
file_bytes = file_path.read_bytes()
parts: list[bytes] = []
# 普通表单字段(如 engine=auto
for key, value in (extra_fields or {}).items():
if value is None or str(value).strip() == "":
continue
parts.append(f"--{boundary}\r\n".encode("utf-8"))
parts.append(
f'Content-Disposition: form-data; name="{key}"\r\n\r\n'.encode("utf-8")
)
parts.append(f"{value}\r\n".encode("utf-8"))
# 文件字段
parts.append(f"--{boundary}\r\n".encode("utf-8"))
parts.append(
(
f'Content-Disposition: form-data; name="{field_name}"; filename="{file_path.name}"\r\n'
f"Content-Type: {mime_type}\r\n\r\n"
).encode("utf-8")
)
parts.append(file_bytes)
parts.append(f"\r\n--{boundary}--\r\n".encode("utf-8"))
return b"".join(parts), boundary
def _extract_md_contents(payload: Any) -> list[str]:
if isinstance(payload, str):
return [payload]
if isinstance(payload, list):
out: list[str] = []
for item in payload:
out.extend(_extract_md_contents(item))
return out
if not isinstance(payload, dict):
return []
for key in MD_CONTENT_KEYS:
value = payload.get(key)
if isinstance(value, str):
return [value]
results = payload.get("results")
if results is not None:
return _extract_md_contents(results)
out = []
for value in payload.values():
out.extend(_extract_md_contents(value))
return out
def _response_to_markdown(text: str) -> str:
try:
payload = json.loads(text)
except json.JSONDecodeError:
# 非 JSON 直接当作 Markdown 返回
return text
contents = _extract_md_contents(payload)
if not contents:
raise ValueError("解析服务响应中未找到 md_content/markdown/content 字段")
return "\n\n".join(c.strip() for c in contents if c and c.strip())
def _request_once(
api_url: str,
file_path: Path,
field_name: str,
*,
timeout_sec: int,
extra_fields: dict[str, str] | None = None,
) -> str:
body, boundary = _build_multipart_body(file_path, field_name, extra_fields)
req = urlrequest.Request(
api_url,
data=body,
method="POST",
headers={"content-type": f"multipart/form-data; boundary={boundary}"},
)
try:
with urlrequest.urlopen(req, timeout=timeout_sec) as resp:
raw = resp.read()
encoding = resp.headers.get_content_charset() or "utf-8"
return raw.decode(encoding, errors="replace")
except urlerror.HTTPError as exc:
body_text = ""
try:
body_text = (exc.read() or b"").decode("utf-8", errors="replace")[:1000]
except Exception:
pass
raise FileParseApiError(
f"解析服务 HTTP {exc.code}{api_url}{body_text or exc.reason}",
status_code=int(exc.code or 0),
api_url=api_url,
) from exc
except urlerror.URLError as exc:
raise FileParseApiError(
f"无法连接解析服务({api_url}{exc.reason}",
status_code=0,
api_url=api_url,
) from exc
def parse_file_to_markdown(file_path: str | Path) -> str:
"""
将上传文件通过远程 file_parse 服务转换为 Markdown。
失败时对 5xx 做有限重试。
"""
path = Path(file_path)
if not path.is_file():
raise FileNotFoundError(f"文件不存在: {path}")
api_url = str(settings.FILE_PARSE_API_URL or "").strip()
if not api_url:
raise ValueError("FILE_PARSE_API_URL 未配置")
field_name = str(settings.FILE_PARSE_FIELD_NAME or "file").strip() or "file"
timeout_sec = max(int(settings.FILE_PARSE_HTTP_TIMEOUT_SEC or 600), 30)
retry_count = max(int(settings.FILE_PARSE_RETRY_COUNT or 1), 1)
backoff_sec = max(float(settings.FILE_PARSE_RETRY_BACKOFF_SEC or 1.0), 1.0)
retryable_status = {500, 502, 503, 504}
extra_fields: dict[str, str] = {}
engine = str(getattr(settings, "FILE_PARSE_ENGINE", "") or "").strip()
if engine:
extra_fields["engine"] = engine
last_error: Exception | None = None
for attempt in range(1, retry_count + 1):
try:
raw = _request_once(
api_url, path, field_name, timeout_sec=timeout_sec, extra_fields=extra_fields
)
markdown = _response_to_markdown(raw)
if not markdown.strip():
raise ValueError("解析服务返回的 Markdown 为空")
return markdown
except FileParseApiError as exc:
last_error = exc
status = int(exc.status_code or 0)
if attempt >= retry_count or status not in retryable_status:
raise
wait = backoff_sec * attempt
logger.warning(
"file_parse 重试 %s/%s status=%s wait=%ss file=%s",
attempt, retry_count, status, wait, path.name,
)
time.sleep(wait)
if last_error:
raise last_error
raise RuntimeError("file_parse 请求失败")