Source code for imednet.integrations.document

"""MongoDB document-envelope export sink.

This module implements the **structure-preserving export path** for a MongoDB
destination.  Records fetched via
:class:`~imednet_workflows.data_extraction.DataExtractionWorkflow` are wrapped
in a consistent document envelope that preserves the clinical hierarchy.

Document envelope
-----------------
Each document stored in MongoDB represents a single iMednet **Record** and
carries the metadata needed to locate and de-duplicate it:

.. code-block:: json

    {
        "_id": "<study_key>/<record_id>",
        "study_key":     "MYSTUDY",
        "record_id":     1234,
        "subject_id":    99,
        "subject_key":   "SUBJ-001",
        "visit_id":      42,
        "form_id":       7,
        "form_key":      "BASELINE",
        "record_status": "Complete",
        "deleted":       false,
        "date_created":  "2024-01-01T08:00:00Z",
        "date_modified": "2024-01-15T12:00:00Z",
        "record_data":   { "var1": "value1", "var2": 99 },
        "exported_at":   "2024-01-15T12:00:00Z"
    }

The ``_id`` field is a composite key ``<study_key>/<record_id>`` which
guarantees uniqueness within a collection and enables efficient upserts.

Optional dependency
-------------------
Requires ``pymongo`` (install via ``pip install 'imednet[mongodb]'``).
The client is imported lazily at connection time.

Idempotency
-----------
When ``SinkConfig.idempotent`` is ``True`` (default) the sink uses
``bulk_write`` with ``UpdateOne(..., upsert=True)`` operations so that
re-running an export for the same batch updates existing documents rather
than inserting duplicates.

Usage
-----
.. code-block:: python

    from imednet.integrations.document import MongoDbExportSink
    from imednet_workflows.data_extraction import DataExtractionWorkflow

    records = DataExtractionWorkflow(sdk).extract_records_by_criteria(
        study_key="MYSTUDY",
    )

    with MongoDbExportSink(
        uri="******localhost:27017",
        database="imednet",
        collection="records",
    ) as sink:
        for i, batch in enumerate(batched(records, 500)):
            sink.write_batch(batch, batch_id=f"MYSTUDY/all/{i}")
"""

from __future__ import annotations

import logging
import time
from datetime import datetime, timezone
from typing import Any, Optional, Sequence

from imednet.errors import ExportBatchError, ExportConfigurationError
from imednet.sdk import ImednetSDK

from .sink_base import ExportSink, SinkConfig, _redact_uri, _require_optional_dep, iter_batches

logger = logging.getLogger(__name__)


def _make_document_id(study_key: str, record_id: Any) -> str:
    """Return the composite ``_id`` for a MongoDB record document."""
    return f"{study_key}/{record_id}"


def _record_to_document(record: Any, study_key: str) -> dict[str, Any]:
    """Wrap a typed ``Record`` model in the standard document envelope."""
    record_id = getattr(record, "record_id", None)
    return {
        "_id": _make_document_id(study_key, record_id),
        "study_key": study_key,
        "record_id": record_id,
        "subject_id": getattr(record, "subject_id", None),
        "subject_key": getattr(record, "subject_key", None),
        "visit_id": getattr(record, "visit_id", None),
        "form_id": getattr(record, "form_id", None),
        "form_key": getattr(record, "form_key", None),
        "record_status": getattr(record, "record_status", None),
        "deleted": getattr(record, "deleted", None),
        "date_created": getattr(record, "date_created", None),
        "date_modified": getattr(record, "date_modified", None),
        "record_data": dict(getattr(record, "record_data", {}) or {}),
        "exported_at": datetime.now(tz=timezone.utc).isoformat(),
    }


[docs]class MongoDbExportSink(ExportSink): """Export iMednet records as MongoDB document envelopes. Parameters ---------- uri: MongoDB connection URI (e.g. ``"******localhost:27017"``). Credentials are never logged. database: Target database name. collection: Target collection name. study_key: Study identifier embedded in every document and used to build the composite ``_id``. config: Optional :class:`~imednet.integrations.sink_base.SinkConfig`. Raises ------ ~imednet.errors.ExportConfigurationError When the client cannot connect to the server. ImportError When the ``pymongo`` package is not installed. """
[docs] def __init__( self, uri: str, database: str, collection: str, study_key: str, *, config: Optional[SinkConfig] = None, ) -> None: super().__init__(config) self._uri = uri self._database = database self._collection_name = collection self._study_key = study_key self._client: Any = None self._collection: Any = None self._connect()
# ------------------------------------------------------------------ # Connection management # ------------------------------------------------------------------ def _connect(self) -> None: pymongo = _require_optional_dep("pymongo", "mongodb") redacted = _redact_uri(self._uri) logger.debug("Connecting to MongoDB at %s", redacted) try: self._client = pymongo.MongoClient(self._uri) # Force a connection check self._client.admin.command("ping") db = self._client[self._database] self._collection = db[self._collection_name] except Exception as exc: raise ExportConfigurationError( f"Cannot connect to MongoDB at {redacted}. " f"Driver error type: {type(exc).__name__}" ) from exc # ------------------------------------------------------------------ # ExportSink interface # ------------------------------------------------------------------
[docs] def write_batch(self, records: Sequence[Any], *, batch_id: str) -> int: """Write *records* to MongoDB using upsert (idempotent) or insert. Parameters ---------- records: Sequence of typed ``Record`` model instances. batch_id: Idempotency key (e.g. ``"MYSTUDY/FORM1/0"``). Returns ------- int Number of records written (upserted or inserted). """ docs = [_record_to_document(r, self._study_key) for r in records] if not docs: return 0 last_exc: Optional[Exception] = None for attempt in range(self.config.max_retries + 1): try: if self.config.idempotent: pymongo = _require_optional_dep("pymongo", "mongodb") ops = [ pymongo.UpdateOne( {"_id": doc["_id"]}, {"$set": doc}, upsert=True, ) for doc in docs ] result = self._collection.bulk_write(ops, ordered=False) written = len(docs) else: result = self._collection.insert_many(docs, ordered=False) written = len(result.inserted_ids) logger.debug("Wrote batch %s (%d records)", batch_id, written) return written except Exception as exc: # noqa: BLE001 last_exc = exc if attempt < self.config.max_retries: delay = self.config.retry_backoff * (2**attempt) logger.warning( "Batch %s attempt %d failed (%s); retrying in %.1fs", batch_id, attempt + 1, exc, delay, ) time.sleep(delay) raise ExportBatchError( f"Batch {batch_id!r} failed after {self.config.max_retries + 1} " f"attempts: {last_exc}", batch_id=batch_id, )
[docs] def flush(self) -> None: """No-op: MongoDB writes are committed per bulk operation."""
[docs] def close(self) -> None: """Close the underlying PyMongo client.""" if self._client is not None: try: self._client.close() finally: self._client = None self._collection = None
[docs]def export_to_mongodb( sdk: ImednetSDK, study_key: str, uri: str, database: str, collection: str, *, config: Optional[SinkConfig] = None, ) -> int: """Export study records to MongoDB using :class:`MongoDbExportSink`.""" cfg = config if config is not None else SinkConfig() records = sdk.records.list(study_key=study_key, record_data_filter=None) total_written = 0 with MongoDbExportSink(uri, database, collection, study_key, config=cfg) as sink: for index, batch in enumerate(iter_batches(records, cfg.batch_size)): total_written += sink.write_batch(batch, batch_id=f"{study_key}/records/{index}") return total_written
__all__ = ["MongoDbExportSink", "export_to_mongodb"]