"""Shared base classes and helpers for all export sinks.
Architecture decision
---------------------
The SDK provides three export paths:
**Tabular path** (``RecordMapper`` + ``pandas.DataFrame``)
Flattens record data into a wide DataFrame and writes it to CSV, Excel,
JSON, SQL, DuckDB, or Parquet. All functions in
:mod:`imednet.integrations.export` follow this path.
**Structure-preserving path** (``DataExtractionWorkflow`` + typed records)
Traverses the clinical data hierarchy
*Study → Subject → Visit → Record* and materialises the relationships
into a destination that can represent them natively — a property graph
(Neo4j) or a nested document store (MongoDB).
**Warehouse path** (staged Parquet + native bulk loader)
Writes one Parquet file per form to an intermediate staging area and then
invokes the destination's native bulk-loading command (e.g. Snowflake
``COPY INTO``).
All three paths share the contracts defined in this module: ``SinkConfig``,
the ``ExportSink`` ABC, the import-guard helper :func:`_require_optional_dep`,
and the credential-redaction helper :func:`_redact_uri`.
Shared contracts
----------------
* **Batching** – callers split records into batches and call
:meth:`ExportSink.write_batch` once per batch. The ``batch_id`` parameter
is a caller-supplied idempotency key (e.g. ``"<study_key>/<form_key>/<n>"``).
* **Chunk sizing** – ``SinkConfig.batch_size`` controls the number of records
per batch (default 500).
* **Retries** – sinks must honour ``SinkConfig.max_retries`` and use
``SinkConfig.retry_backoff`` as the base delay between attempts.
* **Idempotent writes** – when ``SinkConfig.idempotent`` is ``True`` (default)
sinks must use upsert semantics or ``CREATE OR REPLACE`` so that replaying a
batch with the same ``batch_id`` produces no duplicate data.
* **Error propagation** – transient errors are retried; permanent errors raise
:class:`~imednet.errors.ExportBatchError` (includes ``batch_id``) or
:class:`~imednet.errors.ExportConfigurationError`.
* **Logging** – sinks use ``logging.getLogger(__name__)`` and must not log
raw credentials or full URIs. Pass URIs through :func:`_redact_uri` before
logging.
Optional dependency conventions
--------------------------------
* Each sink module calls :func:`_require_optional_dep` at connection time (not
at import time) so that importing the module never fails due to a missing
optional library.
* Extras keys follow the pattern ``imednet[<key>]``:

  .. code-block:: console

     pip install 'imednet[neo4j]'
     pip install 'imednet[mongodb]'
     pip install 'imednet[snowflake]'

Public-API exposure rules
-------------------------
* :mod:`imednet.integrations` re-exports only the tabular helpers by default
(backward compatibility).
* The three new sink classes (``Neo4jExportSink``, ``MongoDbExportSink``,
``SnowflakeExportSink``) and ``SinkConfig`` are importable from their
respective submodules and are also re-exported from
:mod:`imednet.integrations` via explicit names.
* Airflow helpers in :mod:`apache_airflow_providers_imednet.export` wrap only
the tabular path; graph/document/warehouse sinks are not wrapped there.
"""
from __future__ import annotations
import logging
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from importlib import import_module
from types import TracebackType
from typing import Any, Iterator, Optional, Sequence, Type
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Sink configuration
# ---------------------------------------------------------------------------
# Defaults used by ``SinkConfig`` when a field is not supplied.
_DEFAULT_BATCH_SIZE = 500  # records per write_batch call
_DEFAULT_MAX_RETRIES = 3  # retry attempts on transient errors
_DEFAULT_RETRY_BACKOFF = 1.0  # base delay between attempts, in seconds


@dataclass
class SinkConfig:
    """Shared configuration for all export sinks.

    Parameters
    ----------
    batch_size:
        Number of records per :meth:`ExportSink.write_batch` call.
    max_retries:
        Maximum number of retry attempts on transient errors.
    retry_backoff:
        Base delay in seconds between retry attempts.  The actual delay grows
        exponentially: ``retry_backoff * 2 ** attempt``.
    idempotent:
        When ``True``, sinks use upsert / replace semantics so that replaying
        a batch with the same ``batch_id`` produces no duplicate data.
    extra:
        Free-form mapping of additional options.  Nothing in this module
        reads it; each instance gets its own empty dict by default.
    """

    batch_size: int = _DEFAULT_BATCH_SIZE
    max_retries: int = _DEFAULT_MAX_RETRIES
    retry_backoff: float = _DEFAULT_RETRY_BACKOFF
    idempotent: bool = True
    # default_factory avoids sharing one mutable dict across instances.
    extra: dict[str, Any] = field(default_factory=dict)
