Source code for imednet.integrations.graph

"""Neo4j graph export sink.

This module implements the **structure-preserving export path** for a Neo4j
property-graph destination.  Records fetched via
:class:`~imednet_workflows.data_extraction.DataExtractionWorkflow` are written
as graph nodes and relationships that mirror the clinical data hierarchy.

Graph shape
-----------
Nodes
~~~~~
* ``(:Study   {study_key})``
* ``(:Subject {subject_key, study_key})``
* ``(:Visit   {visit_id, subject_key, study_key})``
* ``(:Record  {record_id, form_id, visit_id, subject_key, study_key, **record_data})``

Relationships
~~~~~~~~~~~~~
* ``(:Study)-[:HAS_SUBJECT]->(:Subject)``
* ``(:Subject)-[:HAS_VISIT]->(:Visit)``
* ``(:Visit)-[:HAS_RECORD]->(:Record)``

Optional dependency
-------------------
Requires ``neo4j`` (install via ``pip install 'imednet[neo4j]'``).
The driver is imported lazily at connection time so that importing this
module never fails when ``neo4j`` is not installed.

Idempotency
-----------
When ``SinkConfig.idempotent`` is ``True`` (default) the sink uses
``MERGE`` on the node's primary key property so that re-running an export
for the same batch updates existing nodes rather than creating duplicates.

Usage
-----
.. code-block:: python

    from imednet.integrations.graph import Neo4jExportSink, Neo4jSinkConfig
    from imednet.integrations.sink_base import iter_batches
    from imednet_workflows.data_extraction import DataExtractionWorkflow

    records = DataExtractionWorkflow(sdk).extract_records_by_criteria(
        study_key="MYSTUDY",
    )

    config = Neo4jSinkConfig(batch_size=200)
    with Neo4jExportSink(
        uri="bolt://localhost:7687",
        auth=("neo4j", "password"),
        study_key="MYSTUDY",
        config=config,
    ) as sink:
        for i, batch in enumerate(iter_batches(records, config.batch_size)):
            sink.write_batch(batch, batch_id=f"MYSTUDY/all/{i}")
"""

from __future__ import annotations

import logging
import time
from dataclasses import dataclass
from typing import Any, Optional, Sequence, Tuple

from imednet.errors import ExportBatchError, ExportConfigurationError
from imednet.sdk import ImednetSDK

from .sink_base import ExportSink, SinkConfig, _redact_uri, _require_optional_dep, iter_batches

logger = logging.getLogger(__name__)


@dataclass
class Neo4jSinkConfig(SinkConfig):
    """Sink configuration carrying the Neo4j-specific settings.

    Inherits every option of
    :class:`~imednet.integrations.sink_base.SinkConfig` and adds the name of
    the database that sessions are opened against.

    Parameters
    ----------
    database:
        Name of the Neo4j database to write to.  Defaults to ``"neo4j"``.
    """

    # Passed as ``database=`` when the sink opens a driver session.
    database: str = "neo4j"
# ---------------------------------------------------------------------------
# Cypher templates
# ---------------------------------------------------------------------------

# Idempotent write: every node and relationship is MERGEd, so re-running a
# batch updates the existing graph instead of duplicating it.
#
# * Visit nodes are keyed per subject (visit_id + subject_key + study_key).
#   Keying on visit_id + study_key alone would collapse the same visit of
#   different subjects into one shared node and cross-link their records.
# * The structural Record properties are assigned *after* ``record_data`` is
#   merged in, so a form field that happens to share a name with one of them
#   can never overwrite the node's identity.
_MERGE_RECORD_CYPHER = """\
UNWIND $rows AS row
MERGE (s:Study {study_key: row.study_key})
MERGE (su:Subject {subject_key: row.subject_key, study_key: row.study_key})
MERGE (v:Visit {
    visit_id: row.visit_id,
    subject_key: row.subject_key,
    study_key: row.study_key
})
MERGE (r:Record {record_id: row.record_id, study_key: row.study_key})
SET r += row.record_data
SET r.record_id = row.record_id,
    r.form_id = row.form_id,
    r.visit_id = row.visit_id,
    r.subject_key = row.subject_key,
    r.study_key = row.study_key
MERGE (s)-[:HAS_SUBJECT]->(su)
MERGE (su)-[:HAS_VISIT]->(v)
MERGE (v)-[:HAS_RECORD]->(r)
"""

