Source code for imednet_workflows.sync_worker

from __future__ import annotations

import logging
from dataclasses import dataclass
from pathlib import Path
from threading import Event

from filelock import FileLock

from .cached_loader import CachedRecordsLoader

logger = logging.getLogger(__name__)


[docs]@dataclass(slots=True) class SyncWorkerConfig: study_key: str interval_seconds: int = 900 reconcile: bool = True lock_timeout_seconds: int = 30
class SyncWorker:
    """Background cache refresher that runs incremental record sync loops.

    Every cycle asks the loader for the configured study's records while
    holding a file lock stored next to the loader's database
    (``<db_path>.lock``).
    """

    def __init__(
        self,
        loader: CachedRecordsLoader,
        *,
        config: SyncWorkerConfig,
        stop_event: Event | None = None,
    ) -> None:
        """Bind the worker to a loader and its settings.

        Args:
            loader: Object providing ``load_records`` and ``db_path``; the
                latter determines where the lock file lives.
            config: Study key, cycle interval, reconcile flag and lock timeout.
            stop_event: Optional caller-owned event that ends the loop when
                set. A private event is created when omitted.
        """
        self._loader = loader
        self._config = config
        # Test for None explicitly instead of relying on truthiness, so that
        # only a genuinely omitted event is replaced.
        self._stop_event = Event() if stop_event is None else stop_event
        lock_path = Path(f"{loader.db_path}.lock")
        self._lock = FileLock(str(lock_path), timeout=config.lock_timeout_seconds)

    def run_once(self) -> int:
        """Run one idempotent cache sync cycle.

        Returns:
            The number of records returned by the loader for this cycle.

        Raises:
            Exception: Whatever acquiring the lock or the loader itself
                raises is propagated unchanged to the caller.
        """
        study_key = self._config.study_key
        with self._lock:
            records = self._loader.load_records(
                study_key,
                reconcile=self._config.reconcile,
            )
        # The lock only needs to cover the load; report once it is released.
        record_count = len(records)
        logger.info(
            "sync cycle complete",
            extra={"study_key": study_key, "record_count": record_count},
        )
        return record_count

    def run_forever(self) -> None:
        """Run sync cycles until stopped.

        A failing cycle is logged with its traceback and the loop continues
        with the next cycle after the configured interval. The loop ends as
        soon as the stop event is set, including while waiting between cycles.
        """
        study_key = self._config.study_key
        interval = self._config.interval_seconds
        logger.info(
            "sync worker started",
            extra={"study_key": study_key, "interval_seconds": interval},
        )
        while not self._stop_event.is_set():
            try:
                self.run_once()
            except Exception:  # pragma: no cover - defensive logging path
                # Carry the same study context as every other log call so a
                # failure can be attributed when several workers are running.
                logger.exception(
                    "sync worker cycle failed",
                    extra={"study_key": study_key},
                )
            # ``wait`` doubles as the sleep between cycles and returns True
            # the moment a stop is requested.
            if self._stop_event.wait(interval):
                break
        logger.info("sync worker stopped", extra={"study_key": study_key})

    def stop(self) -> None:
        """Request graceful termination by setting the stop event."""
        self._stop_event.set()