195 lines
6.3 KiB
Python
195 lines
6.3 KiB
Python
"""
|
||
services/file_parse_client.py
|
||
调用远程解析服务(默认 http://192.168.4.194:8000/convert):
|
||
上传文件(multipart:文件字段默认 "file",并附带 engine=auto 表单字段)→ 返回 Markdown。
|
||
|
||
响应解析:JSON 中按 results / md_content / mdcontent / markdown / content
|
||
逐层提取;若响应非 JSON 则整体作为 Markdown 返回。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import mimetypes
|
||
import time
|
||
import uuid
|
||
from pathlib import Path
|
||
from typing import Any
|
||
from urllib import error as urlerror
|
||
from urllib import request as urlrequest
|
||
|
||
from config import settings
|
||
|
||
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
||
|
||
# Keys looked up, in this order, when searching a JSON object for Markdown text.
MD_CONTENT_KEYS = ("md_content", "mdcontent", "markdown", "content")
|
||
|
||
|
||
class FileParseApiError(RuntimeError):
    """Error raised when a call to the file-parsing service fails.

    Besides the message, the exception carries two attributes taken from the
    keyword arguments: ``status_code`` (defaults to ``None``) and ``api_url``
    (defaults to an empty string).
    """

    def __init__(
        self,
        message: str,
        *,
        status_code: int | None = None,
        api_url: str = "",
    ) -> None:
        super().__init__(message)
        # Context for callers that need to branch on the failure.
        self.api_url = api_url
        self.status_code = status_code
|
||
|
||
|
||
def _escape_disposition_value(text: str) -> str:
    """Escape *text* for use inside a quoted Content-Disposition parameter.

    A raw double quote would terminate the parameter early and a raw CR/LF
    would start a new header line, so these three characters are
    percent-escaped (the same replacements browsers apply to form field
    names and file names).
    """
    return text.replace('"', "%22").replace("\r", "%0D").replace("\n", "%0A")


def _build_multipart_body(
    file_path: Path,
    field_name: str,
    extra_fields: dict[str, str] | None = None,
) -> tuple[bytes, str]:
    """Encode a file plus optional form fields as a multipart/form-data body.

    Args:
        file_path: File to upload; it is read fully into memory.
        field_name: Form field name used for the file part.
        extra_fields: Additional plain form fields. Entries whose value is
            ``None`` or blank after stripping are skipped.

    Returns:
        A ``(body, boundary)`` tuple: the encoded request body and the boundary
        string that must be placed in the request's Content-Type header.

    Raises:
        OSError: If the file cannot be read.
    """
    boundary = uuid.uuid4().hex
    mime_type = mimetypes.guess_type(file_path.name)[0] or "application/octet-stream"
    file_bytes = file_path.read_bytes()
    delimiter = f"--{boundary}\r\n".encode("utf-8")

    parts: list[bytes] = []

    # Plain form fields (for example engine=auto) are written before the file.
    for key, value in (extra_fields or {}).items():
        if value is None or str(value).strip() == "":
            continue
        safe_key = _escape_disposition_value(str(key))
        parts.append(delimiter)
        parts.append(
            f'Content-Disposition: form-data; name="{safe_key}"\r\n\r\n'.encode("utf-8")
        )
        parts.append(f"{value}\r\n".encode("utf-8"))

    # File part. Both the field name and the file name are escaped so that a
    # quote or line break in either cannot corrupt the part headers.
    safe_field = _escape_disposition_value(field_name)
    safe_filename = _escape_disposition_value(file_path.name)
    parts.append(delimiter)
    parts.append(
        (
            f'Content-Disposition: form-data; name="{safe_field}"; filename="{safe_filename}"\r\n'
            f"Content-Type: {mime_type}\r\n\r\n"
        ).encode("utf-8")
    )
    parts.append(file_bytes)
    # Closing delimiter terminates the multipart body.
    parts.append(f"\r\n--{boundary}--\r\n".encode("utf-8"))
    return b"".join(parts), boundary
|
||
|
||
|
||
def _extract_md_contents(payload: Any) -> list[str]:
    """Collect Markdown strings from a decoded JSON value.

    Lookup rules, applied in order:
      * a string is returned as a single-element list;
      * a list is flattened by recursing into every element;
      * any other non-dict value yields an empty list;
      * for a dict, the first key of ``MD_CONTENT_KEYS`` holding a string wins;
        otherwise a non-``None`` ``"results"`` entry is searched; otherwise
        every value of the dict is searched.
    """
    if isinstance(payload, str):
        return [payload]

    if isinstance(payload, list):
        return [text for element in payload for text in _extract_md_contents(element)]

    if not isinstance(payload, dict):
        return []

    # A direct hit on one of the known content keys takes precedence.
    direct_hit = next(
        (payload[name] for name in MD_CONTENT_KEYS if isinstance(payload.get(name), str)),
        None,
    )
    if direct_hit is not None:
        return [direct_hit]

    nested_results = payload.get("results")
    if nested_results is not None:
        return _extract_md_contents(nested_results)

    # Last resort: search every value of the object.
    return [text for child in payload.values() for text in _extract_md_contents(child)]
|
||
|
||
|
||
def _response_to_markdown(text: str) -> str:
    """Turn the service's response text into a single Markdown string.

    Text that is not valid JSON is returned unchanged. For JSON, the Markdown
    fragments found by ``_extract_md_contents`` are stripped, blank ones are
    dropped, and the rest are joined with a blank line between them.

    Raises:
        ValueError: If the JSON payload contains no Markdown fragment at all.
    """
    try:
        decoded = json.loads(text)
    except json.JSONDecodeError:
        # Not JSON: the body itself is treated as the Markdown.
        return text

    fragments = _extract_md_contents(decoded)
    if not fragments:
        raise ValueError("解析服务响应中未找到 md_content/markdown/content 字段")

    stripped = (fragment.strip() for fragment in fragments)
    return "\n\n".join(piece for piece in stripped if piece)
|
||
|
||
|
||
def _request_once(
    api_url: str,
    file_path: Path,
    field_name: str,
    *,
    timeout_sec: int,
    extra_fields: dict[str, str] | None = None,
) -> str:
    """POST the file to the parsing service once and return the response text.

    Args:
        api_url: Endpoint the multipart request is sent to.
        file_path: File to upload.
        field_name: Form field name for the file part.
        timeout_sec: Timeout, in seconds, passed to ``urlopen``.
        extra_fields: Additional plain form fields sent alongside the file.

    Returns:
        The response body decoded with the charset announced by the server,
        or UTF-8 when none is announced or the announced one is unknown.
        Undecodable bytes are replaced rather than raising.

    Raises:
        FileParseApiError: On an HTTP error status (``status_code`` is the HTTP
            code), or on any connection-level failure (``status_code`` is 0).
    """
    body, boundary = _build_multipart_body(file_path, field_name, extra_fields)
    req = urlrequest.Request(
        api_url,
        data=body,
        method="POST",
        headers={"content-type": f"multipart/form-data; boundary={boundary}"},
    )
    try:
        with urlrequest.urlopen(req, timeout=timeout_sec) as resp:
            raw = resp.read()
            encoding = resp.headers.get_content_charset() or "utf-8"
    except urlerror.HTTPError as exc:
        try:
            body_text = (exc.read() or b"").decode("utf-8", errors="replace")[:1000]
        except Exception:
            # Deliberately best effort: the error body only enriches the
            # message, so a failure to read it must not mask the HTTP error.
            body_text = ""
        raise FileParseApiError(
            f"解析服务 HTTP {exc.code}({api_url}):{body_text or exc.reason}",
            status_code=int(exc.code or 0),
            api_url=api_url,
        ) from exc
    except urlerror.URLError as exc:
        raise FileParseApiError(
            f"无法连接解析服务({api_url}):{exc.reason}",
            status_code=0,
            api_url=api_url,
        ) from exc
    except OSError as exc:
        # Timeouts and connection resets that happen while waiting for or
        # reading the response surface as plain OSError/TimeoutError rather
        # than URLError; wrap them so callers see one error type.
        raise FileParseApiError(
            f"解析服务请求失败({api_url}):{exc}",
            status_code=0,
            api_url=api_url,
        ) from exc

    try:
        return raw.decode(encoding, errors="replace")
    except LookupError:
        # The server announced a charset Python does not know; fall back to
        # UTF-8 instead of failing an otherwise successful request.
        return raw.decode("utf-8", errors="replace")
|
||
|
||
|
||
def parse_file_to_markdown(file_path: str | Path) -> str:
    """Convert a file to Markdown through the remote file_parse service.

    Connection parameters are read from ``settings``. Requests that fail with
    HTTP 500, 502, 503 or 504 are retried up to the configured attempt count,
    sleeping ``backoff * attempt`` seconds between attempts; every other
    failure is raised immediately.

    Raises:
        FileNotFoundError: If ``file_path`` is not an existing file.
        ValueError: If no API URL is configured, or the service returned no
            usable Markdown.
        FileParseApiError: If the request fails and is not (or no longer)
            retryable.
    """
    source = Path(file_path)
    if not source.is_file():
        raise FileNotFoundError(f"文件不存在: {source}")

    endpoint = str(settings.FILE_PARSE_API_URL or "").strip()
    if not endpoint:
        raise ValueError("FILE_PARSE_API_URL 未配置")

    # Configured values are clamped to lower bounds: 30 s timeout,
    # one attempt, 1.0 s backoff.
    upload_field = str(settings.FILE_PARSE_FIELD_NAME or "file").strip() or "file"
    timeout = max(int(settings.FILE_PARSE_HTTP_TIMEOUT_SEC or 600), 30)
    max_attempts = max(int(settings.FILE_PARSE_RETRY_COUNT or 1), 1)
    backoff_unit = max(float(settings.FILE_PARSE_RETRY_BACKOFF_SEC or 1.0), 1.0)
    transient_codes = {500, 502, 503, 504}

    # The engine form field is only sent when one is configured.
    engine_name = str(getattr(settings, "FILE_PARSE_ENGINE", "") or "").strip()
    form_fields: dict[str, str] = {"engine": engine_name} if engine_name else {}

    failure: Exception | None = None
    attempt = 0
    while attempt < max_attempts:
        attempt += 1
        try:
            response_text = _request_once(
                endpoint,
                source,
                upload_field,
                timeout_sec=timeout,
                extra_fields=form_fields,
            )
            result = _response_to_markdown(response_text)
            if not result.strip():
                raise ValueError("解析服务返回的 Markdown 为空")
            return result
        except FileParseApiError as exc:
            failure = exc
            code = int(exc.status_code or 0)
            may_retry = attempt < max_attempts and code in transient_codes
            if not may_retry:
                raise
            delay = backoff_unit * attempt
            logger.warning(
                "file_parse 重试 %s/%s status=%s wait=%ss file=%s",
                attempt,
                max_attempts,
                code,
                delay,
                source.name,
            )
            time.sleep(delay)

    # Safety net in case the loop ends without returning or raising.
    if failure:
        raise failure
    raise RuntimeError("file_parse 请求失败")
|