imednet.orchestration package

Multi-study orchestration engine for the iMednet SDK.

Provides MultiStudyOrchestrator for executing pipeline functions concurrently across multiple iMednet study boundaries with fault isolation, telemetry context propagation, and normalized result reporting.

Quick start

from imednet import ImednetSDK, MultiStudyOrchestrator

def my_pipeline(study_key, sdk, study_logger, **kwargs):
    study_logger.info("Processing study")
    subjects = sdk.subjects.list(study_key=study_key)
    return {"subject_count": len(list(subjects))}

with ImednetSDK() as sdk:
    orchestrator = MultiStudyOrchestrator(sdk, max_workers=4)
    results = orchestrator.execute_pipeline(my_pipeline)

for study_key, result in results.items():
    if result["status"] == "SUCCESS":
        print(f"{study_key}: {result['data']}")
    else:
        print(f"{study_key}: FAILED — {result['error']}")

Public API

Errors

class imednet.orchestration.MultiStudyOrchestrator[source]

Bases: object

Orchestrates pipeline execution across multiple active clinical trial boundaries.

The SDK instance passed at construction is treated as an immutable read-only resource — no worker thread may mutate its transport, authentication state, or connection pool during parallel execution.

Parameters:
  • sdk (Any) – A fully initialized ImednetSDK instance. The orchestrator stores a reference (not a copy) and treats it as immutable.

  • max_workers (int) – Maximum number of concurrent worker threads. Defaults to 4. Set to 1 to force sequential execution (useful for debugging).

Example:

with ImednetSDK(api_key=..., security_key=...) as sdk:
    orchestrator = MultiStudyOrchestrator(sdk, max_workers=8)
    results = orchestrator.execute_pipeline(my_pipeline_func)

__init__(sdk, max_workers=4)[source]

Parameters:
  • sdk (Any) –

  • max_workers (int) –

Return type:

None

execute_pipeline(pipeline_func, whitelist=None, blacklist=None, *args, **kwargs)[source]

Execute a pipeline function concurrently across resolved study contexts.

Return type:

dict[str, OrchestratorResult]

Parameters:
  • pipeline_func (StudyWorkerCallable[Any]) –

  • whitelist (set[str] | None) –

  • blacklist (set[str] | None) –

  • args (Any) –

  • kwargs (Any) –

property max_workers: int

Maximum number of concurrent worker threads.

resolve_active_studies(whitelist=None, blacklist=None)[source]

Query the iMednet registry and apply filtering rules.

Calls self._sdk.studies.list() to fetch the live study inventory, then applies either the whitelist or the blacklist. The two filters are mutually exclusive.

Parameters:
  • whitelist (Optional[set[str]]) – If provided, only studies whose studyKey is in this set are included. Mutually exclusive with blacklist.

  • blacklist (Optional[set[str]]) – If provided, studies whose studyKey is in this set are excluded. Mutually exclusive with whitelist.

Return type:

list[str]

Returns:

Ordered list of the study key strings targeted by this execution run.

Raises:

FilterConflictError – When both whitelist and blacklist are non-empty simultaneously.
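
The filtering rule described above can be sketched without the SDK. This is a minimal illustration, not the library's implementation: filter_study_keys and the local FilterConflictError class are hypothetical stand-ins, and treating an empty set the same as None is an assumption.

```python
from typing import Iterable, Optional


class FilterConflictError(ValueError):
    """Hypothetical stand-in for the SDK's FilterConflictError."""


def filter_study_keys(
    all_keys: Iterable[str],
    whitelist: Optional[set[str]] = None,
    blacklist: Optional[set[str]] = None,
) -> list[str]:
    """Apply a whitelist or a blacklist to study keys, preserving input order."""
    # Assumption: an empty set behaves like None, so only two non-empty
    # filters count as a conflict.
    if whitelist and blacklist:
        raise FilterConflictError("whitelist and blacklist are mutually exclusive")
    if whitelist:
        return [key for key in all_keys if key in whitelist]
    if blacklist:
        return [key for key in all_keys if key not in blacklist]
    return list(all_keys)


inventory = ["PROT-01", "PROT-02", "PROT-03"]
print(filter_study_keys(inventory, whitelist={"PROT-03", "PROT-01"}))
print(filter_study_keys(inventory, blacklist={"PROT-02"}))
```

Keeping the inventory order rather than the set order means the result is deterministic even though the filters are unordered sets.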

property sdk: Any

The shared read-only SDK instance.

class imednet.orchestration.OrchestratorResult[source]

Bases: TypedDict

Normalized result entry returned per study by execute_pipeline.

data: Optional[Any]
duration_seconds: float
error: Optional[str]
status: str

class imednet.orchestration.StudyContextLogAdapter[source]

Bases: LoggerAdapter

A logger adapter that enriches records with a bound study key.

__init__(logger, study_key)[source]

Initialize the adapter with the logger to wrap and the study key to bind to every record it emits.

Parameters:
  • logger (Logger) – The underlying logger that receives the enriched records.

  • study_key (str) – The study identifier to bind to log records.

Return type:

None

process(msg, kwargs)[source]

Inject study_key and studyKey into the log record extra mapping.

Return type:

tuple[Any, MutableMapping[str, Any]]

Parameters:
  • msg (Any) –

  • kwargs (MutableMapping[str, Any]) –

property study_key: str

The study identifier bound to this adapter.

class imednet.orchestration.StudyWorkerCallable[source]

Bases: Protocol[T_Output]

Defines the explicit signature required for injected pipeline tasks.

Pipeline callables injected into MultiStudyOrchestrator.execute_pipeline must conform to this protocol.

__init__(*args, **kwargs)

imednet.orchestration.make_study_logger(study_key)[source]

Create a StudyContextLogAdapter for a study key.

Parameters:

study_key (str) – The study identifier to bind to log records.

Return type:

StudyContextLogAdapter

Returns:

A logger adapter that enriches records with the study key.
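
To make the adapter's behavior concrete, here is a rough sketch of how a logging.LoggerAdapter subclass could bind a study key. StudyKeyAdapter, make_logger, and the logger name "orchestration.sketch" are hypothetical names chosen for illustration; the SDK's actual implementation may differ.

```python
import logging
from typing import Any, MutableMapping


class StudyKeyAdapter(logging.LoggerAdapter):
    """Hypothetical stand-in for StudyContextLogAdapter."""

    def __init__(self, logger: logging.Logger, study_key: str) -> None:
        # Store both spellings so either field name can be indexed downstream.
        super().__init__(logger, {"study_key": study_key, "studyKey": study_key})
        self._study_key = study_key

    @property
    def study_key(self) -> str:
        return self._study_key

    def process(self, msg: Any, kwargs: MutableMapping[str, Any]):
        # Merge the bound keys into any caller-supplied extra mapping.
        extra = dict(kwargs.get("extra") or {})
        extra.update(self.extra)
        kwargs["extra"] = extra
        return msg, kwargs


def make_logger(study_key: str) -> StudyKeyAdapter:
    """Hypothetical factory mirroring make_study_logger."""
    return StudyKeyAdapter(logging.getLogger("orchestration.sketch"), study_key)


class CaptureHandler(logging.Handler):
    """Collects records in memory so the injected fields can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


capture = CaptureHandler()
base = logging.getLogger("orchestration.sketch")
base.setLevel(logging.INFO)
base.propagate = False
base.addHandler(capture)

study_logger = make_logger("PROT-01")
study_logger.info("Starting data extraction")
record = capture.records[0]
print(record.study_key, record.studyKey)
```

Because the keys travel in the record's extra mapping, any handler or formatter attached to the underlying logger can read them as plain attributes.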

Submodules

imednet.orchestration.logging module

Per-study logging adapter for the MultiStudyOrchestrator engine.

This module provides StudyContextLogAdapter, which enriches every log record emitted by a worker thread with study_key and studyKey fields. When combined with a JSON formatter (e.g. configure_json_logging()), each log line carries structured metadata that can be indexed by log aggregation systems such as Splunk, Datadog, or CloudWatch Logs.

Example JSON output

With JSON logging enabled the adapter produces records like:

{
    "timestamp": "2024-01-15T10:23:45.123456Z",
    "level": "INFO",
    "logger": "imednet.orchestration",
    "message": "Starting data extraction",
    "study_key": "PROT-01",
    "studyKey": "PROT-01"
}
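
As an illustration of how fields attached through extra can end up in a JSON line like the one above, the following sketch uses only the standard library. JsonFormatter and the logger name "json.sketch" are hypothetical; this is not the SDK's configure_json_logging(), and the timestamp it produces ends in "+00:00" rather than "Z".

```python
import io
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Hypothetical minimal formatter, for illustration only."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Copy the study fields only when they were attached via `extra`.
        for field in ("study_key", "studyKey"):
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        return json.dumps(payload)


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

logger = logging.getLogger("json.sketch")
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(handler)

# The adapter would supply this extra mapping; it is passed by hand here.
logger.info(
    "Starting data extraction",
    extra={"study_key": "PROT-01", "studyKey": "PROT-01"},
)
line = json.loads(stream.getvalue())
print(line["level"], line["study_key"])
```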

Usage:

from imednet.orchestration.logging import make_study_logger

study_logger = make_study_logger("PROT-01")
study_logger.info("Starting data extraction")
# → emits a record with extra={"study_key": "PROT-01", "studyKey": "PROT-01"}

class imednet.orchestration.logging.StudyContextLogAdapter[source]

Bases: LoggerAdapter

A logger adapter that enriches records with a bound study key.

__init__(logger, study_key)[source]

Initialize the adapter with the logger to wrap and the study key to bind to every record it emits.

Parameters:
  • logger (Logger) – The underlying logger that receives the enriched records.

  • study_key (str) – The study identifier to bind to log records.

Return type:

None

process(msg, kwargs)[source]

Inject study_key and studyKey into the log record extra mapping.

Return type:

tuple[Any, MutableMapping[str, Any]]

Parameters:
  • msg (Any) –

  • kwargs (MutableMapping[str, Any]) –

property study_key: str

The study identifier bound to this adapter.

imednet.orchestration.logging.make_study_logger(study_key)[source]

Create a StudyContextLogAdapter for a study key.

Parameters:

study_key (str) – The study identifier to bind to log records.

Return type:

StudyContextLogAdapter

Returns:

A logger adapter that enriches records with the study key.

imednet.orchestration.orchestrator module

MultiStudyOrchestrator: concurrent multi-study pipeline engine.

This module provides the MultiStudyOrchestrator class, which drives parallel execution of a user-supplied pipeline function across all active iMednet study boundaries discovered via the SDK’s studies endpoint.

Three-step usage

  1. Construct the orchestrator with a live SDK instance:

    orchestrator = MultiStudyOrchestrator(sdk, max_workers=8)
    
  2. Define a pipeline callable that conforms to StudyWorkerCallable:

    def my_pipeline(study_key, sdk, study_logger, **kwargs):
        study_logger.info("Running pipeline for %s", study_key)
        subjects = sdk.subjects.list(study_key=study_key)
        return {"count": len(list(subjects))}
    
  3. Execute and inspect results:

    results = orchestrator.execute_pipeline(
        my_pipeline,
        whitelist={"PROT-01", "PROT-02"},
    )
    for study_key, r in results.items():
        print(study_key, r["status"], r["duration_seconds"])
    

Each study runs in its own study_context(), so context-sensitive SDK calls resolve the correct study automatically. Per-study failures are captured in the result dict (status="FAILED") and never propagate as exceptions, ensuring fault isolation.
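
The fault-isolation behavior described above can be approximated with concurrent.futures. This is a simplified sketch under stated assumptions: run_isolated and flaky are hypothetical names, the worker takes only a study key, and the study_context() and logger wiring are omitted.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable


def run_isolated(
    worker: Callable[[str], Any],
    study_keys: list[str],
    max_workers: int = 4,
) -> dict[str, dict[str, Any]]:
    """Run worker once per study key, capturing failures instead of raising."""

    def guarded(study_key: str) -> dict[str, Any]:
        started = time.perf_counter()
        try:
            data, error, status = worker(study_key), None, "SUCCESS"
        except Exception as exc:  # confine the failure to this study
            data, error, status = None, repr(exc), "FAILED"
        return {
            "status": status,
            "data": data,
            "error": error,
            "duration_seconds": round(time.perf_counter() - started, 4),
        }

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(guarded, key): key for key in study_keys}
        # guarded() never raises, so result() is safe to call directly.
        return {futures[f]: f.result() for f in as_completed(futures)}


def flaky(study_key: str) -> int:
    """Hypothetical worker that fails for one study."""
    if study_key == "PROT-02":
        raise ValueError("boom")
    return len(study_key)


results = run_isolated(flaky, ["PROT-01", "PROT-02"], max_workers=2)
print(results["PROT-01"]["status"], results["PROT-02"]["error"])
```

Catching the exception inside the worker wrapper, rather than around result(), is what keeps one failing study from affecting the others.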

class imednet.orchestration.orchestrator.MultiStudyOrchestrator[source]

Bases: object

Orchestrates pipeline execution across multiple active clinical trial boundaries.

The SDK instance passed at construction is treated as an immutable read-only resource — no worker thread may mutate its transport, authentication state, or connection pool during parallel execution.

Parameters:
  • sdk (Any) – A fully initialized ImednetSDK instance. The orchestrator stores a reference (not a copy) and treats it as immutable.

  • max_workers (int) – Maximum number of concurrent worker threads. Defaults to 4. Set to 1 to force sequential execution (useful for debugging).

Example:

with ImednetSDK(api_key=..., security_key=...) as sdk:
    orchestrator = MultiStudyOrchestrator(sdk, max_workers=8)
    results = orchestrator.execute_pipeline(my_pipeline_func)

__init__(sdk, max_workers=4)[source]

Parameters:
  • sdk (Any) –

  • max_workers (int) –

Return type:

None

execute_pipeline(pipeline_func, whitelist=None, blacklist=None, *args, **kwargs)[source]

Execute a pipeline function concurrently across resolved study contexts.

Return type:

dict[str, OrchestratorResult]

Parameters:
  • pipeline_func (StudyWorkerCallable[Any]) –

  • whitelist (set[str] | None) –

  • blacklist (set[str] | None) –

  • args (Any) –

  • kwargs (Any) –

property max_workers: int

Maximum number of concurrent worker threads.

resolve_active_studies(whitelist=None, blacklist=None)[source]

Query the iMednet registry and apply filtering rules.

Calls self._sdk.studies.list() to fetch the live study inventory, then applies either the whitelist or the blacklist. The two filters are mutually exclusive.

Parameters:
  • whitelist (Optional[set[str]]) – If provided, only studies whose studyKey is in this set are included. Mutually exclusive with blacklist.

  • blacklist (Optional[set[str]]) – If provided, studies whose studyKey is in this set are excluded. Mutually exclusive with whitelist.

Return type:

list[str]

Returns:

Ordered list of the study key strings targeted by this execution run.

Raises:

FilterConflictError – When both whitelist and blacklist are non-empty simultaneously.

property sdk: Any

The shared read-only SDK instance.

imednet.orchestration.types module

Type protocols and result schemas for the MultiStudyOrchestrator engine.

This module exports the StudyWorkerCallable protocol and the OrchestratorResult schema.

StudyWorkerCallable protocol

Any callable passed to execute_pipeline must accept at minimum:

def my_pipeline(
    study_key: str,           # the study identifier for this worker
    sdk: ImednetSDK,          # shared (read-only) SDK instance
    study_logger: Any,        # StudyContextLogAdapter bound to study_key
    *args: Any,
    **kwargs: Any,
) -> Any: ...

Example:

from imednet.orchestration import StudyWorkerCallable

def count_subjects(study_key, sdk, study_logger, **kwargs):
    subjects = sdk.subjects.list(study_key=study_key)
    return len(list(subjects))

# Runtime check via @runtime_checkable (confirms the object is callable;
# the signature itself is not inspected):
assert isinstance(count_subjects, StudyWorkerCallable)
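
For readers unfamiliar with callable protocols, the sketch below shows one way such a protocol could be declared and what a runtime check does and does not verify. WorkerProtocol is a hypothetical stand-in, not the SDK's definition of StudyWorkerCallable.

```python
from typing import Any, Protocol, TypeVar, runtime_checkable

T_Output = TypeVar("T_Output", covariant=True)


@runtime_checkable
class WorkerProtocol(Protocol[T_Output]):
    """Hypothetical stand-in for StudyWorkerCallable."""

    def __call__(
        self,
        study_key: str,
        sdk: Any,
        study_logger: Any,
        *args: Any,
        **kwargs: Any,
    ) -> T_Output: ...


def good(study_key, sdk, study_logger, **kwargs):
    return study_key.lower()


def wrong_signature():
    return None


# isinstance() only checks that __call__ exists, so both functions pass.
print(isinstance(good, WorkerProtocol))
print(isinstance(wrong_signature, WorkerProtocol))
# A non-callable object is rejected.
print(isinstance("not callable", WorkerProtocol))
```

Since the runtime check cannot catch a mismatched parameter list, a static type checker is the more reliable way to validate a pipeline callable against the protocol.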

OrchestratorResult schema

Each entry in the dict returned by execute_pipeline is an OrchestratorResult TypedDict with the following fields:

status

"SUCCESS" or "FAILED".

data

The value returned by the pipeline callable on success, or None on failure.

error

repr() of the exception on failure, or None on success.

duration_seconds

Wall-clock time (seconds, rounded to 4 decimal places) for this study.
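
The four fields above lend themselves to simple post-processing. The sketch below is an illustration only: ResultEntry mirrors the documented fields under a hypothetical name, summarize is a hypothetical helper, and the sample values are made up.

```python
from typing import Any, Optional, TypedDict


class ResultEntry(TypedDict):
    """Hypothetical mirror of the OrchestratorResult fields."""

    status: str
    data: Optional[Any]
    error: Optional[str]
    duration_seconds: float


def summarize(results: dict[str, ResultEntry]) -> dict[str, Any]:
    """Split results into successes and failures and total the worker time."""
    return {
        "succeeded": sorted(
            key for key, r in results.items() if r["status"] == "SUCCESS"
        ),
        "failed": {
            key: r["error"] for key, r in results.items() if r["status"] == "FAILED"
        },
        # Sum of per-study durations; with concurrent workers this can
        # exceed the elapsed wall-clock time of the whole run.
        "worker_seconds": round(
            sum(r["duration_seconds"] for r in results.values()), 4
        ),
    }


sample: dict[str, ResultEntry] = {
    "PROT-01": {
        "status": "SUCCESS",
        "data": 42,
        "error": None,
        "duration_seconds": 1.25,
    },
    "PROT-02": {
        "status": "FAILED",
        "data": None,
        "error": "ValueError('boom')",
        "duration_seconds": 0.5,
    },
}
summary = summarize(sample)
print(summary)
```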

Example:

results = orchestrator.execute_pipeline(count_subjects)
for study_key, r in results.items():
    if r["status"] == "SUCCESS":
        print(f"{study_key}: {r['data']} subjects ({r['duration_seconds']:.2f}s)")

class imednet.orchestration.types.OrchestratorResult[source]

Bases: TypedDict

Normalized result entry returned per study by execute_pipeline.

data: Optional[Any]
duration_seconds: float
error: Optional[str]
status: str

class imednet.orchestration.types.StudyWorkerCallable[source]

Bases: Protocol[T_Output]

Defines the explicit signature required for injected pipeline tasks.

Pipeline callables injected into MultiStudyOrchestrator.execute_pipeline must conform to this protocol.

__init__(*args, **kwargs)