Source code for imednet.workflows.record_update

"""Utilities for submitting and updating records in iMedNet studies."""

import asyncio
import inspect
import time
import warnings
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Literal, Union, cast

from ..models import Job
from ..validation.cache import SchemaCache, SchemaValidator
from .job_poller import JobPoller

if TYPE_CHECKING:
    from ..sdk import ImednetSDK


[docs]class RecordUpdateWorkflow: """ Provides workflows for creating or updating records, including batch submission and optional job status monitoring. Args: sdk: An instance of the ImednetSDK. """ def __init__(self, sdk: "ImednetSDK"): self._sdk = sdk self._validator = SchemaValidator(sdk) if getattr(sdk, "_async_client", None) is None: self._validator._is_async = False self._schema: SchemaCache = cast(SchemaCache, self._validator.schema)
[docs] def create_or_update_records( self, study_key: str, records_data: List[Dict[str, Any]], wait_for_completion: bool = False, timeout: int = 300, poll_interval: int = 5, ) -> Job: """Submit records for creation or update and optionally wait for completion.""" return asyncio.run( self._create_or_update_common( study_key, records_data, wait_for_completion, timeout, poll_interval, is_async=False, ) )
[docs] async def async_create_or_update_records( self, study_key: str, records_data: List[Dict[str, Any]], wait_for_completion: bool = False, timeout: int = 300, poll_interval: int = 5, ) -> Job: """Asynchronous variant of :meth:`create_or_update_records`.""" return await self._create_or_update_common( study_key, records_data, wait_for_completion, timeout, poll_interval, is_async=True, )
async def _create_or_update_common( self, study_key: str, records_data: List[Dict[str, Any]], wait_for_completion: bool, timeout: int, poll_interval: int, *, is_async: bool, ) -> Job: """Shared logic for submitting records synchronously or asynchronously.""" if records_data: first_ref = records_data[0].get("formKey") or self._schema.form_key_from_id( records_data[0].get("formId", 0) ) if first_ref and not self._schema.variables_for_form(first_ref): result = self._validator.refresh(study_key) if inspect.isawaitable(result): await result if first_ref not in self._schema.forms: raise ValueError(f"Form key '{first_ref}' not found") result = self._validator.validate_batch(study_key, records_data) if inspect.isawaitable(result): await result if is_async: job = await self._sdk.records.async_create(study_key, records_data, schema=self._schema) else: job = self._sdk.records.create(study_key, records_data, schema=self._schema) if not wait_for_completion: return job if not job.batch_id: raise ValueError("Submission successful but no batch_id received.") poller = JobPoller(self._sdk.jobs.async_get if is_async else self._sdk.jobs.get, is_async) sleep_fn = asyncio.sleep if is_async else time.sleep return await poller._run_common( study_key, job.batch_id, poller._get_job, cast(Callable[[float], Any], sleep_fn), poll_interval, timeout, )
[docs] def submit_record_batch(self, *args: Any, **kwargs: Any) -> Job: # pragma: no cover warnings.warn( "submit_record_batch is deprecated; use create_or_update_records", DeprecationWarning, stacklevel=2, ) return self.create_or_update_records(*args, **kwargs)
def _build_record_payload( self, *, form_identifier: Union[str, int], form_identifier_type: Literal["key", "id"] = "key", data: Dict[str, Any], subject_identifier: Union[str, int, None] = None, subject_identifier_type: Literal["key", "id", "oid"] = "key", site_identifier: Union[str, int, None] = None, site_identifier_type: Literal["name", "id"] = "name", interval_identifier: Union[str, int, None] = None, interval_identifier_type: Literal["name", "id"] = "name", ) -> Dict[str, Any]: """Return a record payload for ``create_or_update_records``.""" record: Dict[str, Any] = { "formKey" if form_identifier_type == "key" else "formId": form_identifier, "data": data, } if subject_identifier is not None: subject_id_field_map = { "key": "subjectKey", "id": "subjectId", "oid": "subjectOid", } record[subject_id_field_map[subject_identifier_type]] = subject_identifier if site_identifier is not None: record["siteName" if site_identifier_type == "name" else "siteId"] = site_identifier if interval_identifier is not None: record["intervalName" if interval_identifier_type == "name" else "intervalId"] = ( interval_identifier ) return record
[docs] def register_subject( self, study_key: str, form_identifier: Union[str, int], site_identifier: Union[str, int], data: Dict[str, Any], form_identifier_type: Literal["key", "id"] = "key", site_identifier_type: Literal["name", "id"] = "name", wait_for_completion: bool = False, timeout: int = 300, poll_interval: int = 5, ) -> Job: """ Registers a new subject by submitting a single record. Args: study_key: The study key. form_identifier: The form key or ID. site_identifier: The site name or ID. data: The dictionary of record data (variable names and values). form_identifier_type: Whether `form_identifier` is a 'key' or 'id'. site_identifier_type: Whether `site_identifier` is a 'name' or 'id'. wait_for_completion: If True, wait for the job to complete. timeout: Timeout in seconds for waiting. poll_interval: Polling interval in seconds. Returns: The Job status object. """ record = self._build_record_payload( form_identifier=form_identifier, form_identifier_type=form_identifier_type, site_identifier=site_identifier, site_identifier_type=site_identifier_type, data=data, ) return self.create_or_update_records( study_key=study_key, records_data=[record], wait_for_completion=wait_for_completion, timeout=timeout, poll_interval=poll_interval, )
[docs] def update_scheduled_record( self, study_key: str, form_identifier: Union[str, int], subject_identifier: Union[str, int], interval_identifier: Union[str, int], data: Dict[str, Any], form_identifier_type: Literal["key", "id"] = "key", subject_identifier_type: Literal["key", "id", "oid"] = "key", interval_identifier_type: Literal["name", "id"] = "name", wait_for_completion: bool = False, timeout: int = 300, poll_interval: int = 5, ) -> Job: """ Updates an existing scheduled record for a subject. Args: study_key: The study key. form_identifier: The form key or ID. subject_identifier: The subject key, ID, or OID. interval_identifier: The interval name or ID. data: The dictionary of record data (variable names and values). form_identifier_type: Whether `form_identifier` is a 'key' or 'id'. subject_identifier_type: Whether `subject_identifier` is a 'key', 'id', or 'oid'. interval_identifier_type: Whether `interval_identifier` is a 'name' or 'id'. wait_for_completion: If True, wait for the job to complete. timeout: Timeout in seconds for waiting. poll_interval: Polling interval in seconds. Returns: The Job status object. """ record = self._build_record_payload( form_identifier=form_identifier, form_identifier_type=form_identifier_type, subject_identifier=subject_identifier, subject_identifier_type=subject_identifier_type, interval_identifier=interval_identifier, interval_identifier_type=interval_identifier_type, data=data, ) return self.create_or_update_records( study_key=study_key, records_data=[record], wait_for_completion=wait_for_completion, timeout=timeout, poll_interval=poll_interval, )
[docs] def create_new_record( self, study_key: str, form_identifier: Union[str, int], subject_identifier: Union[str, int], data: Dict[str, Any], form_identifier_type: Literal["key", "id"] = "key", subject_identifier_type: Literal["key", "id", "oid"] = "key", wait_for_completion: bool = False, timeout: int = 300, poll_interval: int = 5, ) -> Job: """ Creates a new (unscheduled) record for an existing subject. Args: study_key: The study key. form_identifier: The form key or ID. subject_identifier: The subject key, ID, or OID. data: The dictionary of record data (variable names and values). form_identifier_type: Whether `form_identifier` is a 'key' or 'id'. subject_identifier_type: Whether `subject_identifier` is a 'key', 'id', or 'oid'. wait_for_completion: If True, wait for the job to complete. timeout: Timeout in seconds for waiting. poll_interval: Polling interval in seconds. Returns: The Job status object. """ record = self._build_record_payload( form_identifier=form_identifier, form_identifier_type=form_identifier_type, subject_identifier=subject_identifier, subject_identifier_type=subject_identifier_type, data=data, ) return self.create_or_update_records( study_key=study_key, records_data=[record], wait_for_completion=wait_for_completion, timeout=timeout, poll_interval=poll_interval, )
# Integration: # - Accessed via the main SDK instance # (e.g., `sdk.workflows.record_update.create_or_update_records(...)`). # - Simplifies the process of submitting data and optionally monitoring the asynchronous job.