from __future__ import annotations
import logging
from collections.abc import Iterable, Iterator
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union
try:
import pandas as pd
except ImportError:
pd = None # type: ignore
from pydantic import BaseModel, Field, ValidationError, create_model
from imednet.endpoints.records import Record as RecordModel # type: ignore[attr-defined]
from imednet.endpoints.variables import Variable as VariableModel # type: ignore[attr-defined]
from .cached_loader import CachedRecordsLoader
from .chunked_pipeline import DEFAULT_CHUNK_SIZE, ChunkedRecordPipeline, iter_chunks
if TYPE_CHECKING:
from imednet.sdk import ImednetSDK
# Setup basic logging
logger = logging.getLogger(__name__)
[docs]class RecordMapper:
"""
Maps EDC records for a study into a pandas DataFrame.
Features:
- Fetches variable definitions for column mapping.
- Dynamically creates a Pydantic model for type validation of record data.
- Fetches records, applying server-side filtering where possible.
- Merges metadata and record data.
- Offers choice between variable names or labels for column headers.
- Handles parsing errors gracefully for individual records.
Example:
sdk = ImednetSDK(api_key, security_key, base_url)
mapper = RecordMapper(sdk)
# Get DataFrame with labels as columns, filtered by visit
df_labels = mapper.dataframe(study_key="MYSTUDY", visit_key="VISIT1")
# Get DataFrame with variable names as columns
df_names = mapper.dataframe(study_key="MYSTUDY", use_labels_as_columns=False)
"""
[docs] def __init__(
self,
sdk: "ImednetSDK",
*,
loader: CachedRecordsLoader | None = None,
chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> None:
"""Initialize with an :class:`ImednetSDK` instance.
``loader`` enables cache-backed streaming for large-study workflows.
``chunk_size`` controls the maximum number of records parsed into each
yielded batch when using chunked mapping helpers.
"""
self.sdk = sdk
self._loader = loader
self._pipeline = ChunkedRecordPipeline(chunk_size=chunk_size)
# ------------------------------------------------------------------
# Helper methods
# ------------------------------------------------------------------
def _fetch_variable_metadata(
self,
study_key: str,
variable_whitelist: Optional[List[str]] = None,
form_whitelist: Optional[List[int]] = None,
) -> Tuple[List[str], Dict[str, str]]:
"""Return variable names and label mapping for a study."""
filters: Dict[str, Any] = {}
if variable_whitelist is not None:
filters["variableNames"] = variable_whitelist
if form_whitelist is not None:
filters["formIds"] = form_whitelist
variables: List[VariableModel] = self.sdk.variables.list(
study_key=study_key,
**filters,
)
if not variables:
logger.warning(
"No variables found for study '%s'. Returning empty DataFrame.",
study_key,
)
return [], {}
variable_keys = [v.variable_name for v in variables]
label_map = {v.variable_name: v.label for v in variables}
return variable_keys, label_map
def _build_record_model(
self, variable_keys: List[str], label_map: Dict[str, str]
) -> Type[BaseModel]:
"""Create a dynamic model for the record data payload."""
fields: Dict[str, Tuple[Optional[Any], Any]] = {}
for key in variable_keys:
fields[key] = (
Optional[Any],
Field(None, alias=key, description=label_map.get(key, key)),
)
return create_model("RecordData", __base__=BaseModel, **fields) # type: ignore
def _fetch_records(
self,
study_key: str,
visit_key: Optional[str] = None,
extra_filters: Optional[Dict[str, Union[Any, Tuple[str, Any], List[Any]]]] = None,
) -> List[RecordModel]:
"""Fetch records for a study applying optional filters."""
filters: Dict[str, Union[Any, Tuple[str, Any], List[Any]]] = (
dict(extra_filters) if extra_filters else {}
)
if visit_key is not None:
try:
filters["visitId"] = int(visit_key)
except ValueError:
logger.warning(
"Invalid visit_key '%s'. Should be convertible to int. Fetching all records.",
visit_key,
)
try:
return self.sdk.records.list(
study_key=study_key,
record_data_filter=None,
**filters,
)
except Exception as exc: # pragma: no cover - unexpected
logger.error("Failed to fetch records for study '%s': %s", study_key, exc)
return []
def _iter_records(
self,
study_key: str,
visit_key: Optional[str] = None,
extra_filters: Optional[Dict[str, Union[Any, Tuple[str, Any], List[Any]]]] = None,
) -> Iterable[RecordModel]:
form_ids: set[Any] | None = None
filters = dict(extra_filters) if extra_filters else {}
if "formIds" in filters and isinstance(filters["formIds"], list):
form_ids = set(filters["formIds"])
elif "formId" in filters:
form_ids = {filters["formId"]}
loader = self._loader
if loader is not None:
sync_method = getattr(loader, "sync_records", None)
iter_method = getattr(type(loader), "iter_cached_records", None)
if callable(sync_method) and callable(iter_method):
sync_method(study_key)
return self._filter_records(
iter_method(loader, study_key, chunk_size=self._pipeline.chunk_size),
visit_key=visit_key,
form_ids=form_ids,
)
return self._fetch_records(
study_key,
visit_key,
extra_filters=extra_filters,
)
def _filter_records(
self,
records: Iterable[RecordModel],
*,
visit_key: Optional[str],
form_ids: set[Any] | None,
) -> Iterator[RecordModel]:
visit_id: int | None = None
if visit_key is not None:
try:
visit_id = int(visit_key)
except ValueError:
logger.warning(
"Invalid visit_key '%s'. Should be convertible to int. Fetching all records.",
visit_key,
)
for record in records:
if visit_id is not None and record.visit_id != visit_id:
continue
if form_ids is not None and record.form_id not in form_ids:
continue
yield record
def _parse_record(
self,
rec: RecordModel,
record_model: Type[BaseModel],
) -> Dict[str, Any]:
meta = {
"recordId": rec.record_id,
"subjectKey": rec.subject_key,
"visitId": rec.visit_id,
"formId": rec.form_id,
"recordStatus": rec.record_status,
"dateCreated": rec.date_created.isoformat() if rec.date_created else None,
}
data = rec.record_data if isinstance(rec.record_data, dict) else {}
parsed = record_model(**data).model_dump(by_alias=False)
return {**meta, **parsed}
def _parse_records(
self, records: Iterable[RecordModel], record_model: Type[BaseModel]
) -> Tuple[List[Dict[str, Any]], int]:
"""Parse raw records into row dictionaries and count failures."""
rows: List[Dict[str, Any]] = []
errors = 0
for chunk_rows, chunk_errors in self._iter_parsed_rows(records, record_model):
rows.extend(chunk_rows)
errors += chunk_errors
return rows, errors
def _iter_parsed_rows(
self,
records: Iterable[RecordModel],
record_model: Type[BaseModel],
) -> Iterator[Tuple[List[Dict[str, Any]], int]]:
for chunk in iter_chunks(records, chunk_size=self._pipeline.chunk_size):
rows: List[Dict[str, Any]] = []
errors = 0
for rec in chunk:
try:
rows.append(self._parse_record(rec, record_model))
except (ValidationError, TypeError) as exc:
errors += 1
logger.warning(
"Failed to parse record data for recordId %s: %s",
rec.record_id,
exc,
)
except Exception as exc: # pragma: no cover - unexpected
errors += 1
logger.error("Unexpected error processing recordId %s: %s", rec.record_id, exc)
yield rows, errors
def _build_dataframe(
self,
rows: List[Dict[str, Any]],
variable_keys: List[str],
label_map: Dict[str, str],
use_labels: bool,
) -> pd.DataFrame:
"""Create the output DataFrame from parsed rows."""
df = pd.DataFrame(rows)
if df.empty:
return df
meta_cols = [
"recordId",
"subjectKey",
"visitId",
"formId",
"recordStatus",
"dateCreated",
]
for key in variable_keys:
if key not in df.columns:
df[key] = pd.NA
df = df[meta_cols + variable_keys]
if use_labels:
rename_map = {key: label_map.get(key, key) for key in variable_keys}
df = df.rename(columns=rename_map)
return df
[docs] def iter_dataframes(
self,
study_key: str,
visit_key: Optional[str] = None,
use_labels_as_columns: bool = True,
variable_whitelist: Optional[List[str]] = None,
form_whitelist: Optional[List[int]] = None,
) -> Iterator[pd.DataFrame]:
"""Yield mapped record DataFrames in bounded chunks.
Each yielded frame contains at most ``chunk_size`` mapped records from
this mapper instance. Prefer this method over ``dataframe()`` when
processing or exporting large studies so each chunk can be committed
before the next batch is parsed.
"""
if pd is None:
raise ImportError(
(
"pandas is required for RecordMapper.dataframe. Install "
"with 'pip install \"imednet[pandas]\"'."
)
)
variable_keys, label_map = self._fetch_variable_metadata(
study_key,
variable_whitelist=variable_whitelist,
form_whitelist=form_whitelist,
)
if not variable_keys:
return
record_model = self._build_record_model(variable_keys, label_map)
extra_filters: Dict[str, Any] = {}
if variable_whitelist is not None:
extra_filters["variableNames"] = variable_whitelist
if form_whitelist is not None:
extra_filters["formIds"] = form_whitelist
errors = 0
yielded = False
for rows, chunk_errors in self._iter_parsed_rows(
self._iter_records(
study_key,
visit_key,
extra_filters=extra_filters or None,
),
record_model,
):
errors += chunk_errors
df = self._build_dataframe(rows, variable_keys, label_map, use_labels_as_columns)
if df.empty:
continue
yielded = True
yield df
if errors:
logger.warning("Encountered %s errors while parsing record data.", errors)
if not yielded:
logger.info(
"No records processed successfully for study '%s' with the given filters.",
study_key,
)
[docs] def dataframe(
self,
study_key: str,
visit_key: Optional[str] = None,
use_labels_as_columns: bool = True,
variable_whitelist: Optional[List[str]] = None,
form_whitelist: Optional[List[int]] = None,
) -> pd.DataFrame:
"""Return a :class:`pandas.DataFrame` of records for a study.
This method still materialises all yielded chunks into one DataFrame.
For bounded-memory processing of large studies, prefer ``iter_dataframes()``.
"""
if pd is None:
raise ImportError(
(
"pandas is required for RecordMapper.dataframe. Install "
"with 'pip install \"imednet[pandas]\"'."
)
)
frames = list(
self.iter_dataframes(
study_key,
visit_key=visit_key,
use_labels_as_columns=use_labels_as_columns,
variable_whitelist=variable_whitelist,
form_whitelist=form_whitelist,
)
)
if not frames:
return pd.DataFrame()
return pd.concat(frames, ignore_index=True)