# ---------------------------------------------------------------------------
# Abstract base class
# ---------------------------------------------------------------------------
class ExportSink(ABC):
    """Abstract base class for all export sinks.

    Subclasses **must** implement :meth:`write_batch`, :meth:`flush`, and
    :meth:`close`.  The context-manager protocol is provided by this class.

    Parameters
    ----------
    config:
        Shared sink configuration.  Defaults to :class:`SinkConfig` with
        all values at their defaults.
    """

    def __init__(self, config: Optional[SinkConfig] = None) -> None:
        """Store *config*, creating a default :class:`SinkConfig` if omitted."""
        self.config: SinkConfig = config if config is not None else SinkConfig()

    # ------------------------------------------------------------------
    # Abstract interface
    # ------------------------------------------------------------------

    @abstractmethod
    def write_batch(self, records: Sequence[Any], *, batch_id: str) -> int:
        """Write one batch of records to the destination.

        Parameters
        ----------
        records:
            Sequence of records to write.  The concrete type depends on the
            export path:

            * **Tabular path** – ``pandas.DataFrame`` rows or plain dicts
              produced by :class:`~imednet_workflows.record_mapper.RecordMapper`.
            * **Structure-preserving path** – typed
              :class:`~imednet.models.Record` instances from
              :class:`~imednet_workflows.data_extraction.DataExtractionWorkflow`.
            * **Warehouse path** – ``pyarrow.RecordBatch`` or dicts destined
              for a staged Parquet file.
        batch_id:
            Caller-supplied idempotency key.  Recommended format:
            ``"<study_key>/<form_key>/<batch_number>"``.

        Returns
        -------
        int
            Number of records successfully written.

        Raises
        ------
        ~imednet.errors.ExportBatchError
            When the batch cannot be written after all retries.
        """
        ...

    @abstractmethod
    def flush(self) -> None:
        """Flush any internal buffers to the destination.

        Raises
        ------
        ~imednet.errors.ExportError
            On flush failure.
        """
        ...

    @abstractmethod
    def close(self) -> None:
        """Release all resources held by this sink (connections, file handles).

        Implementations must be idempotent — calling ``close()`` on an already
        closed sink must not raise.
        """
        ...

    # ------------------------------------------------------------------
    # Context-manager support
    # ------------------------------------------------------------------

    def __enter__(self) -> "ExportSink":
        """Return the sink itself so it can be bound by ``with ... as sink``."""
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Flush on a clean exit and always close the sink.

        Returns ``None``, so an exception raised inside the ``with`` body is
        never suppressed.
        """
        try:
            # Only flush when the body completed without an exception; after
            # a failure the buffered data is not pushed to the destination.
            if exc_type is None:
                self.flush()
        finally:
            # Runs even if flush() raised, so resources are always released.
            self.close()
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
# Regex that matches the user-info component of a URI,
# e.g. mongodb://user:pass@host -> mongodb://***@host
# Only "/" is excluded from the character class, so the greedy match extends
# to the LAST "@" before the first "/".  Stopping at the first "@" would leak
# the tail of a password that itself contains an unescaped "@".
_URI_USERINFO_RE = re.compile(r"(://)[^/]+@")


def _redact_uri(uri: str) -> str:
    """Replace user-info in *uri* with ``***`` to prevent credential leakage.

    Everything between ``://`` and the last ``@`` that precedes the first
    ``/`` is treated as user-info.  Strings without such a component are
    returned unchanged.

    Parameters
    ----------
    uri:
        Connection URI that may embed ``user:password@`` credentials.

    Returns
    -------
    str
        *uri* with the user-info replaced by ``***``.

    Examples
    --------
    >>> _redact_uri("mongodb://user:pass@localhost:27017/db")
    'mongodb://***@localhost:27017/db'
    >>> _redact_uri("mongodb://user:p@ss@localhost:27017/db")
    'mongodb://***@localhost:27017/db'
    >>> _redact_uri("neo4j+s://bolt.example.com")
    'neo4j+s://bolt.example.com'
    """
    return _URI_USERINFO_RE.sub(r"\1***@", uri)
def _require_optional_dep(package: str, extras_key: str) -> Any:
    """Import *package* or raise :class:`ImportError` with a helpful message.

    Parameters
    ----------
    package:
        Top-level package name to import (e.g. ``"neo4j"``).
    extras_key:
        The ``imednet`` extras key that installs the dependency
        (e.g. ``"neo4j"``).

    Returns
    -------
    types.ModuleType
        The imported module.

    Raises
    ------
    ImportError
        When *package* is not installed.  A :class:`ModuleNotFoundError`
        for a module whose name does not start with the root of *package*
        is re-raised unchanged.
    """
    try:
        module = import_module(package)
    except ModuleNotFoundError as error:
        missing_name = error.name
        # Only rewrite the error when the missing module is the requested
        # package itself; anything else propagates untouched.
        if not missing_name or not missing_name.startswith(package.split(".")[0]):
            raise
        raise ImportError(
            f"This export sink requires the optional '{package}' package. "
            f"Install with `pip install 'imednet[{extras_key}]'`."
        ) from error
    return module
def iter_batches(records: Sequence[Any], batch_size: int) -> Iterator[Sequence[Any]]:
    """Yield ``records`` in consecutive chunks of at most ``batch_size`` items.

    Each chunk is produced by slicing, so it has whatever type slicing
    *records* returns.  The final chunk may be shorter; an empty *records*
    yields nothing.

    Parameters
    ----------
    records:
        Sequence to split; must support ``len()`` and slicing.
    batch_size:
        Maximum number of items per chunk.  Must be greater than 0.

    Raises
    ------
    ValueError
        When *batch_size* is not positive.  Because this is a generator,
        the error surfaces on first iteration, not at call time.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be greater than 0")
    for start in range(0, len(records), batch_size):
        yield records[start : start + batch_size]
# Names exported by a star-import of this module.  The underscore-prefixed
# helpers are listed explicitly because a star-import would otherwise skip them.
__all__ = [
"SinkConfig",
"ExportSink",
"iter_batches",
"_redact_uri",
"_require_optional_dep",
]