"""DuckDB ingestion workflow for incremental eCRF centralization."""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING, Any, Iterable, Literal, Optional
from .data_extraction import DataExtractionWorkflow
if TYPE_CHECKING:
from imednet.models import RecordRevision
from imednet.sdk import ImednetSDK
[docs]class DuckDBIngestionWorkflow:
"""Incremental eCRF centralization pipeline using a bronze/silver DuckDB layout."""
[docs] def __init__(self, sdk: "ImednetSDK", db_path: str) -> None:
try:
import duckdb
except ImportError as error:
raise ImportError(
"DuckDB ingestion requires the optional 'duckdb' dependency. "
"Install with `pip install 'imednet[duckdb]'`."
) from error
self._connection: Any = duckdb.connect(db_path)
self._extractor = DataExtractionWorkflow(sdk)
[docs] def ingest_revisions(
self,
study_key: str,
*,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
mode: Literal["append", "replace"] = "append",
) -> int:
"""Fetch RecordRevisions and write to the bronze_revisions table."""
if mode not in {"append", "replace"}:
raise ValueError("mode must be one of: 'append', 'replace'")
revisions = self._extractor.extract_audit_trail(
study_key,
start_date=start_date,
end_date=end_date,
)
if mode == "replace":
self._connection.execute("DROP TABLE IF EXISTS bronze_revisions")
self._ensure_bronze_table()
rows = list(self._to_rows(study_key, revisions))
if rows:
self._connection.executemany(
"""
INSERT INTO bronze_revisions (
study_key,
record_id,
form_id,
variable_name,
value,
revision_number,
date_modified,
modified_by
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
rows,
)
return len(rows)
[docs] def build_silver_view(self, study_key: str) -> None:
"""Create or replace the silver_current_state view."""
self._ensure_bronze_table()
escaped_study_key = study_key.replace("'", "''")
self._connection.execute(f"""
CREATE OR REPLACE VIEW silver_current_state AS
SELECT
study_key,
record_id,
form_id,
variable_name,
value,
revision_number,
date_modified,
modified_by
FROM bronze_revisions
WHERE study_key = '{escaped_study_key}'
QUALIFY ROW_NUMBER() OVER (
PARTITION BY record_id, variable_name
ORDER BY revision_number DESC
) = 1
""")
def _ensure_bronze_table(self) -> None:
self._connection.execute("""
CREATE TABLE IF NOT EXISTS bronze_revisions (
study_key VARCHAR,
record_id INTEGER,
form_id INTEGER,
variable_name VARCHAR,
value VARCHAR,
revision_number INTEGER,
date_modified TIMESTAMP,
modified_by VARCHAR
)
""")
def _to_rows(
self, study_key: str, revisions: Iterable["RecordRevision"]
) -> Iterable[tuple[str, int, int, str, str, int, Optional[datetime], str]]:
for revision in revisions:
value = getattr(revision, "value", "")
date_modified = getattr(
revision, "date_modified", getattr(revision, "date_created", None)
)
revision_number = getattr(
revision,
"revision_number",
getattr(revision, "record_revision", 0),
)
yield (
study_key,
int(getattr(revision, "record_id", 0) or 0),
int(getattr(revision, "form_id", 0) or 0),
str(getattr(revision, "variable_name", "")),
"" if value is None else str(value),
int(revision_number or 0),
date_modified,
str(getattr(revision, "modified_by", getattr(revision, "user", ""))),
)