imednet.orchestration package
Multi-study orchestration engine for the iMednet SDK.
Provides MultiStudyOrchestrator for executing pipeline functions
concurrently across multiple iMednet study boundaries with fault isolation,
telemetry context propagation, and normalized result reporting.
Quick start
from imednet import ImednetSDK, MultiStudyOrchestrator

def my_pipeline(study_key, sdk, study_logger, **kwargs):
    study_logger.info("Processing study")
    subjects = sdk.subjects.list(study_key=study_key)
    return {"subject_count": len(list(subjects))}

with ImednetSDK() as sdk:
    orchestrator = MultiStudyOrchestrator(sdk, max_workers=4)
    results = orchestrator.execute_pipeline(my_pipeline)

for study_key, result in results.items():
    if result["status"] == "SUCCESS":
        print(f"{study_key}: {result['data']}")
    else:
        print(f"{study_key}: FAILED - {result['error']}")
Public API
- MultiStudyOrchestrator: main orchestration engine
- StudyContextLogAdapter: per-study log adapter
- make_study_logger(): factory for study-bound loggers
- OrchestratorResult: per-study result schema (TypedDict)
- StudyWorkerCallable: protocol for pipeline callables
Errors
- OrchestratorError: base orchestration error
- FilterConflictError: raised when both whitelist and blacklist are provided simultaneously
- class imednet.orchestration.MultiStudyOrchestrator[source]
Bases: object
Orchestrates pipeline execution across multiple active clinical trial boundaries.
The SDK instance passed at construction is treated as an immutable read-only resource: no worker thread may mutate its transport, authentication state, or connection pool during parallel execution.
- Parameters:
sdk (Any) – A fully initialized ImednetSDK instance. The orchestrator stores a reference (not a copy) and treats it as immutable.
max_workers (int) – Maximum number of concurrent worker threads. Defaults to 4. Set to 1 to force sequential execution (useful for debugging).
Example:
with ImednetSDK(api_key=..., security_key=...) as sdk:
    orchestrator = MultiStudyOrchestrator(sdk, max_workers=8)
    results = orchestrator.execute_pipeline(my_pipeline_func)
- execute_pipeline(pipeline_func, whitelist=None, blacklist=None, *args, **kwargs)[source]
Execute a pipeline function concurrently across resolved study contexts.
- Return type:
dict[str, OrchestratorResult]
- Parameters:
pipeline_func (StudyWorkerCallable[Any]) –
whitelist (set[str] | None) –
blacklist (set[str] | None) –
args (Any) –
kwargs (Any) –
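The concurrent, fault-isolated execution described for this method can be pictured with a small thread-pool sketch. This is not the orchestrator's actual implementation: `run_isolated`, `run_all`, and `flaky` are hypothetical names, and the worker here takes only a study key rather than the full pipeline signature. The result fields mirror the ones documented for OrchestratorResult.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, Iterable


def run_isolated(worker: Callable[[str], Any], study_key: str) -> Dict[str, Any]:
    """Run one worker and convert any exception into a FAILED entry."""
    started = time.perf_counter()
    try:
        data = worker(study_key)
        status, error = "SUCCESS", None
    except Exception as exc:  # broad on purpose: one study must not break the others
        data, status, error = None, "FAILED", repr(exc)
    return {
        "status": status,
        "data": data,
        "error": error,
        "duration_seconds": round(time.perf_counter() - started, 4),
    }


def run_all(
    worker: Callable[[str], Any],
    study_keys: Iterable[str],
    max_workers: int = 4,
) -> Dict[str, Dict[str, Any]]:
    """Fan the worker out over a thread pool and key the results by study."""
    keys = list(study_keys)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {key: pool.submit(run_isolated, worker, key) for key in keys}
        return {key: future.result() for key, future in futures.items()}


def flaky(study_key: str) -> int:
    # Hypothetical worker that fails for one study to show the isolation.
    if study_key == "PROT-02":
        raise ValueError("boom")
    return len(study_key)


results = run_all(flaky, ["PROT-01", "PROT-02"], max_workers=2)
```

Because the exception is caught inside the worker wrapper, `future.result()` never raises here, so a failure in one study leaves the other entries intact.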
- property max_workers: int
Maximum number of concurrent worker threads.
- resolve_active_studies(whitelist=None, blacklist=None)[source]
Query the iMednet registry and apply filtering rules.
Calls self._sdk.studies.list() to fetch the live study inventory, then applies either the whitelist or the blacklist (the two are mutually exclusive).
- Parameters:
whitelist (Optional[set[str]]) – If provided, only studies whose studyKey is in this set are included. Mutually exclusive with blacklist.
blacklist (Optional[set[str]]) – If provided, studies whose studyKey is in this set are excluded. Mutually exclusive with whitelist.
- Return type:
list[str]
- Returns:
Ordered list of study key strings targeted by this execution run.
- Raises:
FilterConflictError – When both whitelist and blacklist are non-empty simultaneously.
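The filtering rules above can be sketched as a pure function over a list of study keys. This is an illustration under stated assumptions, not the method's real body: `FilterConflict` is a hypothetical stand-in for FilterConflictError, `filter_study_keys` is an invented name, and treating an empty set the same as None is an assumption drawn from the "non-empty" wording.

```python
from typing import Iterable, List, Optional, Set


class FilterConflict(ValueError):
    """Hypothetical stand-in for the package's FilterConflictError."""


def filter_study_keys(
    study_keys: Iterable[str],
    whitelist: Optional[Set[str]] = None,
    blacklist: Optional[Set[str]] = None,
) -> List[str]:
    """Apply either a whitelist or a blacklist, preserving input order."""
    if whitelist and blacklist:
        raise FilterConflict("whitelist and blacklist are mutually exclusive")
    if whitelist:
        return [key for key in study_keys if key in whitelist]
    if blacklist:
        return [key for key in study_keys if key not in blacklist]
    # Assumption: no filter (or an empty one) means every study is kept.
    return list(study_keys)


inventory = ["PROT-01", "PROT-02", "PROT-03"]
only_listed = filter_study_keys(inventory, whitelist={"PROT-03", "PROT-01"})
without_two = filter_study_keys(inventory, blacklist={"PROT-02"})
```

Both calls keep the order of the inventory rather than the order of the filter set, which matches the "ordered list" wording of the return value.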
- property sdk: Any
The shared read-only SDK instance.
- class imednet.orchestration.OrchestratorResult[source]
Bases: TypedDict
Normalized result entry returned per study by execute_pipeline.
- data: Optional[Any]
- duration_seconds: float
- error: Optional[str]
- status: str
- class imednet.orchestration.StudyContextLogAdapter[source]
Bases: LoggerAdapter
A logger adapter that enriches records with a bound study key.
- __init__(logger, study_key)[source]
Initialize the adapter with a logger and the study key to bind to its records. Unlike the base logging.LoggerAdapter constructor, which takes a dict-like object of contextual information (for example LoggerAdapter(someLogger, dict(p1=v1, p2="v2"))), this adapter takes the study key directly.
- Parameters:
logger (Logger) –
study_key (str) –
- Return type:
None
- process(msg, kwargs)[source]
Inject study_key and studyKey into the log record extra mapping.
- Return type:
tuple[Any, MutableMapping[str, Any]]
- Parameters:
msg (Any) –
kwargs (MutableMapping[str, Any]) –
- property study_key: str
The study identifier bound to this adapter.
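To make the adapter's behavior concrete, here is a minimal sketch of a logging.LoggerAdapter subclass that injects both key spellings into `extra`. `StudyKeyAdapter` is a hypothetical class written for illustration, and merging with caller-supplied `extra` fields is an assumption; the real StudyContextLogAdapter may handle that case differently.

```python
import logging
from typing import Any, MutableMapping, Tuple


class StudyKeyAdapter(logging.LoggerAdapter):
    """Hypothetical adapter that stamps every record with a study key."""

    def __init__(self, logger: logging.Logger, study_key: str) -> None:
        # The base class stores the mapping on self.extra.
        super().__init__(logger, {"study_key": study_key, "studyKey": study_key})
        self._study_key = study_key

    @property
    def study_key(self) -> str:
        return self._study_key

    def process(
        self, msg: Any, kwargs: MutableMapping[str, Any]
    ) -> Tuple[Any, MutableMapping[str, Any]]:
        # Merge rather than overwrite, so caller-supplied extra fields survive.
        extra = dict(kwargs.get("extra") or {})
        extra.update(self.extra)
        kwargs["extra"] = extra
        return msg, kwargs


adapter = StudyKeyAdapter(logging.getLogger("demo"), "PROT-01")
msg, kwargs = adapter.process("hello", {"extra": {"attempt": 2}})
```

After the call, `kwargs["extra"]` carries the caller's `attempt` field alongside `study_key` and `studyKey`.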
- class imednet.orchestration.StudyWorkerCallable[source]
Bases: Protocol[T_Output]
Defines the explicit signature required for injected pipeline tasks.
Pipeline callables injected into MultiStudyOrchestrator.execute_pipeline must conform to this protocol.
- __init__(*args, **kwargs)
- imednet.orchestration.make_study_logger(study_key)[source]
Create a StudyContextLogAdapter for a study key.
- Parameters:
study_key (str) – The study identifier to bind to log records.
- Return type:
StudyContextLogAdapter
- Returns:
A logger adapter that enriches records with the study key.
Submodules
imednet.orchestration.logging module
Per-study logging adapter for the MultiStudyOrchestrator engine.
This module provides StudyContextLogAdapter, which enriches every log
record emitted by a worker thread with study_key and studyKey fields.
When combined with a JSON formatter (e.g. configure_json_logging()),
each log line carries structured metadata that can be indexed by log
aggregation systems such as Splunk, Datadog, or CloudWatch Logs.
Example JSON output
With JSON logging enabled the adapter produces records like:
{
    "timestamp": "2024-01-15T10:23:45.123456Z",
    "level": "INFO",
    "logger": "imednet.orchestration",
    "message": "Starting data extraction",
    "study_key": "PROT-01",
    "studyKey": "PROT-01"
}
Usage:
from imednet.orchestration.logging import make_study_logger
study_logger = make_study_logger("PROT-01")
study_logger.info("Starting data extraction")
# → emits a record with extra={"study_key": "PROT-01", "studyKey": "PROT-01"}
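How fields passed through `extra` end up in a JSON line can be shown with the standard library alone. The formatter below is a hypothetical stand-in, not the SDK's configure_json_logging(); it omits the timestamp and copies only the two study fields.

```python
import io
import json
import logging


class TinyJsonFormatter(logging.Formatter):
    """Hypothetical formatter used only for this illustration."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Fields passed through `extra` become attributes on the record.
        for field in ("study_key", "studyKey"):
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        return json.dumps(payload)


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(TinyJsonFormatter())

logger = logging.getLogger("demo.json")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo output out of the root logger

logger.info(
    "Starting data extraction",
    extra={"study_key": "PROT-01", "studyKey": "PROT-01"},
)
line = json.loads(stream.getvalue())
```

Here the `extra` mapping is passed by hand; with a study-bound adapter the same fields would be attached on every call without repeating them.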
- class imednet.orchestration.logging.StudyContextLogAdapter[source]
Bases: LoggerAdapter
A logger adapter that enriches records with a bound study key.
- __init__(logger, study_key)[source]
Initialize the adapter with a logger and the study key to bind to its records. Unlike the base logging.LoggerAdapter constructor, which takes a dict-like object of contextual information (for example LoggerAdapter(someLogger, dict(p1=v1, p2="v2"))), this adapter takes the study key directly.
- Parameters:
logger (Logger) –
study_key (str) –
- Return type:
None
- process(msg, kwargs)[source]
Inject study_key and studyKey into the log record extra mapping.
- Return type:
tuple[Any, MutableMapping[str, Any]]
- Parameters:
msg (Any) –
kwargs (MutableMapping[str, Any]) –
- property study_key: str
The study identifier bound to this adapter.
- imednet.orchestration.logging.make_study_logger(study_key)[source]
Create a StudyContextLogAdapter for a study key.
- Parameters:
study_key (str) – The study identifier to bind to log records.
- Return type:
StudyContextLogAdapter
- Returns:
A logger adapter that enriches records with the study key.
imednet.orchestration.orchestrator module
MultiStudyOrchestrator: concurrent multi-study pipeline engine.
This module provides the MultiStudyOrchestrator class, which drives
parallel execution of a user-supplied pipeline function across all active
iMednet study boundaries discovered via the SDK’s studies endpoint.
Three-step usage
1. Construct the orchestrator with a live SDK instance:
orchestrator = MultiStudyOrchestrator(sdk, max_workers=8)
2. Define a pipeline callable that conforms to StudyWorkerCallable:
def my_pipeline(study_key, sdk, study_logger, **kwargs):
    study_logger.info("Running pipeline for %s", study_key)
    subjects = sdk.subjects.list(study_key=study_key)
    return {"count": len(list(subjects))}
3. Execute and inspect results:
results = orchestrator.execute_pipeline(
    my_pipeline,
    whitelist={"PROT-01", "PROT-02"},
)
for study_key, r in results.items():
    print(study_key, r["status"], r["duration_seconds"])
Each study runs in its own study_context(),
so context-sensitive SDK calls resolve the correct study automatically.
Per-study failures are captured in the result dict (status="FAILED")
and never propagate as exceptions, ensuring fault isolation.
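One common way to give each worker its own ambient study is a context variable set for the duration of a with-block. The sketch below shows that general technique only; it makes no claim about how study_context() is implemented, and `bound_study`, `current_study`, and `_current_study` are hypothetical names.

```python
import contextvars
from contextlib import contextmanager
from typing import Iterator, Optional

# Hypothetical context variable; the SDK's own mechanism may differ.
_current_study: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "current_study", default=None
)


@contextmanager
def bound_study(study_key: str) -> Iterator[None]:
    """Bind study_key for the duration of the with-block, then restore."""
    token = _current_study.set(study_key)
    try:
        yield
    finally:
        _current_study.reset(token)


def current_study() -> Optional[str]:
    """Return the study key bound in the calling context, if any."""
    return _current_study.get()


with bound_study("PROT-01"):
    inside = current_study()
outside = current_study()
```

Each thread in a pool starts with its own context, so a value set inside one worker is not visible to the others.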
- class imednet.orchestration.orchestrator.MultiStudyOrchestrator[source]
Bases: object
Orchestrates pipeline execution across multiple active clinical trial boundaries.
The SDK instance passed at construction is treated as an immutable read-only resource: no worker thread may mutate its transport, authentication state, or connection pool during parallel execution.
- Parameters:
sdk (Any) – A fully initialized ImednetSDK instance. The orchestrator stores a reference (not a copy) and treats it as immutable.
max_workers (int) – Maximum number of concurrent worker threads. Defaults to 4. Set to 1 to force sequential execution (useful for debugging).
Example:
with ImednetSDK(api_key=..., security_key=...) as sdk:
    orchestrator = MultiStudyOrchestrator(sdk, max_workers=8)
    results = orchestrator.execute_pipeline(my_pipeline_func)
- execute_pipeline(pipeline_func, whitelist=None, blacklist=None, *args, **kwargs)[source]
Execute a pipeline function concurrently across resolved study contexts.
- Return type:
dict[str, OrchestratorResult]
- Parameters:
pipeline_func (StudyWorkerCallable[Any]) –
whitelist (set[str] | None) –
blacklist (set[str] | None) –
args (Any) –
kwargs (Any) –
- property max_workers: int
Maximum number of concurrent worker threads.
- resolve_active_studies(whitelist=None, blacklist=None)[source]
Query the iMednet registry and apply filtering rules.
Calls self._sdk.studies.list() to fetch the live study inventory, then applies either the whitelist or the blacklist (the two are mutually exclusive).
- Parameters:
whitelist (Optional[set[str]]) – If provided, only studies whose studyKey is in this set are included. Mutually exclusive with blacklist.
blacklist (Optional[set[str]]) – If provided, studies whose studyKey is in this set are excluded. Mutually exclusive with whitelist.
- Return type:
list[str]
- Returns:
Ordered list of study key strings targeted by this execution run.
- Raises:
FilterConflictError – When both whitelist and blacklist are non-empty simultaneously.
- property sdk: Any
The shared read-only SDK instance.
imednet.orchestration.types module
Type protocols and result schemas for the MultiStudyOrchestrator engine.
This module exports:
- StudyWorkerCallable: a typing.Protocol that pipeline functions must conform to in order to be accepted by execute_pipeline().
- OrchestratorResult: a typing_extensions.TypedDict that describes the per-study result entry returned by execute_pipeline.
- T_Output: a covariant TypeVar used to parameterize StudyWorkerCallable.
StudyWorkerCallable protocol
Any callable passed to execute_pipeline must accept at minimum:
def my_pipeline(
    study_key: str,     # the study identifier for this worker
    sdk: ImednetSDK,    # shared (read-only) SDK instance
    study_logger: Any,  # StudyContextLogAdapter bound to study_key
    *args: Any,
    **kwargs: Any,
) -> Any: ...
Example:
from imednet.orchestration import StudyWorkerCallable

def count_subjects(study_key, sdk, study_logger, **kwargs):
    subjects = sdk.subjects.list(study_key=study_key)
    return len(list(subjects))

# Runtime check via @runtime_checkable:
assert isinstance(count_subjects, StudyWorkerCallable)
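A caveat about the runtime check is worth illustrating: for a `@runtime_checkable` protocol, `isinstance` only verifies that the required members exist, not that their signatures match. The sketch below assumes the protocol is defined through a `__call__` method; `WorkerLike` is a hypothetical stand-in for StudyWorkerCallable.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class WorkerLike(Protocol):
    """Hypothetical stand-in for a callable-based protocol."""

    def __call__(
        self, study_key: str, sdk: Any, study_logger: Any, *args: Any, **kwargs: Any
    ) -> Any: ...


def good(study_key, sdk, study_logger, **kwargs):
    return study_key


def wrong_signature():
    # Takes no arguments, so it could never be called as a pipeline worker.
    return None


good_passes = isinstance(good, WorkerLike)
wrong_passes = isinstance(wrong_signature, WorkerLike)
string_passes = isinstance("PROT-01", WorkerLike)
```

Both functions pass the check because each has a `__call__` attribute, while the string does not. Signature mismatches are therefore better caught by a static type checker than by the runtime assertion.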
OrchestratorResult schema
Each entry in the dict returned by execute_pipeline is an
OrchestratorResult TypedDict with the following fields:
status
    "SUCCESS" or "FAILED".
data
    The value returned by the pipeline callable on success, or None on failure.
error
    repr() of the exception on failure, or None on success.
duration_seconds
    Wall-clock time (seconds, rounded to 4 decimal places) for this study.
Example:
results = orchestrator.execute_pipeline(count_subjects)
for study_key, r in results.items():
    if r["status"] == "SUCCESS":
        print(f"{study_key}: {r['data']} subjects ({r['duration_seconds']:.2f}s)")
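Since the example above prints only the successful entries, a small helper that also collects the failures may be useful. This is a sketch over hand-written sample data: `ResultEntry` is a hypothetical mirror of the four documented fields, and `split_results` is an invented helper, not part of the package.

```python
from typing import Any, Dict, Optional, Tuple, TypedDict


class ResultEntry(TypedDict):
    """Hypothetical mirror of the four documented result fields."""

    status: str
    data: Optional[Any]
    error: Optional[str]
    duration_seconds: float


def split_results(
    results: Dict[str, ResultEntry],
) -> Tuple[Dict[str, Any], Dict[str, str]]:
    """Separate successful payloads from failure messages."""
    succeeded = {k: r["data"] for k, r in results.items() if r["status"] == "SUCCESS"}
    failed = {k: r["error"] or "" for k, r in results.items() if r["status"] != "SUCCESS"}
    return succeeded, failed


# Sample data written by hand for the illustration, not real output.
sample: Dict[str, ResultEntry] = {
    "PROT-01": {"status": "SUCCESS", "data": 12, "error": None, "duration_seconds": 0.4312},
    "PROT-02": {
        "status": "FAILED",
        "data": None,
        "error": "ValueError('boom')",
        "duration_seconds": 0.0021,
    },
}
succeeded, failed = split_results(sample)
total_seconds = sum(r["duration_seconds"] for r in sample.values())
```

Keeping failures in their own mapping makes it straightforward to log or retry only the studies that did not complete.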