"""Export helpers built on top of :class:`RecordMapper`."""
from __future__ import annotations
from importlib import import_module
from typing import Any, List, Optional
try:
import pandas as pd
except ImportError:
pd = None # type: ignore
from imednet.constants import MAX_SQLITE_COLUMNS
from imednet.utils import sanitize_csv_formula
from .. import ImednetClient
from ..sdk import ImednetSDK
def _quote_duckdb_identifier(name: str) -> str:
escaped_name = name.replace('"', '""')
return f'"{escaped_name}"'
_DUCKDB_DF_ALIAS = "df"
def _record_mapper() -> Any:
try:
return import_module("imednet_workflows.record_mapper").RecordMapper
except ModuleNotFoundError as error:
if error.name and error.name.startswith("imednet_workflows"):
raise ImportError(
"Record export requires the optional 'imednet-workflows' package. "
"Install with `pip install imednet-workflows`."
) from error
raise
def _to_sql_with_chunking(
df: pd.DataFrame,
table: str,
engine: Any,
*,
if_exists: str,
**kwargs: Any,
) -> None:
"""Write ``df`` to ``table`` splitting columns when using SQLite.
SQLite limits tables to ``MAX_SQLITE_COLUMNS`` columns. When the DataFrame
exceeds this, the data is written to multiple tables suffixed with
``_part1``, ``_part2`` and so on.
"""
if engine.dialect.name == "sqlite" and len(df.columns) > MAX_SQLITE_COLUMNS:
for i, start in enumerate(range(0, len(df.columns), MAX_SQLITE_COLUMNS), start=1):
chunk = df.iloc[:, start : start + MAX_SQLITE_COLUMNS]
chunk.to_sql(
f"{table}_part{i}",
engine,
if_exists=if_exists, # type: ignore[arg-type]
index=False,
**kwargs,
)
else:
df.to_sql(table, engine, if_exists=if_exists, index=False, **kwargs) # type: ignore[arg-type]
def _records_df(
sdk: ImednetSDK,
study_key: str,
*,
use_labels_as_columns: bool = False,
variable_whitelist: Optional[List[str]] = None,
form_whitelist: Optional[List[int]] = None,
) -> pd.DataFrame:
"""Return a DataFrame of study records with duplicate columns removed."""
if pd is None:
raise ImportError(
(
"pandas is required for _records_df. Install with "
"'pip install pandas imednet-workflows'."
)
)
df: pd.DataFrame = _record_mapper()(sdk).dataframe(
study_key,
use_labels_as_columns=use_labels_as_columns,
variable_whitelist=variable_whitelist,
form_whitelist=form_whitelist,
)
if isinstance(df, pd.DataFrame):
df.columns = df.columns.astype(str)
df = df.loc[:, ~df.columns.str.lower().duplicated()]
return df
def _prepare_export_df(
sdk: ImednetSDK,
study_key: str,
*,
use_labels_as_columns: bool = False,
variable_whitelist: Optional[List[str]] = None,
form_whitelist: Optional[List[int]] = None,
sanitize: bool = False,
) -> pd.DataFrame:
"""Prepare a DataFrame for export by fetching records and optionally sanitizing."""
df = _records_df(
sdk,
study_key,
use_labels_as_columns=use_labels_as_columns,
variable_whitelist=variable_whitelist,
form_whitelist=form_whitelist,
)
if sanitize:
df = _sanitize_df(df)
return df
[docs]def export_to_parquet(
sdk: ImednetSDK,
study_key: str,
path: str,
*,
use_labels_as_columns: bool = False,
**kwargs: Any,
) -> None:
"""Export study records to a Parquet file.
Parameters
----------
use_labels_as_columns:
When ``True``, variable labels are used for column names instead of
variable names.
"""
df = _prepare_export_df(
sdk,
study_key,
use_labels_as_columns=use_labels_as_columns,
)
df.to_parquet(path, index=False, **kwargs)
def _sanitize_df(df: pd.DataFrame) -> pd.DataFrame:
"""Sanitize DataFrame string columns to prevent CSV injection."""
# Explicitly include "str" to avoid Pandas 3.0 warning about object dtype
# including strings implicitly. Use a try/except block to handle older Pandas versions
# (e.g. 2.3.3) where "str" raises a TypeError.
try:
cols = df.select_dtypes(include=[object, "str"])
except TypeError:
cols = df.select_dtypes(include=[object])
for col in cols:
df[col] = df[col].apply(sanitize_csv_formula) # type: ignore[call-overload]
return df
[docs]def export_to_csv(
sdk: ImednetSDK,
study_key: str,
path: str,
*,
use_labels_as_columns: bool = False,
**kwargs: Any,
) -> None:
"""Export study records to a CSV file.
Parameters
----------
use_labels_as_columns:
When ``True``, variable labels are used for column names instead of
variable names.
"""
df = _prepare_export_df(
sdk,
study_key,
use_labels_as_columns=use_labels_as_columns,
sanitize=True,
)
df.to_csv(path, index=False, **kwargs)
[docs]def export_to_excel(
sdk: ImednetSDK,
study_key: str,
path: str,
*,
use_labels_as_columns: bool = False,
**kwargs: Any,
) -> None:
"""Export study records to an Excel workbook.
Parameters
----------
use_labels_as_columns:
When ``True``, variable labels are used for column names instead of
variable names.
"""
df = _prepare_export_df(
sdk,
study_key,
use_labels_as_columns=use_labels_as_columns,
sanitize=True,
)
df.to_excel(path, index=False, **kwargs)
[docs]def export_to_json(
sdk: ImednetSDK,
study_key: str,
path: str,
*,
use_labels_as_columns: bool = False,
**kwargs: Any,
) -> None:
"""Export study records to a JSON file.
Parameters
----------
use_labels_as_columns:
When ``True``, variable labels are used for column names instead of
variable names.
"""
df = _prepare_export_df(
sdk,
study_key,
use_labels_as_columns=use_labels_as_columns,
)
df.to_json(path, index=False, **kwargs)
[docs]def export_to_sql(
sdk: ImednetSDK,
study_key: str,
table: str,
conn_str: str,
if_exists: str = "replace",
*,
use_labels_as_columns: bool = False,
variable_whitelist: Optional[List[str]] = None,
form_whitelist: Optional[List[int]] = None,
**kwargs: Any,
) -> None:
"""Export study records to a SQL table.
Parameters
----------
use_labels_as_columns:
When ``True``, variable labels are used for column names instead of
variable names.
"""
from sqlalchemy import create_engine
df = _prepare_export_df(
sdk,
study_key,
use_labels_as_columns=use_labels_as_columns,
variable_whitelist=variable_whitelist,
form_whitelist=form_whitelist,
)
engine = create_engine(conn_str)
_to_sql_with_chunking(
df,
table,
engine,
if_exists=if_exists,
**kwargs,
) # type: ignore[arg-type]
[docs]def export_to_duckdb(
sdk: ImednetSDK,
study_key: str,
db_path: str,
table_name: str,
*,
use_labels_as_columns: bool = False,
variable_whitelist: Optional[List[str]] = None,
form_whitelist: Optional[List[int]] = None,
) -> None:
"""Export study records to a DuckDB table using native DataFrame registration.
Parameters
----------
sdk:
Authenticated SDK instance used to fetch study records.
study_key:
Study identifier to export.
db_path:
Path to the target ``.duckdb`` database file.
table_name:
Name of the destination DuckDB table.
use_labels_as_columns:
When ``True``, variable labels are used for DataFrame column names.
variable_whitelist:
Optional list of variable names to include.
form_whitelist:
Optional list of form IDs to include.
Raises
------
ImportError
If the optional ``duckdb`` dependency is not installed.
"""
try:
import duckdb
except ImportError as error:
raise ImportError(
"DuckDB export requires the optional 'duckdb' dependency. "
"Install with `pip install 'imednet[duckdb]'`."
) from error
df = _prepare_export_df(
sdk,
study_key,
use_labels_as_columns=use_labels_as_columns,
variable_whitelist=variable_whitelist,
form_whitelist=form_whitelist,
)
conn: Any = duckdb.connect(db_path)
try:
conn.register(_DUCKDB_DF_ALIAS, df)
conn.execute(
f"CREATE OR REPLACE TABLE {_quote_duckdb_identifier(table_name)} "
f"AS SELECT * FROM {_quote_duckdb_identifier(_DUCKDB_DF_ALIAS)}"
)
conn.unregister(_DUCKDB_DF_ALIAS)
finally:
conn.close()
[docs]def export_to_long_sql(
sdk: ImednetClient,
study_key: str,
table_name: str,
conn_str: str,
*,
chunk_size: int = 1000,
) -> None:
"""Export records to a normalized long-format SQL table."""
if pd is None:
raise ImportError(
(
"pandas is required for export_to_long_sql. Install with "
"'pip install pandas imednet-workflows'."
)
)
from sqlalchemy import create_engine
engine = create_engine(conn_str)
mapper = _record_mapper()(sdk)
records = mapper._fetch_records(study_key)
rows: List[dict[str, Any]] = []
first = True
for rec in records:
timestamp = rec.date_modified
for name, value in (rec.record_data or {}).items():
rows.append(
{
"record_id": rec.record_id,
"form_id": rec.form_id,
"variable_name": name,
"value": value,
"timestamp": timestamp,
}
)
if len(rows) >= chunk_size:
df = pd.DataFrame(rows)
df.to_sql(
table_name,
engine,
if_exists="replace" if first else "append",
index=False,
)
rows = []
first = False
if rows:
df = pd.DataFrame(rows)
df.to_sql(
table_name,
engine,
if_exists="replace" if first else "append",
index=False,
)