imednet_workflows package

Subpackages

Submodules

imednet_workflows.cached_loader module

class imednet_workflows.cached_loader.CachedRecordsLoader[source]

Bases: object

Load study records through a local SQLite cache with incremental sync.

__init__(sdk, *, cache_dir=None, database_name='records_cache.sqlite3', retry_attempts=3)[source]
Parameters:
  • sdk (ImednetSDK) –

  • cache_dir (str | Path | None) –

  • database_name (str) –

  • retry_attempts (int) –

Return type:

None

get_cached_records(study_key, *, conn=None)[source]

Return cached records for study_key without contacting the API.

Return type:

list[Record]

Parameters:
  • study_key (str) –

  • conn (Connection | None) –

iter_cached_records(study_key, *, conn=None, chunk_size=5000)[source]

Yield cached records for study_key in bounded chunks.

Return type:

Iterator[Record]

Parameters:
  • study_key (str) –

  • conn (Connection | None) –

  • chunk_size (int) –

load_records(study_key, *, reconcile=True)[source]

Synchronise the cache for study_key and return cached records.

Return type:

list[Record]

Parameters:
  • study_key (str) –

  • reconcile (bool) –

reconcile_cache(conn, study_key, active_record_ids)[source]

Prune records removed from the upstream EDC backend.

Return type:

None

Parameters:
  • conn (Connection) –

  • study_key (str) –

  • active_record_ids (set[int]) –
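
The pruning step can be pictured as a set difference between the cached record IDs and the IDs still active upstream. The following is only a minimal sketch under stated assumptions: the `records` table, its columns, and the helper name `prune_missing_records` are hypothetical and are not taken from the package, and unlike reconcile_cache() the helper returns the number of deleted rows so the effect is visible.

```python
import sqlite3


def prune_missing_records(conn, study_key, active_record_ids):
    """Delete cached rows for study_key whose record_id is no longer active upstream."""
    # Hypothetical schema: records(study_key, record_id, payload).
    cached = {
        row[0]
        for row in conn.execute(
            "SELECT record_id FROM records WHERE study_key = ?", (study_key,)
        )
    }
    stale = sorted(cached - set(active_record_ids))
    with conn:  # commits on success, rolls back if the delete fails
        conn.executemany(
            "DELETE FROM records WHERE study_key = ? AND record_id = ?",
            [(study_key, record_id) for record_id in stale],
        )
    return len(stale)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (study_key TEXT, record_id INTEGER, payload TEXT)")
conn.executemany(
    "INSERT INTO records VALUES (?, ?, ?)",
    [("STUDY1", 1, "{}"), ("STUDY1", 2, "{}"), ("STUDY1", 3, "{}"), ("STUDY2", 2, "{}")],
)
conn.commit()

# Record 2 was removed upstream, so only IDs 1 and 3 are still active for STUDY1.
removed = prune_missing_records(conn, "STUDY1", {1, 3})
remaining = conn.execute(
    "SELECT study_key, record_id FROM records ORDER BY study_key, record_id"
).fetchall()
```

Rows belonging to other studies are left untouched because every statement is scoped by study_key.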

sync_records(study_key, *, reconcile=True)[source]

Synchronise the cache for study_key without materialising cached rows.

Return type:

None

Parameters:
  • study_key (str) –

  • reconcile (bool) –

imednet_workflows.cached_loader.get_cache_connection(db_path)[source]

Return a SQLite connection configured for concurrent cache access.

Return type:

Connection

Parameters:

db_path (str | Path) –
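
The reference does not say which settings make the connection suitable for concurrent access. One common approach with the standard sqlite3 module, shown here purely as an assumption, is write-ahead logging plus a busy timeout. The helper name `open_concurrent_connection` and the chosen pragmas are illustrative, not the package's implementation.

```python
import sqlite3
import tempfile
from pathlib import Path


def open_concurrent_connection(db_path, timeout_seconds=30.0):
    """Open a SQLite connection with settings commonly used for concurrent readers."""
    # timeout makes writers wait for a lock instead of failing immediately.
    conn = sqlite3.connect(str(db_path), timeout=timeout_seconds)
    # Assumption: WAL mode, which lets readers proceed while one writer is active.
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")
    return conn


with tempfile.TemporaryDirectory() as tmp:
    conn = open_concurrent_connection(Path(tmp) / "records_cache.sqlite3")
    journal_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
    conn.close()
```

WAL mode needs a file-backed database; an in-memory database would report a different journal mode.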

imednet_workflows.chunked_pipeline module

class imednet_workflows.chunked_pipeline.ChunkedRecordPipeline[source]

Bases: object

Chunked iteration utilities for large-study workflows.

__init__(*, chunk_size=5000)[source]
Parameters:

chunk_size (int) –

Return type:

None

map_chunks(items, mapper)[source]

Apply mapper to items and yield mapped chunks.

Return type:

Iterator[list[TypeVar(R)]]

Parameters:
  • items (Iterable[T]) –

  • mapper (Callable[[T], R]) –

write_parquet_chunks(rows, *, output_dir, filename_prefix='records')[source]

Write chunked parquet files and return written file paths.

Return type:

list[Path]

Parameters:
  • rows (Iterable[dict[str, object]]) –

  • output_dir (str | Path) –

  • filename_prefix (str) –

imednet_workflows.chunked_pipeline.iter_chunks(items, *, chunk_size=5000)[source]

Yield items in bounded chunks.

Return type:

Iterator[list[TypeVar(T)]]

Parameters:
  • items (Iterable[T]) –

  • chunk_size (int) –
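
Bounded chunking of an arbitrary iterable can be sketched with itertools.islice. The functions below are stand-ins written for illustration (the `_sketch` names are hypothetical); in particular, the real map_chunks() takes its chunk size from the pipeline instance rather than from a keyword argument.

```python
from itertools import islice


def iter_chunks_sketch(items, *, chunk_size=5000):
    """Yield lists of at most chunk_size items without materialising the input."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


def map_chunks_sketch(items, mapper, *, chunk_size=5000):
    """Apply mapper to every item and yield the mapped values chunk by chunk."""
    for chunk in iter_chunks_sketch(items, chunk_size=chunk_size):
        yield [mapper(item) for item in chunk]


chunks = list(iter_chunks_sketch(range(7), chunk_size=3))
doubled = list(map_chunks_sketch(range(5), lambda n: n * 2, chunk_size=2))
```

Only one chunk is held in memory at a time, which is the property the large-study workflows rely on.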

imednet_workflows.cli module

imednet_workflows.config_version_control module

SQLite-backed version control ledger for study configurations.

Provides immutable, append-only commit history with SHA-256 content hashing, diff capability, and safe rollback. History blocks are read-only once written.

class imednet_workflows.config_version_control.ConfigVersionStore[source]

Bases: object

Immutable append-only store for StudyConfiguration versions.

Each call to commit_config() creates a new entry signed with a SHA-256 digest of the serialised configuration body. History is strictly read-only — individual commit rows may never be edited or deleted.

Parameters:

db_path (str | Path) – Filesystem path for the SQLite database. Defaults to ~/.imednet/config_versions.sqlite3.

__init__(db_path=PosixPath('~/.imednet/config_versions.sqlite3'))[source]
Parameters:

db_path (str | Path) –

Return type:

None

commit_config(study_key, config, user, desc)[source]

Serialise config, compute its SHA-256 hash, and persist the commit.

Parameters:
  • study_key (str) – Identifies the study this configuration belongs to.

  • config (StudyConfiguration) – The StudyConfiguration to store.

  • user (str) – Identifier of the person or process making the change.

  • desc (str) – Human-readable description of what changed.

Return type:

str

Returns:

The commit_id (SHA-256 hex digest of the serialised JSON body).

Raises:

ValueError – If a commit with the same content hash already exists for this study, indicating a no-op duplicate.
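
A content-addressed commit ID of this kind can be sketched with hashlib and json. The exact serialisation used by commit_config() is not documented here, so the canonical form below (sorted keys, compact separators) and the function name `content_commit_id` are assumptions made for illustration.

```python
import hashlib
import json


def content_commit_id(config_body):
    """Return the SHA-256 hex digest of a canonical JSON serialisation."""
    # Assumption: sorting keys makes the digest independent of dict ordering.
    serialised = json.dumps(config_body, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(serialised.encode("utf-8")).hexdigest()


first = content_commit_id({"study_key": "MYSTUDY", "forms": ["AE", "PD"]})
reordered = content_commit_id({"forms": ["AE", "PD"], "study_key": "MYSTUDY"})
changed = content_commit_id({"study_key": "MYSTUDY", "forms": ["AE"]})
```

Because the ID is derived from the content, committing an unchanged configuration produces the same digest, which is what allows a duplicate to be detected and rejected.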

diff_configs(commit_a, commit_b)[source]

Compute a property-level diff between two commits.

Compares the flat JSON key space of the two commits. Returns a dict with three sub-keys:

  • added — keys present in b but not in a.

  • removed — keys present in a but not in b.

  • changed — keys present in both but with different values.

Parameters:
  • commit_a (str) – SHA-256 commit ID of the before state.

  • commit_b (str) – SHA-256 commit ID of the after state.

Return type:

dict[str, Any]

Returns:

Dict with added, removed, and changed sub-dicts.

Raises:

KeyError – If either commit ID is not found in the store.
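
The three-way split into added, removed, and changed keys can be sketched over two flat dictionaries. The function name `diff_flat` and the before/after shape of each changed entry are assumptions; the reference only states that changed is a sub-dict.

```python
def diff_flat(before, after):
    """Compare two flat dicts and group the differences by kind."""
    added = {key: after[key] for key in after.keys() - before.keys()}
    removed = {key: before[key] for key in before.keys() - after.keys()}
    changed = {
        key: {"before": before[key], "after": after[key]}  # assumed entry shape
        for key in before.keys() & after.keys()
        if before[key] != after[key]
    }
    return {"added": added, "removed": removed, "changed": changed}


result = diff_flat(
    {"study_name": "Demo", "visit_count": 4, "locked": False},
    {"study_name": "Demo", "visit_count": 5, "sponsor": "ACME"},
)
```

Keys whose values are equal in both commits, such as study_name here, do not appear in any of the three groups.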

get_history(study_key)[source]

Return all commits for study_key, ordered oldest-first.

Parameters:

study_key (str) – The study whose history should be retrieved.

Return type:

list[dict[str, Any]]

Returns:

A list of dicts, each with keys commit_id, study_key, version_tag, modified_by, description, and timestamp. The config_data body is intentionally omitted to keep the payload small — use rollback_config() to retrieve the full body for a specific commit.

rollback_config(study_key, commit_id)[source]

Restore and return the StudyConfiguration stored at commit_id.

This method is non-destructive — it does not modify any existing history rows. The caller is responsible for creating a new commit via commit_config() if they wish to record the rollback.

Parameters:
  • study_key (str) – The study the commit must belong to.

  • commit_id (str) – The SHA-256 commit ID to restore.

Return type:

StudyConfiguration

Returns:

The deserialised StudyConfiguration.

Raises:

KeyError – If the commit is not found or does not belong to the requested study.

imednet_workflows.data_extraction module

Provides workflows for extracting specific datasets from iMednet studies.

class imednet_workflows.data_extraction.DataExtractionWorkflow[source]

Bases: object

Provides methods for complex data extraction tasks involving multiple iMednet endpoints.

Parameters:

sdk (ImednetSDK) – An instance of the ImednetSDK.

__init__(sdk)[source]
Parameters:

sdk (ImednetSDK) –

extract_audit_trail(study_key, start_date=None, end_date=None, user_filter=None, **filters)[source]

Extracts the audit trail (record revisions) based on specified filters.

Parameters:
  • study_key (str) – The key identifying the study.

  • start_date (Optional[str]) – Optional start date filter (YYYY-MM-DD format expected by API).

  • end_date (Optional[str]) – Optional end date filter (YYYY-MM-DD format expected by API).

  • user_filter (Optional[Dict[str, Union[Any, Tuple[str, Any], List[Any]]]]) – Optional dictionary of base filter conditions.

  • **filters (Any) – Additional key-value pairs to be added as equality filters.

Return type:

List[RecordRevision]

Returns:

A list of RecordRevision objects matching the criteria.

extract_records_by_criteria(study_key, record_filter=None, subject_filter=None, visit_filter=None, **other_filters)[source]

Extracts records based on criteria spanning subjects, visits, and records.

Parameters:
  • study_key (str) – The key identifying the study.

  • record_filter (Optional[Dict[str, Union[Any, Tuple[str, Any], List[Any]]]]) – Dictionary of conditions for the records endpoint.

  • subject_filter (Optional[Dict[str, Union[Any, Tuple[str, Any], List[Any]]]]) – Dictionary of conditions for the subjects endpoint.

  • visit_filter (Optional[Dict[str, Union[Any, Tuple[str, Any], List[Any]]]]) – Dictionary of conditions for the visits endpoint.

  • **other_filters (Any) – Additional keyword arguments passed as filters to the records endpoint list method.

Return type:

List[Record]

Returns:

A list of Record objects matching all specified criteria.

imednet_workflows.duckdb_centralizer module

DuckDB ingestion workflow for incremental eCRF centralization.

class imednet_workflows.duckdb_centralizer.DuckDBIngestionWorkflow[source]

Bases: object

Incremental eCRF centralization pipeline using a bronze/silver DuckDB layout.

__init__(sdk, db_path)[source]
Parameters:
  • sdk –

  • db_path –

Return type:

None

build_silver_view(study_key)[source]

Create or replace the silver_current_state view.

Return type:

None

Parameters:

study_key (str) –

ingest_revisions(study_key, *, start_date=None, end_date=None, mode='append')[source]

Fetch RecordRevisions and write to the bronze_revisions table.

Return type:

int

Parameters:
  • study_key (str) –

  • start_date (str | None) –

  • end_date (str | None) –

  • mode (Literal['append', 'replace']) –

imednet_workflows.extraction_engine module

class imednet_workflows.extraction_engine.ExtractionResult[source]

Bases: BaseModel

Canonical extraction output grouped by reporting domain.

adverse_events: list[AdverseEvent]
device_deficiencies: list[DeviceDeficiency]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

protocol_deviations: list[ProtocolDeviation]
validation_errors: list[dict[str, Any]]

imednet_workflows.extraction_engine.extract_canonical_records(records, study_configuration)[source]

Extract canonical AE/PD/DD models from raw records using study mappings.

Return type:

ExtractionResult

Parameters:
  • records –

  • study_configuration –

imednet_workflows.job_poller module

Utility for polling job status.

class imednet_workflows.job_poller.AsyncJobPoller[source]

Bases: BaseJobPoller

Asynchronously poll a job until completion.

__init__(get_job)[source]
Parameters:

get_job (Callable[[str, str], Awaitable[JobStatus]]) –

Return type:

None

async run(study_key, batch_id, interval=5, timeout=300)[source]

Asynchronously poll a job until completion.

Return type:

JobStatus

Parameters:
  • study_key (str) –

  • batch_id (str) –

  • interval (int) –

  • timeout (int) –

class imednet_workflows.job_poller.BaseJobPoller[source]

Bases: object

Base class for polling a job until it reaches a terminal state.

class imednet_workflows.job_poller.JobPoller[source]

Bases: BaseJobPoller

Synchronously poll a job until completion.

__init__(get_job)[source]
Parameters:

get_job (Callable[[str, str], JobStatus]) –

Return type:

None

run(study_key, batch_id, interval=5, timeout=300)[source]

Synchronously poll a job until completion.

Return type:

JobStatus

Parameters:
  • study_key (str) –

  • batch_id (str) –

  • interval (int) –

  • timeout (int) –

exception imednet_workflows.job_poller.JobTimeoutError[source]

Bases: TimeoutError

Raised when a job does not finish before the timeout.
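
The interval and timeout parameters suggest a poll-sleep loop bounded by a deadline. The sketch below illustrates that pattern with stand-ins only: `PollTimeout` plays the role of JobTimeoutError, job statuses are plain dicts, and the terminal state names are assumptions rather than values confirmed by the package.

```python
import time


class PollTimeout(TimeoutError):
    """Stand-in for JobTimeoutError in this sketch."""


TERMINAL_STATES = {"COMPLETED", "FAILED"}  # assumed state names


def poll_until_done(get_job, study_key, batch_id, interval=5, timeout=300):
    """Call get_job repeatedly until a terminal state or the deadline is reached."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_job(study_key, batch_id)
        if status["state"] in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise PollTimeout(f"batch {batch_id} did not finish within {timeout}s")
        time.sleep(interval)


responses = iter(["PROCESSING", "PROCESSING", "COMPLETED"])


def fake_get_job(study_key, batch_id):
    # Hypothetical job lookup that finishes on the third call.
    return {"batch_id": batch_id, "state": next(responses)}


final = poll_until_done(fake_get_job, "MYSTUDY", "batch-1", interval=0, timeout=5)
```

Using a monotonic clock for the deadline keeps the timeout correct even if the system clock is adjusted while polling.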

imednet_workflows.namespace module

imednet_workflows.query_management module

Provides workflows for managing queries within iMednet studies.

class imednet_workflows.query_management.QueryManagementWorkflow[source]

Bases: object

Provides methods for common query management tasks.

Parameters:

sdk (ImednetSDK) – An instance of the ImednetSDK.

__init__(sdk)[source]
Parameters:

sdk (ImednetSDK) –

get_open_queries(study_key, additional_filter=None, **kwargs)[source]

Retrieves all open queries for a given study, potentially filtered further.

An ‘open’ query is defined as one where the query comment with the highest sequence number has its ‘closed’ field set to False.

Note: This method fetches queries based on additional_filter and then filters for the ‘open’ state client-side.

Parameters:
  • study_key (str) – The key identifying the study.

  • additional_filter (Optional[Dict[str, Union[Any, Tuple[str, Any], List[Any]]]]) – An optional dictionary of conditions to apply via the API.

  • **kwargs (Any) – Additional keyword arguments passed directly to sdk.queries.list.

Return type:

List[Query]

Returns:

A list of open Query objects matching the criteria.

get_queries_by_site(study_key, site_key, additional_filter=None, **kwargs)[source]

Retrieves all queries for a specific site within a study.

Parameters:
  • study_key (str) – The key identifying the study.

  • site_key (str) – The name of the site.

  • additional_filter (Optional[Dict[str, Union[Any, Tuple[str, Any], List[Any]]]]) – Extra conditions to combine with the subject filter.

  • **kwargs (Any) – Additional keyword arguments passed directly to sdk.queries.list.

Return type:

List[Query]

Returns:

A list of Query objects for the specified site.

get_queries_for_subject(study_key, subject_key, additional_filter=None, **kwargs)[source]

Retrieves all queries for a specific subject within a study.

Parameters:
  • study_key (str) – The key identifying the study.

  • subject_key (str) – The key identifying the subject.

  • additional_filter (Optional[Dict[str, Union[Any, Tuple[str, Any], List[Any]]]]) – An optional dictionary of conditions to combine with the subject filter.

  • **kwargs (Any) – Additional keyword arguments passed directly to sdk.queries.list.

Return type:

List[Query]

Returns:

A list of Query objects for the specified subject.

get_query_state_counts(study_key, **kwargs)[source]

Counts queries grouped by their current state (open/closed/unknown).

The state is determined by the ‘closed’ field of the query comment with the highest sequence number. Queries without any comments are counted as ‘unknown’.

Note: This method fetches all queries matching the base criteria (if any are passed via kwargs) and then performs the aggregation client-side.

Parameters:
  • study_key (str) – The key identifying the study.

  • **kwargs (Any) – Additional keyword arguments passed directly to sdk.queries.list (e.g., for initial filtering before counting).

Return type:

Dict[str, int]

Returns:

A dictionary with keys ‘open’, ‘closed’, ‘unknown’ and their respective counts.
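
The open/closed/unknown rule described above can be sketched with small stand-in classes. `QueryStub` and `CommentStub` are hypothetical simplifications of the SDK models, carrying only the fields the rule needs.

```python
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class CommentStub:
    sequence: int
    closed: bool


@dataclass
class QueryStub:
    annotation_id: int
    comments: list = field(default_factory=list)


def query_state(query):
    """Classify a query by the comment with the highest sequence number."""
    if not query.comments:
        return "unknown"
    latest = max(query.comments, key=lambda comment: comment.sequence)
    return "closed" if latest.closed else "open"


def count_states(queries):
    """Count queries per state, always reporting all three keys."""
    counts = Counter({"open": 0, "closed": 0, "unknown": 0})
    counts.update(query_state(query) for query in queries)
    return dict(counts)


queries = [
    QueryStub(1, [CommentStub(1, False), CommentStub(2, True)]),
    QueryStub(2, [CommentStub(2, False), CommentStub(1, True)]),
    QueryStub(3),
]
counts = count_states(queries)
open_ids = [query.annotation_id for query in queries if query_state(query) == "open"]
```

The second query shows why the highest sequence number matters rather than list order: its comments arrive out of order, yet the latest one still decides the state.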

imednet_workflows.record_mapper module

class imednet_workflows.record_mapper.RecordMapper[source]

Bases: object

Maps EDC records for a study into a pandas DataFrame.

Features:
  • Fetches variable definitions for column mapping.

  • Dynamically creates a Pydantic model for type validation of record data.

  • Fetches records, applying server-side filtering where possible.

  • Merges metadata and record data.

  • Offers choice between variable names or labels for column headers.

  • Handles parsing errors gracefully for individual records.

Example

    sdk = ImednetSDK(api_key, security_key, base_url)
    mapper = RecordMapper(sdk)

    # Get DataFrame with labels as columns, filtered by visit
    df_labels = mapper.dataframe(study_key="MYSTUDY", visit_key="VISIT1")

    # Get DataFrame with variable names as columns
    df_names = mapper.dataframe(study_key="MYSTUDY", use_labels_as_columns=False)

__init__(sdk, *, loader=None, chunk_size=5000)[source]

Initialize with an ImednetSDK instance.

loader enables cache-backed streaming for large-study workflows. chunk_size controls the maximum number of records parsed into each yielded batch when using chunked mapping helpers.

Parameters:
  • sdk –

  • loader –

  • chunk_size –

Return type:

None

dataframe(study_key, visit_key=None, use_labels_as_columns=True, variable_whitelist=None, form_whitelist=None)[source]

Return a pandas.DataFrame of records for a study.

This method still materialises all yielded chunks into one DataFrame. For bounded-memory processing of large studies, prefer iter_dataframes().

Return type:

DataFrame

Parameters:
  • study_key (str) –

  • visit_key (str | None) –

  • use_labels_as_columns (bool) –

  • variable_whitelist (List[str] | None) –

  • form_whitelist (List[int] | None) –

iter_dataframes(study_key, visit_key=None, use_labels_as_columns=True, variable_whitelist=None, form_whitelist=None)[source]

Yield mapped record DataFrames in bounded chunks.

Each yielded frame contains at most chunk_size mapped records from this mapper instance. Prefer this method over dataframe() when processing or exporting large studies so each chunk can be committed before the next batch is parsed.

Return type:

Iterator[DataFrame]

Parameters:
  • study_key (str) –

  • visit_key (str | None) –

  • use_labels_as_columns (bool) –

  • variable_whitelist (List[str] | None) –

  • form_whitelist (List[int] | None) –

imednet_workflows.record_update module

Utilities for submitting and updating records in iMednet studies.

class imednet_workflows.record_update.RecordUpdateWorkflow[source]

Bases: object

Provides workflows for creating or updating records, including batch submission and optional job status monitoring.

Parameters:

sdk (Union[ImednetSDK, AsyncImednetSDK]) – An instance of ImednetSDK or AsyncImednetSDK.

__init__(sdk)[source]
Parameters:

sdk (ImednetSDK | AsyncImednetSDK) –

async async_create_or_update_records(study_key, records_data, wait_for_completion=False, timeout=300, poll_interval=5)[source]

Asynchronous variant of create_or_update_records().

Return type:

Job

Parameters:
  • study_key (str) –

  • records_data (List[Dict[str, Any]]) –

  • wait_for_completion (bool) –

  • timeout (int) –

  • poll_interval (int) –

create_new_record(study_key, form_identifier, subject_identifier, data, form_identifier_type='key', subject_identifier_type='key', wait_for_completion=False, timeout=300, poll_interval=5)[source]

Creates a new (unscheduled) record for an existing subject.

Parameters:
  • study_key (str) – The study key.

  • form_identifier (Union[str, int]) – The form key or ID.

  • subject_identifier (Union[str, int]) – The subject key, ID, or OID.

  • data (Dict[str, Any]) – The dictionary of record data (variable names and values).

  • form_identifier_type (Literal['key', 'id']) – Whether form_identifier is a ‘key’ or ‘id’.

  • subject_identifier_type (Literal['key', 'id', 'oid']) – Whether subject_identifier is a ‘key’, ‘id’, or ‘oid’.

  • wait_for_completion (bool) – If True, wait for the job to complete.

  • timeout (int) – Timeout in seconds for waiting.

  • poll_interval (int) – Polling interval in seconds.

Return type:

Job

Returns:

The Job status object.

create_or_update_records(study_key, records_data, wait_for_completion=False, timeout=300, poll_interval=5)[source]

Submit records for creation or update and optionally wait for completion.

Return type:

Job

Parameters:
  • study_key (str) –

  • records_data (List[Dict[str, Any]]) –

  • wait_for_completion (bool) –

  • timeout (int) –

  • poll_interval (int) –

register_subject(study_key, form_identifier, site_identifier, data, form_identifier_type='key', site_identifier_type='name', wait_for_completion=False, timeout=300, poll_interval=5)[source]

Registers a new subject by submitting a single record.

Parameters:
  • study_key (str) – The study key.

  • form_identifier (Union[str, int]) – The form key or ID.

  • site_identifier (Union[str, int]) – The site name or ID.

  • data (Dict[str, Any]) – The dictionary of record data (variable names and values).

  • form_identifier_type (Literal['key', 'id']) – Whether form_identifier is a ‘key’ or ‘id’.

  • site_identifier_type (Literal['name', 'id']) – Whether site_identifier is a ‘name’ or ‘id’.

  • wait_for_completion (bool) – If True, wait for the job to complete.

  • timeout (int) – Timeout in seconds for waiting.

  • poll_interval (int) – Polling interval in seconds.

Return type:

Job

Returns:

The Job status object.

submit_record_batch(*args, **kwargs)[source]
Return type:

Job

Parameters:
  • args (Any) –

  • kwargs (Any) –

update_scheduled_record(study_key, form_identifier, subject_identifier, interval_identifier, data, form_identifier_type='key', subject_identifier_type='key', interval_identifier_type='name', wait_for_completion=False, timeout=300, poll_interval=5)[source]

Updates an existing scheduled record for a subject.

Parameters:
  • study_key (str) – The study key.

  • form_identifier (Union[str, int]) – The form key or ID.

  • subject_identifier (Union[str, int]) – The subject key, ID, or OID.

  • interval_identifier (Union[str, int]) – The interval name or ID.

  • data (Dict[str, Any]) – The dictionary of record data (variable names and values).

  • form_identifier_type (Literal['key', 'id']) – Whether form_identifier is a ‘key’ or ‘id’.

  • subject_identifier_type (Literal['key', 'id', 'oid']) – Whether subject_identifier is a ‘key’, ‘id’, or ‘oid’.

  • interval_identifier_type (Literal['name', 'id']) – Whether interval_identifier is a ‘name’ or ‘id’.

  • wait_for_completion (bool) – If True, wait for the job to complete.

  • timeout (int) – Timeout in seconds for waiting.

  • poll_interval (int) – Polling interval in seconds.

Return type:

Job

Returns:

The Job status object.

imednet_workflows.register_subjects module

Workflow for registering subjects (patients) in iMednet via the Records API. This workflow is self-contained and does not borrow from record_update.py. It provides a simple, robust interface for registering one or more subjects.

class imednet_workflows.register_subjects.RegisterSubjectsWorkflow[source]

Bases: object

Manages the registration of subjects using the iMednet SDK.

_sdk

An instance of the ImednetSDK.

Type:

ImednetSDK

__init__(sdk)[source]
Parameters:

sdk (ImednetSDK) –

register_subjects(study_key, subjects, email_notify=None)[source]

Registers multiple subjects in the specified study. Sites and subject identifiers are validated before submission.

Parameters:
  • study_key (str) – The key identifying the study.

  • subjects (List[RegisterSubjectRequest]) – A list of RegisterSubjectRequest objects, each defining a subject to be registered.

  • email_notify (Optional[str]) – Optional email address to notify upon completion.

Return type:

Job

Returns:

A Job object representing the background job created for the registration request.

Raises:

ApiError – If the API call fails.

imednet_workflows.schema_profiler module

class imednet_workflows.schema_profiler.FieldProfile[source]

Bases: BaseModel

Summary statistics for a single form field.

inferred_type: str
label: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

population_rate: float
unique_count: int
unique_values: list[str]
variable_name: str

class imednet_workflows.schema_profiler.FormProfile[source]

Bases: BaseModel

Summary statistics for a single form.

fields: dict[str, FieldProfile]
form_key: str
form_name: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

record_count: int

class imednet_workflows.schema_profiler.SchemaProfiler[source]

Bases: object

Profile form and record-data population across cached records.

__init__(sdk, loader=None)[source]
Parameters:
  • sdk –

  • loader –

Return type:

None

profile_records(study_key, *, records=None)[source]

Return per-form field profiling statistics for study_key.

Return type:

dict[str, FormProfile]

Parameters:
  • study_key (str) –

  • records (Iterable[Record] | None) –

imednet_workflows.standards_validation module

class imednet_workflows.standards_validation.CategoricalNormalizer[source]

Bases: object

normalize_record(record, *, terminology_lookups)[source]
Return type:

NormalizationResult

Parameters:
  • record (dict[str, Any]) –

  • terminology_lookups (dict[str, dict[str, str]]) –

class imednet_workflows.standards_validation.NormalizationResult[source]

Bases: BaseModel

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

normalized_record: dict[str, Any]
warnings: list[str]

class imednet_workflows.standards_validation.StandardsReadinessReport[source]

Bases: BaseModel

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

score: float
successfully_validated_fields: int
total_expected_fields: int
violations: list[ValidationViolation]
warnings: list[str]

class imednet_workflows.standards_validation.StandardsReadinessValidator[source]

Bases: object

__init__(profile, normalizer=None)[source]
Parameters:
  • profile –

  • normalizer –

Return type:

None

score_records(*, records_by_domain, terminology_lookups=None)[source]
Return type:

StandardsReadinessReport

Parameters:
  • records_by_domain (dict[str, list[dict[str, Any]]]) –

  • terminology_lookups (dict[str, dict[str, str]] | None) –

imednet_workflows.state_ledger module

Stateful incremental high-water mark tracker for workflow streams.

class imednet_workflows.state_ledger.ExtractionStateLedger[source]

Bases: object

Manages transactional state bookmarks (high-water marks) per study and stream so that incremental ingestion can resume from the last recorded timestamp.

__init__(ledger_path='/var/lib/imednet/pipeline_ledger.json')[source]
Parameters:

ledger_path (str) –

Return type:

None

delete_entry(study_key, stream_name=None)[source]

Deletes a study or specific stream entry from the ledger under the file lock.

Returns True if the entry existed and was removed, False otherwise.

Return type:

bool

Parameters:
  • study_key (str) –

  • stream_name (str | None) –

get_last_timestamp(study_key, stream_name)[source]

Returns the high-water mark timestamp for a given study and stream.

Return type:

Optional[datetime]

Parameters:
  • study_key (str) –

  • stream_name (str) –

read_state()[source]

Reads and validates the current ledger state.

Return type:

LedgerState

set_last_timestamp(study_key, stream_name, timestamp, records_processed=0, status='success', error_message=None, metadata=None)[source]

Sets the high-water mark timestamp atomically.

Return type:

None

Parameters:
  • study_key (str) –

  • stream_name (str) –

  • timestamp (datetime) –

  • records_processed (int) –

  • status (str) –

  • error_message (str | None) –

  • metadata (Dict[str, Any] | None) –

transaction(study_key, stream_name, fallback_timestamp=None)[source]

Context manager for transactional state tracking. Yields a dict in which the caller can record ‘records_processed’, ‘new_timestamp’, and ‘metadata’. The state is saved automatically when the context exits without an exception. The ledger file lock is held for the entire duration of the context.

Return type:

Generator[Dict[str, Any], None, None]

Parameters:
  • study_key (str) –

  • stream_name (str) –

  • fallback_timestamp (datetime | None) –
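
The save-on-clean-exit behaviour can be sketched with contextlib.contextmanager. This is an in-memory illustration only: the `InMemoryLedger` class is hypothetical, it takes no file lock, and the `last_timestamp` key exposed in the yielded dict is an assumption rather than documented behaviour.

```python
from contextlib import contextmanager
from datetime import datetime, timezone


class InMemoryLedger:
    """Hypothetical stand-in that keeps bookmarks in a dict instead of a locked file."""

    def __init__(self):
        self.entries = {}

    @contextmanager
    def transaction(self, study_key, stream_name, fallback_timestamp=None):
        previous = self.entries.get((study_key, stream_name), {})
        context = {
            "last_timestamp": previous.get("last_timestamp", fallback_timestamp),
            "new_timestamp": None,
            "records_processed": 0,
            "metadata": {},
        }
        yield context
        # Reached only when the with-block exits without raising.
        if context["new_timestamp"] is not None:
            self.entries[(study_key, stream_name)] = {
                "last_timestamp": context["new_timestamp"],
                "records_processed": context["records_processed"],
                "metadata": context["metadata"],
            }


ledger = InMemoryLedger()
mark = datetime(2024, 1, 2, tzinfo=timezone.utc)

with ledger.transaction("MYSTUDY", "records") as tx:
    tx["records_processed"] = 42
    tx["new_timestamp"] = mark

try:
    with ledger.transaction("MYSTUDY", "records") as tx:
        tx["new_timestamp"] = datetime(2024, 1, 3, tzinfo=timezone.utc)
        raise RuntimeError("simulated extraction failure")
except RuntimeError:
    pass

saved = ledger.entries[("MYSTUDY", "records")]
```

The failed second run leaves the bookmark at the first run's timestamp, so a retry would start from the last successfully recorded position.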

write_state(state)[source]

Writes the ledger state atomically using a temporary file.

Return type:

None

Parameters:

state (LedgerState) –
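
Writing through a temporary file usually means writing the new content next to the target and then renaming it over the target in one step. The sketch below shows that pattern with the standard library; the function name `write_json_atomically` and the plain-dict payload are assumptions, since the real method serialises a LedgerState.

```python
import json
import os
import tempfile
from pathlib import Path


def write_json_atomically(path, payload):
    """Write payload to a temp file in the same directory, then rename it into place."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(payload, handle)
            handle.flush()
            os.fsync(handle.fileno())
        # os.replace swaps the file in a single step on the same filesystem.
        os.replace(tmp_name, path)
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


with tempfile.TemporaryDirectory() as tmp:
    ledger_file = Path(tmp) / "pipeline_ledger.json"
    write_json_atomically(ledger_file, {"studies": {}})
    write_json_atomically(ledger_file, {"studies": {"MYSTUDY": {"streams": {}}}})
    reloaded = json.loads(ledger_file.read_text(encoding="utf-8"))
    leftovers = sorted(entry.name for entry in Path(tmp).iterdir())
```

A reader therefore sees either the old file or the complete new one, never a partially written ledger.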

class imednet_workflows.state_ledger.LedgerState[source]

Bases: BaseModel

Schema for the entire ledger file containing all studies.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

studies: Dict[str, StudyState]

class imednet_workflows.state_ledger.StreamState[source]

Bases: BaseModel

Schema for individual stream execution checkpoints.

error_message: Optional[str]
last_run_status: str
last_timestamp: datetime
metadata: Dict[str, Any]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

records_processed: int

class imednet_workflows.state_ledger.StudyState[source]

Bases: BaseModel

Schema for all streams in a given study context.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

streams: Dict[str, StreamState]

imednet_workflows.study_structure module

async imednet_workflows.study_structure.async_get_study_structure(sdk, study_key)[source]

Asynchronous variant of get_study_structure().

Return type:

StudyStructure

Parameters:
  • sdk –

  • study_key –

imednet_workflows.study_structure.get_study_structure(sdk, study_key)[source]

Fetches and aggregates study structure information (intervals, forms, variables).

Parameters:
  • sdk (ImednetSDK) – An initialized ImednetSDK instance.

  • study_key (str) – The key of the study to fetch structure for.

Return type:

StudyStructure

Returns:

A StudyStructure object containing nested intervals, forms, and variables.

Raises:

ImednetError – If fetching any part of the structure fails.

imednet_workflows.subject_data module

Provides a workflow to retrieve comprehensive data for a specific subject.

class imednet_workflows.subject_data.SubjectComprehensiveData[source]

Bases: BaseModel

Structure to hold aggregated data for a subject.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

queries: List[Query]
records: List[Record]
subject_details: Optional[Subject]
visits: List[Visit]

class imednet_workflows.subject_data.SubjectDataWorkflow[source]

Bases: object

Provides methods to retrieve comprehensive data related to a specific subject.

Parameters:

sdk (ImednetSDK) – An instance of the ImednetSDK.

__init__(sdk)[source]
Parameters:

sdk (ImednetSDK) –

get_all_subject_data(study_key, subject_key)[source]

Retrieves subject details, visits, records, and queries for a specific subject.

Parameters:
  • study_key (str) – The key identifying the study.

  • subject_key (str) – The key identifying the subject.

Return type:

SubjectComprehensiveData

Returns:

A SubjectComprehensiveData object containing the aggregated data.

imednet_workflows.sync_worker module

class imednet_workflows.sync_worker.SyncWorker[source]

Bases: object

Background cache refresher that runs incremental record sync loops.

__init__(loader, *, config, stop_event=None)[source]
Parameters:
  • loader –

  • config –

  • stop_event –

Return type:

None

run_forever()[source]

Run sync cycles until stopped.

Return type:

None

run_once()[source]

Run one idempotent cache sync cycle.

Return type:

int

stop()[source]

Request graceful termination.

Return type:

None
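
A run-until-stopped loop with graceful termination is often built on threading.Event, which doubles as an interruptible sleep. The class below is a hypothetical sketch of that pattern (`LoopWorker` and its `cycles` counter are not part of the package), with the sync work injected as a plain callable.

```python
import threading


class LoopWorker:
    """Hypothetical worker that repeats a sync callable until stop() is called."""

    def __init__(self, sync_once, *, interval_seconds=900, stop_event=None):
        self._sync_once = sync_once
        self._interval_seconds = interval_seconds
        self._stop_event = stop_event or threading.Event()
        self.cycles = 0

    def run_once(self):
        self.cycles += 1
        return self._sync_once()

    def run_forever(self):
        while not self._stop_event.is_set():
            self.run_once()
            # wait() returns early as soon as stop() sets the event.
            self._stop_event.wait(self._interval_seconds)

    def stop(self):
        self._stop_event.set()


calls = []


def fake_sync():
    # Stand-in for one cache sync cycle; requests a stop after the third cycle.
    calls.append(len(calls))
    if len(calls) == 3:
        worker.stop()
    return 0


worker = LoopWorker(fake_sync, interval_seconds=0.01)
worker.run_forever()
```

Waiting on the event instead of calling time.sleep() means a stop request does not have to sit out the rest of a long interval.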

class imednet_workflows.sync_worker.SyncWorkerConfig[source]

Bases: object

SyncWorkerConfig(study_key: str, interval_seconds: int = 900, reconcile: bool = True, lock_timeout_seconds: int = 30)

__init__(study_key, interval_seconds=900, reconcile=True, lock_timeout_seconds=30)
Parameters:
  • study_key (str) –

  • interval_seconds (int) –

  • reconcile (bool) –

  • lock_timeout_seconds (int) –

Return type:

None

interval_seconds: int
lock_timeout_seconds: int
reconcile: bool
study_key: str

imednet_workflows.triage_store module

class imednet_workflows.triage_store.TriageStore[source]

Bases: object

Thread-safe SQLite-backed triage queue and decision store.

__init__(db_path, *, timeout=30.0, retry_attempts=3)[source]
Parameters:
  • db_path (str | Path) –

  • timeout (float) –

  • retry_attempts (int) –

Return type:

None

add_annotation(item_id, user_id, comment)[source]
Return type:

None

Parameters:
  • item_id (str) –

  • user_id (str) –

  • comment (str) –

assign_item(item_id, assignee)[source]
Return type:

None

Parameters:
  • item_id (str) –

  • assignee (str) –

get_item_last_updated(item_id)[source]
Return type:

Optional[datetime]

Parameters:

item_id (str) –

get_journal_mode()[source]
Return type:

str

get_queue(study_key, status=None)[source]
Return type:

list[TriageItem]

Parameters:
  • study_key –

  • status –

get_triage_item(item_id)[source]
Return type:

Optional[TriageItem]

Parameters:

item_id (str) –

update_status(item_id, to_status, user_id, comment)[source]
Return type:

None

Parameters:
  • item_id (str) –

  • to_status (TriageStatus) –

  • user_id (str) –

  • comment (str | None) –

upsert_item(item)[source]
Return type:

None

Parameters:

item (TriageItem) –