from __future__ import annotations

# In-process registry of job state, keyed by job id.
#
# Each job entry holds job-level fields plus a "chapters" mapping keyed by
# section key.  Every public accessor takes _RUNTIME_LOCK before touching
# _JOB_STATES, and readers receive deep copies, so callers never hold a
# reference into the shared dictionaries.

from copy import deepcopy
from datetime import datetime
import threading
from typing import Any, Optional

# Re-entrant so a function already holding the lock may call another accessor.
_RUNTIME_LOCK = threading.RLock()
_JOB_STATES: dict[str, dict[str, Any]] = {}


def _now_str() -> str:
    """Return the current naive local time as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def _chapter_payload(
    *,
    section_key: str,
    section_title: str,
    section_order: int,
    status: str = "pending",
    updated_at: Optional[str] = None,
) -> dict[str, Any]:
    """Build the initial state dictionary for one chapter.

    Args:
        section_key: Identifier stored under ``sectionKey``.
        section_title: Title stored under ``sectionTitle``.
        section_order: Ordering value stored under ``sectionOrder``.
        status: Initial chapter status.
        updated_at: Timestamp for ``updatedAt``; the current time is used
            when omitted.

    Returns:
        A new chapter dictionary with content, error, prompt, evidence and
        validation fields all set to ``None``.
    """
    return {
        "sectionKey": section_key,
        "sectionTitle": section_title,
        "sectionOrder": section_order,
        "status": status,
        "content": None,
        "errorMessage": None,
        "updatedAt": _now_str() if updated_at is None else updated_at,
        "promptText": None,
        "evidencePayload": None,
        "validationPayload": None,
    }


def _find_chapter(
    job_id: str,
    section_key: str,
) -> Optional[tuple[dict[str, Any], dict[str, Any]]]:
    """Return ``(job_state, chapter_state)`` or ``None`` if either is missing.

    The returned dictionaries are the live shared objects, so the caller
    must already hold ``_RUNTIME_LOCK``.
    """
    state = _JOB_STATES.get(job_id)
    if not state:
        return None
    chapter = state.get("chapters", {}).get(section_key)
    if not chapter:
        return None
    return state, chapter


def _touch(state: dict[str, Any], chapter: Optional[dict[str, Any]] = None) -> None:
    """Stamp ``updatedAt`` on the job (and chapter, if given) with one value.

    Using a single timestamp keeps the job and chapter in agreement even
    when the mutation straddles a second boundary.
    """
    stamp = _now_str()
    if chapter is not None:
        chapter["updatedAt"] = stamp
    state["updatedAt"] = stamp


def _apply_stream_phase(
    state: dict[str, Any],
    chapter: dict[str, Any],
    section_key: str,
    stream_phase: str,
) -> None:
    """Record the stream phase on a chapter and mark it as the current one.

    The validation payload is copied before being modified so a dictionary
    previously handed in by a caller is not mutated in place.
    """
    payload = dict(chapter.get("validationPayload") or {})
    payload["streamPhase"] = stream_phase
    chapter["validationPayload"] = payload
    state["currentSectionKey"] = section_key
    _touch(state, chapter)


def init_job_state(
    *,
    job_id: str,
    project_id: str,
    template_id: Optional[str],
    chapters: list[dict[str, Any]],
) -> None:
    """Create (or replace) the state entry for ``job_id``.

    Args:
        job_id: Key under which the job is registered.
        project_id: Stored under ``projectId``.
        template_id: Stored under ``templateId``; may be ``None``.
        chapters: Chapter descriptors.  Each must provide ``sectionKey``,
            ``sectionTitle`` and ``sectionOrder``; ``status`` is optional
            and defaults to ``"pending"``.  A falsy value yields a job with
            no chapters.

    Raises:
        KeyError: If a chapter descriptor lacks a required key.
        ValueError: If ``sectionOrder`` cannot be converted to ``int``.
    """
    # One timestamp for the whole job so createdAt, updatedAt and every
    # chapter's updatedAt are identical at creation.
    stamp = _now_str()
    chapter_states: dict[str, dict[str, Any]] = {}
    for item in chapters or []:
        key = str(item["sectionKey"])
        chapter_states[key] = _chapter_payload(
            section_key=key,
            section_title=str(item["sectionTitle"]),
            section_order=int(item["sectionOrder"]),
            status=str(item.get("status") or "pending"),
            updated_at=stamp,
        )

    job_state: dict[str, Any] = {
        "jobId": job_id,
        "projectId": project_id,
        "templateId": template_id,
        "status": "pending",
        "progress": 0,
        "currentSectionKey": None,
        "errorMessage": None,
        "createdAt": stamp,
        "updatedAt": stamp,
        "completedAt": None,
        "chapters": chapter_states,
    }
    with _RUNTIME_LOCK:
        _JOB_STATES[job_id] = job_state


def get_job_state(job_id: str) -> Optional[dict[str, Any]]:
    """Return a deep copy of the job's state, or ``None`` if it is unknown."""
    with _RUNTIME_LOCK:
        state = _JOB_STATES.get(job_id)
        return deepcopy(state) if state else None


def update_job_state(job_id: str, **fields: Any) -> None:
    """Merge ``fields`` into the job's state and refresh ``updatedAt``.

    Unknown job ids are ignored.  ``updatedAt`` is always overwritten with
    the current time, even if it is present in ``fields``.
    """
    with _RUNTIME_LOCK:
        state = _JOB_STATES.get(job_id)
        if not state:
            return
        state.update(fields)
        _touch(state)


def update_chapter_state(
    job_id: str,
    section_key: str,
    **fields: Any,
) -> None:
    """Merge ``fields`` into one chapter and refresh both ``updatedAt`` values.

    Unknown job ids or section keys are ignored.
    """
    with _RUNTIME_LOCK:
        found = _find_chapter(job_id, section_key)
        if found is None:
            return
        state, chapter = found
        chapter.update(fields)
        _touch(state, chapter)


def append_chapter_content(
    job_id: str,
    section_key: str,
    delta_text: str,
    *,
    stream_phase: str,
) -> None:
    """Append ``delta_text`` to a chapter's content and record the stream phase.

    An empty ``delta_text`` is a no-op (the stream phase is not updated
    either).  Unknown job ids or section keys are ignored.  The chapter
    becomes the job's ``currentSectionKey``.
    """
    if not delta_text:
        return
    with _RUNTIME_LOCK:
        found = _find_chapter(job_id, section_key)
        if found is None:
            return
        state, chapter = found
        # Content starts as None; treat that (or any falsy value) as empty.
        chapter["content"] = str(chapter.get("content") or "") + delta_text
        _apply_stream_phase(state, chapter, section_key, stream_phase)


def set_chapter_stream_phase(job_id: str, section_key: str, stream_phase: str) -> None:
    """Record ``stream_phase`` on a chapter without changing its content.

    Unknown job ids or section keys are ignored.  The chapter becomes the
    job's ``currentSectionKey``.
    """
    with _RUNTIME_LOCK:
        found = _find_chapter(job_id, section_key)
        if found is None:
            return
        state, chapter = found
        _apply_stream_phase(state, chapter, section_key, stream_phase)


def remove_job_state(job_id: str) -> None:
    """Drop the job's state; removing an unknown job id is a no-op."""
    with _RUNTIME_LOCK:
        _JOB_STATES.pop(job_id, None)