# Non-idempotent write: a new Record node is always CREATEd.  The surrounding
# Study/Subject/Visit hierarchy is still MERGEd so that a record is attached
# to its visit even when the hierarchy does not exist yet (a bare MATCH would
# silently leave the new Record orphaned on a fresh graph).
_CREATE_RECORD_CYPHER = """\
UNWIND $rows AS row
MERGE (s:Study {study_key: row.study_key})
MERGE (su:Subject {subject_key: row.subject_key, study_key: row.study_key})
MERGE (v:Visit {
    visit_id: row.visit_id,
    subject_key: row.subject_key,
    study_key: row.study_key
})
MERGE (s)-[:HAS_SUBJECT]->(su)
MERGE (su)-[:HAS_VISIT]->(v)
CREATE (r:Record)
SET r += row.record_data
SET r.record_id = row.record_id,
    r.form_id = row.form_id,
    r.visit_id = row.visit_id,
    r.subject_key = row.subject_key,
    r.study_key = row.study_key
CREATE (v)-[:HAS_RECORD]->(r)
"""


def _record_to_row(record: Any, study_key: str) -> dict[str, Any]:
    """Convert a typed ``Record`` model to a flat Cypher parameter dict.

    Parameters
    ----------
    record:
        Object exposing ``record_id``, ``form_id``, ``visit_id``,
        ``subject_key`` and ``record_data`` attributes.  Missing attributes
        are reported as ``None`` (or an empty dict for ``record_data``).
    study_key:
        Study identifier stamped onto the row.

    Returns
    -------
    dict
        Row with the keys referenced by the Cypher templates as
        ``row.<key>``.  ``record_data`` is a shallow copy, so the caller's
        mapping is never shared with the returned row.
    """
    # ``or {}`` also covers a model whose ``record_data`` is explicitly None.
    record_data = getattr(record, "record_data", None) or {}
    return {
        "record_id": getattr(record, "record_id", None),
        "form_id": getattr(record, "form_id", None),
        "visit_id": getattr(record, "visit_id", None),
        "subject_key": getattr(record, "subject_key", None),
        "study_key": study_key,
        "record_data": dict(record_data),
    }
