Source code for imednet.integrations.sink_base

"""Shared base classes and helpers for all export sinks.

Architecture decision
---------------------
The SDK provides three export paths:

**Tabular path** (``RecordMapper`` + ``pandas.DataFrame``)
    Flattens record data into a wide DataFrame and writes it to CSV, Excel,
    JSON, SQL, DuckDB, or Parquet.  All functions in
    :mod:`imednet.integrations.export` follow this path.

**Structure-preserving path** (``DataExtractionWorkflow`` + typed records)
    Traverses the clinical data hierarchy
    *Study → Subject → Visit → Record* and materialises the relationships
    into a destination that can represent them natively — a property graph
    (Neo4j) or a nested document store (MongoDB).

**Warehouse path** (staged Parquet + native bulk loader)
    Writes one Parquet file per form to an intermediate staging area and then
    invokes the destination's native bulk-loading command (e.g. Snowflake
    ``COPY INTO``).

All three paths share the contracts defined in this module: ``SinkConfig``,
the ``ExportSink`` ABC, the import-guard helper :func:`_require_optional_dep`,
and the credential-redaction helper :func:`_redact_uri`.

Shared contracts
----------------
* **Batching** – callers split records into batches and call
  :meth:`ExportSink.write_batch` once per batch.  The ``batch_id`` parameter
  is a caller-supplied idempotency key (e.g. ``"<study_key>/<form_key>/<n>"``).
* **Chunk sizing** – ``SinkConfig.batch_size`` controls the number of records
  per batch (default 500).
* **Retries** – sinks must honour ``SinkConfig.max_retries`` and use
  ``SinkConfig.retry_backoff`` as the base delay between attempts.
* **Idempotent writes** – when ``SinkConfig.idempotent`` is ``True`` (default)
  sinks must use upsert semantics or ``CREATE OR REPLACE`` so that replaying a
  batch with the same ``batch_id`` produces no duplicate data.
* **Error propagation** – transient errors are retried; permanent errors raise
  :class:`~imednet.errors.ExportBatchError` (includes ``batch_id``) or
  :class:`~imednet.errors.ExportConfigurationError`.
* **Logging** – sinks use ``logging.getLogger(__name__)`` and must not log
  raw credentials or full URIs.  Pass URIs through :func:`_redact_uri` before
  logging.

Optional dependency conventions
--------------------------------
* Each sink module calls :func:`_require_optional_dep` at connection time (not
  at import time) so that importing the module never fails due to a missing
  optional library.
* Extras keys follow the pattern ``imednet[<key>]``:

  .. code-block:: console

     pip install 'imednet[neo4j]'
     pip install 'imednet[mongodb]'
     pip install 'imednet[snowflake]'

Public-API exposure rules
-------------------------
* :mod:`imednet.integrations` re-exports only the tabular helpers by default
  (backward compatibility).
* The three new sink classes (``Neo4jExportSink``, ``MongoDbExportSink``,
  ``SnowflakeExportSink``) and ``SinkConfig`` are importable from their
  respective submodules and are also re-exported from
  :mod:`imednet.integrations` via explicit names.
* Airflow helpers in :mod:`apache_airflow_providers_imednet.export` wrap only
  the tabular path; graph/document/warehouse sinks are not wrapped there.
"""

from __future__ import annotations

import logging
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from importlib import import_module
from types import TracebackType
from typing import Any, Iterator, Optional, Sequence, Type

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Sink configuration
# ---------------------------------------------------------------------------

_DEFAULT_BATCH_SIZE = 500
_DEFAULT_MAX_RETRIES = 3
_DEFAULT_RETRY_BACKOFF = 1.0


