Source code for imednet_workflows.duckdb_centralizer

"""DuckDB ingestion workflow for incremental eCRF centralization."""

from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Any, Iterable, Literal, Optional

from .data_extraction import DataExtractionWorkflow

if TYPE_CHECKING:
    from imednet.models import RecordRevision
    from imednet.sdk import ImednetSDK


[docs]class DuckDBIngestionWorkflow: """Incremental eCRF centralization pipeline using a bronze/silver DuckDB layout."""
[docs] def __init__(self, sdk: "ImednetSDK", db_path: str) -> None: try: import duckdb except ImportError as error: raise ImportError( "DuckDB ingestion requires the optional 'duckdb' dependency. " "Install with `pip install 'imednet[duckdb]'`." ) from error self._connection: Any = duckdb.connect(db_path) self._extractor = DataExtractionWorkflow(sdk)
[docs] def ingest_revisions( self, study_key: str, *, start_date: Optional[str] = None, end_date: Optional[str] = None, mode: Literal["append", "replace"] = "append", ) -> int: """Fetch RecordRevisions and write to the bronze_revisions table.""" if mode not in {"append", "replace"}: raise ValueError("mode must be one of: 'append', 'replace'") revisions = self._extractor.extract_audit_trail( study_key, start_date=start_date, end_date=end_date, ) if mode == "replace": self._connection.execute("DROP TABLE IF EXISTS bronze_revisions") self._ensure_bronze_table() rows = list(self._to_rows(study_key, revisions)) if rows: self._connection.executemany( """ INSERT INTO bronze_revisions ( study_key, record_id, form_id, variable_name, value, revision_number, date_modified, modified_by ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, rows, ) return len(rows)
[docs] def build_silver_view(self, study_key: str) -> None: """Create or replace the silver_current_state view.""" self._ensure_bronze_table() escaped_study_key = study_key.replace("'", "''") self._connection.execute(f""" CREATE OR REPLACE VIEW silver_current_state AS SELECT study_key, record_id, form_id, variable_name, value, revision_number, date_modified, modified_by FROM bronze_revisions WHERE study_key = '{escaped_study_key}' QUALIFY ROW_NUMBER() OVER ( PARTITION BY record_id, variable_name ORDER BY revision_number DESC ) = 1 """)
def _ensure_bronze_table(self) -> None: self._connection.execute(""" CREATE TABLE IF NOT EXISTS bronze_revisions ( study_key VARCHAR, record_id INTEGER, form_id INTEGER, variable_name VARCHAR, value VARCHAR, revision_number INTEGER, date_modified TIMESTAMP, modified_by VARCHAR ) """) def _to_rows( self, study_key: str, revisions: Iterable["RecordRevision"] ) -> Iterable[tuple[str, int, int, str, str, int, Optional[datetime], str]]: for revision in revisions: value = getattr(revision, "value", "") date_modified = getattr( revision, "date_modified", getattr(revision, "date_created", None) ) revision_number = getattr( revision, "revision_number", getattr(revision, "record_revision", 0), ) yield ( study_key, int(getattr(revision, "record_id", 0) or 0), int(getattr(revision, "form_id", 0) or 0), str(getattr(revision, "variable_name", "")), "" if value is None else str(value), int(revision_number or 0), date_modified, str(getattr(revision, "modified_by", getattr(revision, "user", ""))), )