class Neo4jExportSink(ExportSink):
    """Export iMednet records as Neo4j graph nodes and relationships.

    The driver connection is opened (and verified) in the constructor and
    released by :meth:`close`.

    Parameters
    ----------
    uri:
        Neo4j Bolt or HTTP URI (e.g. ``"bolt://localhost:7687"`` or
        ``"neo4j+s://xxxx.databases.neo4j.io"``).
    auth:
        ``(username, password)`` tuple.  Credentials are never logged.
    study_key:
        Study identifier attached to every node for multi-study graphs.
    config:
        Optional :class:`Neo4jSinkConfig` (or plain :class:`SinkConfig`).
        Defaults to :class:`Neo4jSinkConfig` with all values at defaults.

    Raises
    ------
    ~imednet.errors.ExportConfigurationError
        When the driver cannot connect to the database.
    ImportError
        When the ``neo4j`` package is not installed.
    """

    def __init__(
        self,
        uri: str,
        auth: Tuple[str, str],
        study_key: str,
        *,
        config: Optional[SinkConfig] = None,
    ) -> None:
        super().__init__(config if config is not None else Neo4jSinkConfig())
        self._uri = uri
        self._auth = auth
        self._study_key = study_key
        self._driver: Any = None
        self._connect()

    # ------------------------------------------------------------------
    # Connection management
    # ------------------------------------------------------------------

    def _connect(self) -> None:
        """Create the driver and verify that the server is reachable.

        Raises
        ------
        ~imednet.errors.ExportConfigurationError
            When the driver cannot be created or connectivity cannot be
            verified.  A driver created before the failure is closed first.
        """
        neo4j_mod = _require_optional_dep("neo4j", "neo4j")
        redacted = _redact_uri(self._uri)
        logger.debug("Connecting to Neo4j at %s", redacted)

        driver: Any = None
        try:
            driver = neo4j_mod.GraphDatabase.driver(self._uri, auth=self._auth)
            driver.verify_connectivity()
        except Exception as exc:
            # The constructor is about to fail, so the caller never receives
            # an object to close(): release the half-open driver here.
            if driver is not None:
                try:
                    driver.close()
                except Exception:  # noqa: BLE001 - best-effort cleanup
                    logger.debug("Ignoring error while closing Neo4j driver", exc_info=True)
            raise ExportConfigurationError(f"Cannot connect to Neo4j at {redacted}: {exc}") from exc
        self._driver = driver

    # ------------------------------------------------------------------
    # ExportSink interface
    # ------------------------------------------------------------------

    def write_batch(self, records: Sequence[Any], *, batch_id: str) -> int:
        """Write *records* to Neo4j using MERGE (idempotent) or CREATE.

        A failed attempt is retried up to ``config.max_retries`` times with
        exponential back-off (``config.retry_backoff * 2**attempt`` seconds).

        Parameters
        ----------
        records:
            Sequence of typed ``Record`` model instances.
        batch_id:
            Label identifying the batch (e.g. ``"MYSTUDY/FORM1/0"``); used in
            log messages and attached to the raised error.

        Returns
        -------
        int
            Number of records written (``0`` for an empty batch).

        Raises
        ------
        ~imednet.errors.ExportBatchError
            When the sink has been closed, or when every attempt failed.
        """
        rows = [_record_to_row(r, self._study_key) for r in records]
        if not rows:
            return 0

        if self._driver is None:
            # Fail fast: retrying with back-off cannot revive a closed sink.
            raise ExportBatchError(
                f"Batch {batch_id!r} cannot be written: the Neo4j sink is closed",
                batch_id=batch_id,
            )

        cypher = _MERGE_RECORD_CYPHER if self.config.idempotent else _CREATE_RECORD_CYPHER
        # A plain SinkConfig carries no database name; use the Neo4j default.
        cfg = self.config if isinstance(self.config, Neo4jSinkConfig) else Neo4jSinkConfig()
        max_retries = self.config.max_retries

        last_exc: Optional[Exception] = None
        for attempt in range(max_retries + 1):
            try:
                with self._driver.session(database=cfg.database) as session:
                    # Results are delivered lazily: consume the result so a
                    # failure raised while the query executes surfaces here,
                    # inside the retry loop, rather than going unnoticed.
                    session.run(cypher, rows=rows).consume()
            except Exception as exc:  # noqa: BLE001
                last_exc = exc
                if attempt < max_retries:
                    delay = self.config.retry_backoff * (2**attempt)
                    logger.warning(
                        "Batch %s attempt %d failed (%s); retrying in %.1fs",
                        batch_id,
                        attempt + 1,
                        exc,
                        delay,
                    )
                    time.sleep(delay)
            else:
                logger.debug("Wrote batch %s (%d records)", batch_id, len(rows))
                return len(rows)

        # Chain the last failure so the root cause stays in the traceback.
        raise ExportBatchError(
            f"Batch {batch_id!r} failed after {max_retries + 1} " f"attempts: {last_exc}",
            batch_id=batch_id,
        ) from last_exc

    def flush(self) -> None:
        """No-op: Neo4j writes are committed per transaction."""

    def close(self) -> None:
        """Close the underlying Neo4j driver connection.

        Safe to call more than once; the driver reference is cleared even if
        closing it raises.
        """
        if self._driver is not None:
            try:
                self._driver.close()
            finally:
                self._driver = None
def export_to_neo4j(
    sdk: ImednetSDK,
    study_key: str,
    uri: str,
    auth: Tuple[str, str],
    *,
    config: Optional[Neo4jSinkConfig] = None,
) -> int:
    """Export study records to Neo4j using :class:`Neo4jExportSink`.

    Records are listed through ``sdk.records.list`` for *study_key*, split
    into chunks of ``config.batch_size`` and written one chunk at a time with
    batch ids of the form ``"<study_key>/records/<index>"``.

    Parameters
    ----------
    sdk:
        SDK instance used to list the study's records.
    study_key:
        Study whose records are exported.
    uri:
        Neo4j connection URI handed to the sink.
    auth:
        ``(username, password)`` tuple handed to the sink.
    config:
        Optional sink configuration; a default :class:`Neo4jSinkConfig` is
        used when omitted.

    Returns
    -------
    int
        Sum of the counts returned by each ``write_batch`` call.
    """
    if config is None:
        config = Neo4jSinkConfig()

    all_records = sdk.records.list(study_key=study_key, record_data_filter=None)

    with Neo4jExportSink(uri, auth, study_key, config=config) as sink:
        chunks = iter_batches(all_records, config.batch_size)
        return sum(
            sink.write_batch(chunk, batch_id=f"{study_key}/records/{position}")
            for position, chunk in enumerate(chunks)
        )
# Public names exported by ``from imednet.integrations.graph import *``.
__all__ = ["Neo4jExportSink", "Neo4jSinkConfig", "export_to_neo4j"]