[docs]@dataclass class SinkConfig: """Shared configuration for all export sinks. Parameters ---------- batch_size: Number of records per :meth:`ExportSink.write_batch` call. max_retries: Maximum number of retry attempts on transient errors. retry_backoff: Base delay in seconds between retry attempts. The actual delay grows exponentially: ``retry_backoff * 2 ** attempt``. idempotent: When ``True``, sinks use upsert / replace semantics so that replaying a batch with the same ``batch_id`` produces no duplicate data. """ batch_size: int = _DEFAULT_BATCH_SIZE max_retries: int = _DEFAULT_MAX_RETRIES retry_backoff: float = _DEFAULT_RETRY_BACKOFF idempotent: bool = True extra: dict[str, Any] = field(default_factory=dict)
# --------------------------------------------------------------------------- # Abstract base class # ---------------------------------------------------------------------------
[docs]class ExportSink(ABC): """Abstract base class for all export sinks. Subclasses **must** implement :meth:`write_batch`, :meth:`flush`, and :meth:`close`. The context-manager protocol is provided by this class. Parameters ---------- config: Shared sink configuration. Defaults to :class:`SinkConfig` with all values at their defaults. """
[docs] def __init__(self, config: Optional[SinkConfig] = None) -> None: self.config: SinkConfig = config if config is not None else SinkConfig()
# ------------------------------------------------------------------ # Abstract interface # ------------------------------------------------------------------
[docs] @abstractmethod def write_batch(self, records: Sequence[Any], *, batch_id: str) -> int: """Write one batch of records to the destination. Parameters ---------- records: Sequence of records to write. The concrete type depends on the export path: * **Tabular path** – ``pandas.DataFrame`` rows or plain dicts produced by :class:`~imednet_workflows.record_mapper.RecordMapper`. * **Structure-preserving path** – typed :class:`~imednet.models.Record` instances from :class:`~imednet_workflows.data_extraction.DataExtractionWorkflow`. * **Warehouse path** – ``pyarrow.RecordBatch`` or dicts destined for a staged Parquet file. batch_id: Caller-supplied idempotency key. Recommended format: ``"<study_key>/<form_key>/<batch_number>"``. Returns ------- int Number of records successfully written. Raises ------ ~imednet.errors.ExportBatchError When the batch cannot be written after all retries. """ ...
[docs] @abstractmethod def flush(self) -> None: """Flush any internal buffers to the destination. Raises ------ ~imednet.errors.ExportError On flush failure. """ ...
[docs] @abstractmethod def close(self) -> None: """Release all resources held by this sink (connections, file handles). Implementations must be idempotent — calling ``close()`` on an already closed sink must not raise. """ ...
# ------------------------------------------------------------------ # Context-manager support # ------------------------------------------------------------------ def __enter__(self) -> "ExportSink": return self def __exit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: try: if exc_type is None: self.flush() finally: self.close()
# --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- # Regex that matches the user-info component of a URI # e.g. mongodb://user:pass@host -> mongodb://***@host _URI_USERINFO_RE = re.compile(r"(://)[^@/]+@") def _redact_uri(uri: str) -> str: """Replace user-info in *uri* with ``***`` to prevent credential leakage. Examples -------- >>> _redact_uri("mongodb://user:pass@localhost:27017/db") 'mongodb://***@localhost:27017/db' >>> _redact_uri("neo4j+s://bolt.example.com") 'neo4j+s://bolt.example.com' """ return _URI_USERINFO_RE.sub(r"\1***@", uri) def _require_optional_dep(package: str, extras_key: str) -> Any: """Import *package* or raise :class:`ImportError` with a helpful message. Parameters ---------- package: Top-level package name to import (e.g. ``"neo4j"``). extras_key: The ``imednet`` extras key that installs the dependency (e.g. ``"neo4j"``). Returns ------- types.ModuleType The imported module. Raises ------ ImportError When *package* is not installed. """ try: return import_module(package) except ModuleNotFoundError as error: if error.name and error.name.startswith(package.split(".")[0]): raise ImportError( f"This export sink requires the optional '{package}' package. " f"Install with `pip install 'imednet[{extras_key}]'`." ) from error raise
[docs]def iter_batches(records: Sequence[Any], batch_size: int) -> Iterator[Sequence[Any]]: """Yield ``records`` in chunks of ``batch_size``.""" if batch_size <= 0: raise ValueError("batch_size must be greater than 0") for start in range(0, len(records), batch_size): yield records[start : start + batch_size]
__all__ = [ "SinkConfig", "ExportSink", "iter_batches", "_redact_uri", "_require_optional_dep", ]