Workflow Interactions
The helpers in imednet_workflows combine multiple endpoint calls to
automate common tasks. The diagrams below outline the main steps in each workflow.
Data Extraction
graph TD
A[extract_records_by_criteria] --> B[subjects.list]
A --> C[visits.list]
A --> D[records.list]
B --> E{subject keys}
C --> F{visit ids}
D --> G{apply filters}
E --> G
F --> G
G --> H[return records]
Record Update
graph TD
A[create_or_update_records] --> B[validate_batch]
B --> C[records.create]
C --> D{wait?}
D -- Yes --> E[JobPoller.run]
D -- No --> F[return Job]
E --> F
Subject Data
graph TD
A[get_all_subject_data] --> B[subjects.list]
A --> C[visits.list]
A --> D[records.list]
A --> E[queries.list]
B --> F[aggregate]
C --> F
D --> F
E --> F
F --> G[return SubjectComprehensiveData]
Query Management
graph TD
A[get_open_queries] --> B[queries.list]
B --> C{Iterate Queries}
C --> D{Check latest comment}
D -- closed=False --> E[Add to Open list]
D -- closed=True --> F[Skip]
E --> G[Return open queries]
Background Sync Worker
SyncWorker runs incremental record
synchronisation in a background thread so that the main application thread
(e.g. a Streamlit rendering loop) never blocks on API calls.
graph TD
A[run_forever] --> B{stop_event set?}
B -- No --> C[acquire FileLock]
C --> D[load_records / delta sync]
D --> E[reconcile hard-deletes]
E --> F[release FileLock]
F --> G[wait interval_seconds]
G --> B
B -- Yes --> H[log stopped]
Concurrency model
SQLite is opened in WAL mode (PRAGMA journal_mode=WAL) so multiple reader
connections can query the cache while a single writer holds the write-lock.
A filelock.FileLock placed next to the database file serialises
concurrent writers without blocking readers at all.
Usage example
from imednet import ImednetSDK
from imednet_workflows.cached_loader import CachedRecordsLoader
from imednet_workflows.sync_worker import SyncWorker, SyncWorkerConfig
import threading
sdk = ImednetSDK(api_key="...", security_key="...")
loader = CachedRecordsLoader(sdk)
worker = SyncWorker(
loader,
config=SyncWorkerConfig(study_key="PROT-01", interval_seconds=900),
)
# Start the worker in a daemon thread so it stops when the main process exits.
t = threading.Thread(target=worker.run_forever, daemon=True)
t.start()
# Your application reads from the cache without waiting for the API.
records = loader.get_cached_records("PROT-01")
worker.stop()
t.join()