Source code for imednet.workflows.record_update

"""Utilities for submitting and updating records in iMedNet studies."""

import warnings
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Union, cast

from ..models import Job
from ..validation.cache import AsyncSchemaValidator, SchemaCache, SchemaValidator
from .job_poller import JobPoller

if TYPE_CHECKING:
    from ..sdk import ImednetSDK


[docs]class RecordUpdateWorkflow: """ Provides workflows for creating or updating records, including batch submission and optional job status monitoring. Args: sdk: An instance of the ImednetSDK. """ def __init__(self, sdk: "ImednetSDK"): self._sdk = sdk self._is_async = getattr(sdk, "_async_client", None) is not None if self._is_async: self._validator = AsyncSchemaValidator(sdk) # type: ignore[arg-type] else: self._validator = SchemaValidator(sdk) # type: ignore[assignment] self._schema: SchemaCache = cast(SchemaCache, self._validator.schema) def _validate_form_key(self, study_key: str, records_data: List[Dict[str, Any]]) -> None: """Refresh schema if the form key is missing from cache.""" if not records_data: return first_ref = records_data[0].get("formKey") or self._schema.form_key_from_id( records_data[0].get("formId", 0) ) if first_ref and not self._schema.variables_for_form(first_ref): cast(SchemaValidator, self._validator).refresh(study_key) if first_ref not in self._schema.forms: raise ValueError(f"Form key '{first_ref}' not found") async def _async_validate_form_key( self, study_key: str, records_data: List[Dict[str, Any]] ) -> None: """Asynchronous variant of :meth:`_validate_form_key`.""" if not records_data: return first_ref = records_data[0].get("formKey") or self._schema.form_key_from_id( records_data[0].get("formId", 0) ) if first_ref and not self._schema.variables_for_form(first_ref): await cast(AsyncSchemaValidator, self._validator).refresh(study_key) if first_ref not in self._schema.forms: raise ValueError(f"Form key '{first_ref}' not found")
[docs] def create_or_update_records( self, study_key: str, records_data: List[Dict[str, Any]], wait_for_completion: bool = False, timeout: int = 300, poll_interval: int = 5, ) -> Job: """Submit records for creation or update and optionally wait for completion.""" self._validate_form_key(study_key, records_data) cast(SchemaValidator, self._validator).validate_batch(study_key, records_data) job = self._sdk.records.create(study_key, records_data, schema=self._schema) if not wait_for_completion: return job if not job.batch_id: raise ValueError("Submission successful but no batch_id received.") poller = JobPoller(self._sdk.jobs.get, is_async=False) return poller.run(study_key, job.batch_id, poll_interval, timeout)
[docs] async def async_create_or_update_records( self, study_key: str, records_data: List[Dict[str, Any]], wait_for_completion: bool = False, timeout: int = 300, poll_interval: int = 5, ) -> Job: """Asynchronous variant of :meth:`create_or_update_records`.""" await self._async_validate_form_key(study_key, records_data) await cast(AsyncSchemaValidator, self._validator).validate_batch(study_key, records_data) job = await self._sdk.records.async_create(study_key, records_data, schema=self._schema) if not wait_for_completion: return job if not job.batch_id: raise ValueError("Submission successful but no batch_id received.") poller = JobPoller(self._sdk.jobs.async_get, is_async=True) return await poller.run_async(study_key, job.batch_id, poll_interval, timeout)
[docs] def submit_record_batch(self, *args: Any, **kwargs: Any) -> Job: # pragma: no cover warnings.warn( "submit_record_batch is deprecated; use create_or_update_records", DeprecationWarning, stacklevel=2, ) return self.create_or_update_records(*args, **kwargs)
def _build_record_payload( self, *, form_identifier: Union[str, int], form_identifier_type: Literal["key", "id"] = "key", data: Dict[str, Any], subject_identifier: Union[str, int, None] = None, subject_identifier_type: Literal["key", "id", "oid"] = "key", site_identifier: Union[str, int, None] = None, site_identifier_type: Literal["name", "id"] = "name", interval_identifier: Union[str, int, None] = None, interval_identifier_type: Literal["name", "id"] = "name", ) -> Dict[str, Any]: """Return a record payload for ``create_or_update_records``.""" record: Dict[str, Any] = { "formKey" if form_identifier_type == "key" else "formId": form_identifier, "data": data, } if subject_identifier is not None: subject_id_field_map = { "key": "subjectKey", "id": "subjectId", "oid": "subjectOid", } record[subject_id_field_map[subject_identifier_type]] = subject_identifier if site_identifier is not None: site_id_field_map = { "name": "siteName", "id": "siteId", } record[site_id_field_map[site_identifier_type]] = site_identifier if interval_identifier is not None: interval_id_field_map = { "name": "intervalName", "id": "intervalId", } record[interval_id_field_map[interval_identifier_type]] = interval_identifier return record
[docs] def register_subject( self, study_key: str, form_identifier: Union[str, int], site_identifier: Union[str, int], data: Dict[str, Any], form_identifier_type: Literal["key", "id"] = "key", site_identifier_type: Literal["name", "id"] = "name", wait_for_completion: bool = False, timeout: int = 300, poll_interval: int = 5, ) -> Job: """ Registers a new subject by submitting a single record. Args: study_key: The study key. form_identifier: The form key or ID. site_identifier: The site name or ID. data: The dictionary of record data (variable names and values). form_identifier_type: Whether `form_identifier` is a 'key' or 'id'. site_identifier_type: Whether `site_identifier` is a 'name' or 'id'. wait_for_completion: If True, wait for the job to complete. timeout: Timeout in seconds for waiting. poll_interval: Polling interval in seconds. Returns: The Job status object. """ record = self._build_record_payload( form_identifier=form_identifier, form_identifier_type=form_identifier_type, site_identifier=site_identifier, site_identifier_type=site_identifier_type, data=data, ) return self.create_or_update_records( study_key=study_key, records_data=[record], wait_for_completion=wait_for_completion, timeout=timeout, poll_interval=poll_interval, )
[docs] def update_scheduled_record( self, study_key: str, form_identifier: Union[str, int], subject_identifier: Union[str, int], interval_identifier: Union[str, int], data: Dict[str, Any], form_identifier_type: Literal["key", "id"] = "key", subject_identifier_type: Literal["key", "id", "oid"] = "key", interval_identifier_type: Literal["name", "id"] = "name", wait_for_completion: bool = False, timeout: int = 300, poll_interval: int = 5, ) -> Job: """ Updates an existing scheduled record for a subject. Args: study_key: The study key. form_identifier: The form key or ID. subject_identifier: The subject key, ID, or OID. interval_identifier: The interval name or ID. data: The dictionary of record data (variable names and values). form_identifier_type: Whether `form_identifier` is a 'key' or 'id'. subject_identifier_type: Whether `subject_identifier` is a 'key', 'id', or 'oid'. interval_identifier_type: Whether `interval_identifier` is a 'name' or 'id'. wait_for_completion: If True, wait for the job to complete. timeout: Timeout in seconds for waiting. poll_interval: Polling interval in seconds. Returns: The Job status object. """ record = self._build_record_payload( form_identifier=form_identifier, form_identifier_type=form_identifier_type, subject_identifier=subject_identifier, subject_identifier_type=subject_identifier_type, interval_identifier=interval_identifier, interval_identifier_type=interval_identifier_type, data=data, ) return self.create_or_update_records( study_key=study_key, records_data=[record], wait_for_completion=wait_for_completion, timeout=timeout, poll_interval=poll_interval, )
[docs] def create_new_record( self, study_key: str, form_identifier: Union[str, int], subject_identifier: Union[str, int], data: Dict[str, Any], form_identifier_type: Literal["key", "id"] = "key", subject_identifier_type: Literal["key", "id", "oid"] = "key", wait_for_completion: bool = False, timeout: int = 300, poll_interval: int = 5, ) -> Job: """ Creates a new (unscheduled) record for an existing subject. Args: study_key: The study key. form_identifier: The form key or ID. subject_identifier: The subject key, ID, or OID. data: The dictionary of record data (variable names and values). form_identifier_type: Whether `form_identifier` is a 'key' or 'id'. subject_identifier_type: Whether `subject_identifier` is a 'key', 'id', or 'oid'. wait_for_completion: If True, wait for the job to complete. timeout: Timeout in seconds for waiting. poll_interval: Polling interval in seconds. Returns: The Job status object. """ record = self._build_record_payload( form_identifier=form_identifier, form_identifier_type=form_identifier_type, subject_identifier=subject_identifier, subject_identifier_type=subject_identifier_type, data=data, ) return self.create_or_update_records( study_key=study_key, records_data=[record], wait_for_completion=wait_for_completion, timeout=timeout, poll_interval=poll_interval, )