Source code for imednet_workflows.state_ledger

"""Stateful incremental high-water mark tracker for workflow streams."""

from __future__ import annotations

import contextlib
import json
import os
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Generator, Optional

from pydantic import BaseModel, Field

# Graceful fallback if fcntl is not available (e.g. non-UNIX environments)
try:
    import fcntl
except ImportError:
    fcntl = None  # type: ignore[assignment]


class StreamState(BaseModel):
    """Schema for individual stream execution checkpoints."""

    # High-water mark for the stream. The ledger normalises naive values to
    # UTC before storing and after reading them.
    last_timestamp: datetime

    # Record count reported by the run that wrote this checkpoint.
    records_processed: int = 0

    # Outcome of the run that wrote this checkpoint; the ledger in this
    # module writes either "success" or "failed".
    last_run_status: str = "success"

    # Text of the exception for a failed run, otherwise None.
    error_message: Optional[str] = None

    # Free-form, caller-supplied annotations persisted with the checkpoint.
    metadata: Dict[str, Any] = Field(default_factory=dict)
class StudyState(BaseModel):
    """Schema for all streams in a given study context."""

    # Checkpoints keyed by stream name; a new study starts with none.
    streams: Dict[str, StreamState] = Field(default_factory=dict)
class LedgerState(BaseModel):
    """Schema for the entire ledger file containing all studies."""

    # Per-study state keyed by study key; an empty ledger has no studies.
    studies: Dict[str, StudyState] = Field(default_factory=dict)
class ExtractionStateLedger:
    """Manage per-study, per-stream high-water marks in a JSON ledger file.

    Every public mutator takes an exclusive ``flock`` on a sibling ``.lock``
    file (when ``fcntl`` is available), re-reads the ledger, applies its
    change and rewrites the file through a temp-file-plus-rename.

    Note:
        ``_lock`` opens the lock file afresh on every call, so the lock is
        not re-entrant: calling another locking method of the same ledger
        (``get_last_timestamp``, ``set_last_timestamp``, ``delete_entry``,
        ``transaction``) from inside a ``transaction()`` block will block on
        the lock already held.
    """

    # Mark stored for a failed first run that had no previous mark and no
    # fallback: the earliest sensible value, so that no data is ever skipped.
    _EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

    def __init__(self, ledger_path: str = "/var/lib/imednet/pipeline_ledger.json") -> None:
        """Remember the ledger location; no file is touched until first use.

        Args:
            ledger_path: Path of the JSON ledger file. The lock file lives
                next to it, with the suffix replaced by ``.lock``.
        """
        self.ledger_path = Path(ledger_path)
        self._lock_path = self.ledger_path.with_suffix(".lock")

    @staticmethod
    def _as_utc(ts: datetime) -> datetime:
        """Return *ts* unchanged if it is timezone-aware, else tag it as UTC."""
        if ts.tzinfo is None:
            return ts.replace(tzinfo=timezone.utc)
        return ts

    def _ensure_ledger_exists(self) -> None:
        """Create an empty ledger (and its parent directories) if missing."""
        if not self.ledger_path.exists():
            self.ledger_path.parent.mkdir(parents=True, exist_ok=True)
            with open(self.ledger_path, "w", encoding="utf-8") as f:
                json.dump({"studies": {}}, f, indent=2)

    @contextlib.contextmanager
    def _lock(self) -> Generator[None, None, None]:
        """Cross-process file lock using flock on UNIX.

        Where ``fcntl`` could not be imported the context manager yields
        immediately without taking any lock.
        """
        self._lock_path.parent.mkdir(parents=True, exist_ok=True)
        if fcntl is None:
            # Fallback for systems without fcntl (e.g. Windows)
            yield
            return

        with open(self._lock_path, "w") as lock_file:
            try:
                fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)  # type: ignore[attr-defined]
                yield
            finally:
                fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)  # type: ignore[attr-defined]

    def read_state(self) -> "LedgerState":
        """Read and validate the current ledger state.

        Creates an empty ledger first if the file does not exist. This method
        does not take the file lock itself.

        Returns:
            The parsed ledger. A file that is empty or not valid JSON is
            treated as an empty ledger rather than raising.
        """
        self._ensure_ledger_exists()
        with open(self.ledger_path, "r", encoding="utf-8") as f:
            try:
                data = json.load(f)
            except json.JSONDecodeError:
                # If file is empty or corrupted, fallback to empty ledger
                return LedgerState(studies={})
        return LedgerState.model_validate(data)

    def write_state(self, state: "LedgerState") -> None:
        """Write the ledger state atomically using a temporary file.

        The state is serialized before any file is created, written to a
        temp file in the ledger's own directory, flushed to disk, and then
        renamed over the ledger with ``os.replace``.

        Args:
            state: Ledger contents to persist.

        Raises:
            Exception: Whatever serialization, the write or the rename
                raises. The temp file is removed before the error propagates.
        """
        self.ledger_path.parent.mkdir(parents=True, exist_ok=True)

        # Serialize first so that a serialization error leaves no file behind.
        serialized = state.model_dump_json(indent=2)

        # Same directory as the ledger so the final rename stays on one
        # filesystem.
        tf = tempfile.NamedTemporaryFile(
            "w", dir=self.ledger_path.parent, delete=False, encoding="utf-8"
        )
        temp_name = tf.name
        try:
            with tf:
                tf.write(serialized)
                # Push the bytes to disk before the rename makes them visible
                # under the ledger's name.
                tf.flush()
                os.fsync(tf.fileno())
            os.replace(temp_name, self.ledger_path)
        except BaseException:
            # delete=False means nobody else cleans up: remove the temp file
            # whether the write or the rename failed, then re-raise.
            with contextlib.suppress(FileNotFoundError):
                os.remove(temp_name)
            raise

    def get_last_timestamp(self, study_key: str, stream_name: str) -> Optional[datetime]:
        """Return the high-water mark timestamp for a given study and stream.

        Args:
            study_key: Key of the study in the ledger.
            stream_name: Name of the stream within that study.

        Returns:
            The stored timestamp (naive values are tagged as UTC), or
            ``None`` when the study or the stream has no entry.
        """
        with self._lock():
            state = self.read_state()
            study = state.studies.get(study_key)
            if not study:
                return None
            stream = study.streams.get(stream_name)
            if not stream:
                return None
            return self._as_utc(stream.last_timestamp)

    def set_last_timestamp(
        self,
        study_key: str,
        stream_name: str,
        timestamp: datetime,
        records_processed: int = 0,
        status: str = "success",
        error_message: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Set the high-water mark for a stream under the file lock.

        The stream's previous checkpoint, if any, is replaced entirely.

        Args:
            study_key: Key of the study; created if absent.
            stream_name: Name of the stream within that study.
            timestamp: New high-water mark; naive values are tagged as UTC.
            records_processed: Record count to store with the checkpoint.
            status: Value stored as ``last_run_status``.
            error_message: Optional error text to store.
            metadata: Optional annotations; ``None`` is stored as ``{}``.
        """
        with self._lock():
            state = self.read_state()
            study = state.studies.setdefault(study_key, StudyState())
            study.streams[stream_name] = StreamState(
                last_timestamp=self._as_utc(timestamp),
                records_processed=records_processed,
                last_run_status=status,
                error_message=error_message,
                metadata=metadata or {},
            )
            self.write_state(state)

    @contextlib.contextmanager
    def transaction(
        self,
        study_key: str,
        stream_name: str,
        fallback_timestamp: Optional[datetime] = None,
    ) -> Generator[Dict[str, Any], None, None]:
        """Context manager for transactional state tracking.

        Yields a dict with the keys ``last_timestamp`` (the mark in effect at
        the start, or ``fallback_timestamp`` when the stream has none),
        ``new_timestamp``, ``records_processed`` and ``metadata``. The caller
        fills in the last three.

        * On a clean exit with a truthy ``new_timestamp`` the stream is
          checkpointed at that timestamp with status ``"success"``. Without
          a ``new_timestamp`` nothing is written.
        * If the body (or the commit) raises, the stream is recorded as
          ``"failed"`` with the error text and the exception is re-raised.
          The high-water mark is NOT advanced: the failure record keeps the
          mark that was in effect when the transaction began, or the Unix
          epoch when there was none, so a later run cannot skip data that
          was never committed.

        The ledger file lock is held for the entire duration of the context.

        Args:
            study_key: Key of the study in the ledger.
            stream_name: Name of the stream within that study.
            fallback_timestamp: Starting mark to use when the stream has no
                stored checkpoint; naive values are tagged as UTC.

        Raises:
            Exception: Any exception raised inside the ``with`` body or
                while committing is propagated after the failure is recorded.
        """
        with self._lock():
            state = self.read_state()

            # Resolve the mark in effect at the start of this transaction.
            last_ts: Optional[datetime] = None
            study = state.studies.get(study_key)
            if study:
                stream_state = study.streams.get(stream_name)
                if stream_state:
                    last_ts = self._as_utc(stream_state.last_timestamp)
            if last_ts is None and fallback_timestamp is not None:
                last_ts = self._as_utc(fallback_timestamp)

            tx_data: Dict[str, Any] = {
                "last_timestamp": last_ts,
                "new_timestamp": None,
                "records_processed": 0,
                "metadata": {},
            }

            try:
                yield tx_data

                # Commit changes only if successful and new_timestamp is set
                new_ts = tx_data.get("new_timestamp")
                if new_ts:
                    study_entry = state.studies.setdefault(study_key, StudyState())
                    study_entry.streams[stream_name] = StreamState(
                        last_timestamp=self._as_utc(new_ts),
                        records_processed=tx_data.get("records_processed", 0),
                        last_run_status="success",
                        metadata=tx_data.get("metadata") or {},
                    )
                    self.write_state(state)
            except Exception as err:
                # Mark stream as failed WITHOUT moving the high-water mark.
                # The caller may already have set new_timestamp before the
                # error; persisting it (or "now") would make the next run
                # skip everything that this run failed to ingest.
                err_ts = last_ts if last_ts is not None else self._EPOCH
                study_entry = state.studies.setdefault(study_key, StudyState())
                study_entry.streams[stream_name] = StreamState(
                    last_timestamp=err_ts,
                    records_processed=tx_data.get("records_processed", 0),
                    last_run_status="failed",
                    error_message=str(err),
                    metadata=tx_data.get("metadata") or {},
                )
                self.write_state(state)
                raise

    def delete_entry(self, study_key: str, stream_name: Optional[str] = None) -> bool:
        """Delete a study or a specific stream entry under the file lock.

        Args:
            study_key: Key of the study to delete, or that owns the stream.
            stream_name: Stream to delete; ``None`` deletes the whole study.

        Returns:
            ``True`` if the entry existed and was removed, ``False``
            otherwise (in which case the ledger is not rewritten).
        """
        with self._lock():
            state = self.read_state()
            if study_key not in state.studies:
                return False

            if stream_name is None:
                del state.studies[study_key]
            else:
                streams = state.studies[study_key].streams
                if stream_name not in streams:
                    return False
                del streams[stream_name]

            self.write_state(state)
            return True