imednet.integrations package
Integration helpers for exporting study data.
- class imednet.integrations.ExportSink[source]
Bases:
ABC
Abstract base class for all export sinks.
Subclasses must implement write_batch(), flush(), and close(). The context-manager protocol is provided by this class.
Parameters
- config:
Shared sink configuration. Defaults to SinkConfig with all values at their defaults.
- __init__(config=None)[source]
- Parameters:
config (SinkConfig | None) –
- Return type:
None
- abstract close()[source]
Release all resources held by this sink (connections, file handles).
Implementations must be idempotent: calling close() on an already closed sink must not raise.
- Return type:
None
- abstract flush()[source]
Flush any internal buffers to the destination.
- Return type:
None
Raises
- imednet.errors.ExportError
On flush failure.
- abstract write_batch(records, *, batch_id)[source]
Write one batch of records to the destination.
- Return type:
int
- Parameters:
records (Sequence[Any]) –
batch_id (str) –
Parameters
- records:
Sequence of records to write. The concrete type depends on the export path:
- Tabular path – pandas.DataFrame rows or plain dicts produced by RecordMapper.
- Structure-preserving path – typed Record instances from DataExtractionWorkflow.
- Warehouse path – pyarrow.RecordBatch or dicts destined for a staged Parquet file.
- batch_id:
Caller-supplied idempotency key. Recommended format:
"<study_key>/<form_key>/<batch_number>".
Returns
- int
Number of records successfully written.
Raises
- imednet.errors.ExportBatchError
When the batch cannot be written after all retries.
- class imednet.integrations.MongoDbExportSink[source]
Bases:
ExportSink
Export iMednet records as MongoDB document envelopes.
Parameters
- uri:
MongoDB connection URI (e.g. "******localhost:27017"). Credentials are never logged.
- database:
Target database name.
- collection:
Target collection name.
- study_key:
Study identifier embedded in every document and used to build the composite _id.
- config:
Optional SinkConfig.
Raises
- imednet.errors.ExportConfigurationError
When the client cannot connect to the server.
- ImportError
When the pymongo package is not installed.
- __init__(uri, database, collection, study_key, *, config=None)[source]
- Parameters:
uri (str) –
database (str) –
collection (str) –
study_key (str) –
config (SinkConfig | None) –
- Return type:
None
- write_batch(records, *, batch_id)[source]
Write records to MongoDB using upsert (idempotent) or insert.
- Return type:
int
- Parameters:
records (Sequence[Any]) –
batch_id (str) –
Parameters
- records:
Sequence of typed Record model instances.
- batch_id:
Idempotency key (e.g. "MYSTUDY/FORM1/0").
Returns
- int
Number of records written (upserted or inserted).
- class imednet.integrations.Neo4jExportSink[source]
Bases:
ExportSink
Export iMednet records as Neo4j graph nodes and relationships.
Parameters
- uri:
Neo4j Bolt or HTTP URI (e.g. "bolt://localhost:7687" or "neo4j+s://xxxx.databases.neo4j.io").
- auth:
(username, password) tuple. Credentials are never logged.
- study_key:
Study identifier attached to every node for multi-study graphs.
- config:
Optional Neo4jSinkConfig (or plain SinkConfig). Defaults to Neo4jSinkConfig with all values at defaults.
Raises
- imednet.errors.ExportConfigurationError
When the driver cannot connect to the database.
- ImportError
When the neo4j package is not installed.
- __init__(uri, auth, study_key, *, config=None)[source]
- Parameters:
uri (str) –
auth (Tuple[str, str]) –
study_key (str) –
config (SinkConfig | None) –
- Return type:
None
- write_batch(records, *, batch_id)[source]
Write records to Neo4j using MERGE (idempotent) or CREATE.
- Return type:
int
- Parameters:
records (Sequence[Any]) –
batch_id (str) –
Parameters
- records:
Sequence of typed Record model instances.
- batch_id:
Idempotency key (e.g. "MYSTUDY/FORM1/0").
Returns
- int
Number of records written.
- class imednet.integrations.Neo4jSinkConfig[source]
Bases:
SinkConfig
Extended SinkConfig for Neo4j.
Parameters
- database:
Target Neo4j database name (default "neo4j").
- __init__(batch_size=500, max_retries=3, retry_backoff=1.0, idempotent=True, extra=<factory>, database='neo4j')
- Parameters:
batch_size (int) –
max_retries (int) –
retry_backoff (float) –
idempotent (bool) –
extra (dict[str, Any]) –
database (str) –
- Return type:
None
- database: str = 'neo4j'
- class imednet.integrations.PartitionedStorageEngine[source]
Bases:
Protocol
Interface for writing tables into a partitioned dataset.
- __init__(*args, **kwargs)
- class imednet.integrations.PyArrowDatasetPartitionedStorageEngine[source]
Bases:
PartitionedStorageEngine
Partitioned storage engine powered by pyarrow.dataset.write_dataset.
- __init__(compression='snappy', use_dictionary=True, existing_data_behavior='overwrite_or_ignore', staging_dir_name='.imednet_staging')
- Parameters:
compression (str) –
use_dictionary (bool) –
existing_data_behavior (str) –
staging_dir_name (str) –
- Return type:
None
- compression: str = 'snappy'
- existing_data_behavior: str = 'overwrite_or_ignore'
- staging_dir_name: str = '.imednet_staging'
- use_dictionary: bool = True
- class imednet.integrations.SinkConfig[source]
Bases:
object
Shared configuration for all export sinks.
Parameters
- batch_size:
Number of records per ExportSink.write_batch() call.
- max_retries:
Maximum number of retry attempts on transient errors.
- retry_backoff:
Base delay in seconds between retry attempts. The actual delay grows exponentially: retry_backoff * 2 ** attempt.
- idempotent:
When True, sinks use upsert / replace semantics so that replaying a batch with the same batch_id produces no duplicate data.
- __init__(batch_size=500, max_retries=3, retry_backoff=1.0, idempotent=True, extra=<factory>)
- Parameters:
batch_size (int) –
max_retries (int) –
retry_backoff (float) –
idempotent (bool) –
extra (dict[str, Any]) –
- Return type:
None
- batch_size: int = 500
- extra: dict[str, Any]
- idempotent: bool = True
- max_retries: int = 3
- retry_backoff: float = 1.0
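The retry_backoff formula can be worked through with the default values. This is a minimal sketch, assuming that attempt is counted from zero (the source does not say where counting starts); backoff_delays is a hypothetical helper and not part of the SDK.

```python
def backoff_delays(retry_backoff: float = 1.0, max_retries: int = 3) -> list[float]:
    """Return the delay before each retry, using retry_backoff * 2 ** attempt."""
    # Assumption: attempt is zero-based, so the first retry waits retry_backoff seconds.
    return [retry_backoff * 2 ** attempt for attempt in range(max_retries)]


delays = backoff_delays()
print(delays)  # [1.0, 2.0, 4.0]
```

Under that assumption the defaults give waits of 1, 2, and 4 seconds, so a batch that keeps failing is abandoned after roughly 7 seconds of back-off plus the time spent in the attempts themselves.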
- class imednet.integrations.SnowflakeExportSink[source]
Bases:
ExportSink
Stage Parquet files and bulk-load them into Snowflake.
Parameters
- config:
SnowflakeSinkConfig containing all connection details and staging paths.
Raises
- imednet.errors.ExportConfigurationError
When the Snowflake connector cannot be initialised or the required configuration values are missing.
- ImportError
When snowflake-connector-python or pyarrow is not installed.
- __init__(config=None)[source]
- Parameters:
config (SinkConfig | None) –
- Return type:
None
- close()[source]
Close the Snowflake connection and clean up temporary staging files.
- Return type:
None
- write_batch(records, *, batch_id)[source]
Write records to Snowflake via Parquet staging + COPY INTO.
- Return type:
int
- Parameters:
records (Sequence[Any]) –
batch_id (str) –
Parameters
- records:
Sequence of typed Record model instances or plain dicts.
- batch_id:
Idempotency key (e.g. "MYSTUDY/FORM1/0").
Returns
- int
Number of rows loaded.
- class imednet.integrations.SnowflakeSinkConfig[source]
Bases:
SinkConfig
Configuration for SnowflakeExportSink.
Parameters
- account:
Snowflake account identifier (<org>-<account> or legacy format).
- user:
Snowflake user name.
- password:
Snowflake password. Never logged.
- database:
Target database.
- schema:
Target schema.
- warehouse:
Virtual warehouse used for the COPY INTO command.
- stage:
Snowflake internal stage name (e.g. "MY_STAGE").
- table:
Destination table name inside database.schema.
- stage_prefix:
Path prefix inside the stage (default "imednet").
- local_staging_dir:
Local directory used to write Parquet files before PUT. Defaults to a temporary directory created by tempfile.
- manifest_path:
Optional path to a JSON-lines file where each loaded batch is recorded.
- __init__(batch_size=500, max_retries=3, retry_backoff=1.0, idempotent=True, extra=<factory>, account='', user='', password='', database='', schema='PUBLIC', warehouse='', stage='', table='', stage_prefix='imednet', local_staging_dir=None, manifest_path=None)
- Parameters:
batch_size (int) –
max_retries (int) –
retry_backoff (float) –
idempotent (bool) –
extra (dict[str, Any]) –
account (str) –
user (str) –
password (str) –
database (str) –
schema (str) –
warehouse (str) –
stage (str) –
table (str) –
stage_prefix (str) –
local_staging_dir (str | PathLike[str] | None) –
manifest_path (str | PathLike[str] | None) –
- Return type:
None
- account: str = ''
- database: str = ''
- local_staging_dir: Union[str, PathLike[str], None] = None
- manifest_path: Union[str, PathLike[str], None] = None
- password: str = ''
- schema: str = 'PUBLIC'
- stage: str = ''
- stage_prefix: str = 'imednet'
- table: str = ''
- user: str = ''
- warehouse: str = ''
- imednet.integrations.export_to_csv(sdk, study_key, path, *, use_labels_as_columns=False, **kwargs)[source]
Export study records to a CSV file.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
path (str) –
use_labels_as_columns (bool) –
kwargs (Any) –
Parameters
- use_labels_as_columns:
When
True, variable labels are used for column names instead of variable names.
- imednet.integrations.export_to_duckdb(sdk, study_key, db_path, table_name, *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None)[source]
Export study records to a DuckDB table using native DataFrame registration.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
db_path (str) –
table_name (str) –
use_labels_as_columns (bool) –
variable_whitelist (List[str] | None) –
form_whitelist (List[int] | None) –
Parameters
- sdk:
Authenticated SDK instance used to fetch study records.
- study_key:
Study identifier to export.
- db_path:
Path to the target .duckdb database file.
- table_name:
Name of the destination DuckDB table.
- use_labels_as_columns:
When True, variable labels are used for DataFrame column names.
- variable_whitelist:
Optional list of variable names to include.
- form_whitelist:
Optional list of form IDs to include.
Raises
- ImportError
If the optional duckdb dependency is not installed.
- imednet.integrations.export_to_duckdb_by_form(sdk, study_key, db_path, *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None)[source]
Export records to separate DuckDB tables for each form.
Each form is exported to a table named after form.form_key.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
db_path (str) –
use_labels_as_columns (bool) –
variable_whitelist (List[str] | None) –
form_whitelist (List[int] | None) –
Parameters
- sdk:
Authenticated SDK instance used to fetch study records.
- study_key:
Study identifier to export.
- db_path:
Path to the target .duckdb database file.
- use_labels_as_columns:
When True, variable labels are used for DataFrame column names.
- variable_whitelist:
Optional list of variable names to include.
- form_whitelist:
Optional list of form IDs to include.
Raises
- ImportError
If the optional duckdb dependency is not installed.
- imednet.integrations.export_to_excel(sdk, study_key, path, *, use_labels_as_columns=False, **kwargs)[source]
Export study records to an Excel workbook.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
path (str) –
use_labels_as_columns (bool) –
kwargs (Any) –
Parameters
- use_labels_as_columns:
When
True, variable labels are used for column names instead of variable names.
- imednet.integrations.export_to_hive_parquet(sdk, study_key, base_dir, *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None, chunk_size=5000)[source]
Export study records to a Hive-partitioned Parquet directory layout.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
base_dir (str) –
use_labels_as_columns (bool) –
variable_whitelist (List[str] | None) –
form_whitelist (List[int] | None) –
chunk_size (int) –
- imednet.integrations.export_to_json(sdk, study_key, path, *, use_labels_as_columns=False, **kwargs)[source]
Export study records to a JSON file.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
path (str) –
use_labels_as_columns (bool) –
kwargs (Any) –
Parameters
- use_labels_as_columns:
When
True, variable labels are used for column names instead of variable names.
- imednet.integrations.export_to_long_sql(sdk, study_key, table_name, conn_str, *, chunk_size=1000)[source]
Export records to a normalized long-format SQL table.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
table_name (str) –
conn_str (str) –
chunk_size (int) –
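To illustrate what "long format" means here, the sketch below turns one wide record into one row per variable. The column names (record_id, form_id, variable_name, value) and the to_long_rows helper are assumptions made for illustration; the source does not document the actual table layout written by export_to_long_sql.

```python
from typing import Any, Iterator


def to_long_rows(record_id: int, form_id: int, record_data: dict[str, Any]) -> Iterator[dict[str, Any]]:
    """Yield one row per variable: a hypothetical long-format layout."""
    for name, value in record_data.items():
        yield {
            "record_id": record_id,
            "form_id": form_id,
            "variable_name": name,
            # Values are stored as text so that mixed types fit a single column.
            "value": None if value is None else str(value),
        }


rows = list(to_long_rows(1234, 7, {"var1": "value1", "var2": 99}))
for row in rows:
    print(row)
```

A layout like this keeps the table schema fixed regardless of how many variables a form defines, at the cost of casting every value to text.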
- imednet.integrations.export_to_mongodb(sdk, study_key, uri, database, collection, *, config=None)[source]
Export study records to MongoDB using MongoDbExportSink.
- Return type:
int
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
uri (str) –
database (str) –
collection (str) –
config (SinkConfig | None) –
- imednet.integrations.export_to_neo4j(sdk, study_key, uri, auth, *, config=None)[source]
Export study records to Neo4j using Neo4jExportSink.
- Return type:
int
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
uri (str) –
auth (Tuple[str, str]) –
config (Neo4jSinkConfig | None) –
- imednet.integrations.export_to_parquet(sdk, study_key, path, *, use_labels_as_columns=False, **kwargs)[source]
Export study records to a Parquet file.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
path (str) –
use_labels_as_columns (bool) –
kwargs (Any) –
Parameters
- use_labels_as_columns:
When
True, variable labels are used for column names instead of variable names.
- imednet.integrations.export_to_snowflake(sdk, study_key, *, config)[source]
Export study records to Snowflake using SnowflakeExportSink.
- Return type:
int
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
config (SnowflakeSinkConfig) –
- imednet.integrations.export_to_sql(sdk, study_key, table, conn_str, if_exists='replace', *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None, **kwargs)[source]
Export study records to a SQL table.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
table (str) –
conn_str (str) –
if_exists (str) –
use_labels_as_columns (bool) –
variable_whitelist (List[str] | None) –
form_whitelist (List[int] | None) –
kwargs (Any) –
Parameters
- use_labels_as_columns:
When
True, variable labels are used for column names instead of variable names.
- imednet.integrations.export_to_sql_by_form(sdk, study_key, conn_str, if_exists='replace', *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None, **kwargs)[source]
Export records to separate SQL tables for each form.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
conn_str (str) –
if_exists (str) –
use_labels_as_columns (bool) –
variable_whitelist (List[str] | None) –
form_whitelist (List[int] | None) –
kwargs (Any) –
- imednet.integrations.hive_parquet_query(base_dir)[source]
Return the DuckDB read_parquet query string for the given Hive base directory.
- Return type:
str
- Parameters:
base_dir (str) –
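The exact string returned by hive_parquet_query is not shown in this reference. As a rough sketch of the kind of query involved, the hypothetical function below builds a DuckDB read_parquet call over every Parquet file under a base directory with Hive partition discovery enabled; the real helper may use a different glob or different options.

```python
from pathlib import PurePosixPath


def sketch_hive_parquet_query(base_dir: str) -> str:
    """Build a DuckDB query over every Parquet file under base_dir (illustrative only)."""
    pattern = PurePosixPath(base_dir) / "**" / "*.parquet"
    # Escape single quotes so the path remains a valid SQL string literal.
    literal = str(pattern).replace("'", "''")
    return f"SELECT * FROM read_parquet('{literal}', hive_partitioning = true)"


print(sketch_hive_parquet_query("/data/MYSTUDY"))
```

With hive_partitioning enabled, DuckDB exposes the key=value directory components as columns, so partition values can be filtered without opening every file.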
Submodules
imednet.integrations.document module
MongoDB document-envelope export sink.
This module implements the structure-preserving export path for a MongoDB
destination. Records fetched via
DataExtractionWorkflow are wrapped
in a consistent document envelope that preserves the clinical hierarchy.
Document envelope
Each document stored in MongoDB represents a single iMednet Record and carries the metadata needed to locate and de-duplicate it:
{
"_id": "<study_key>/<record_id>",
"study_key": "MYSTUDY",
"record_id": 1234,
"subject_id": 99,
"subject_key": "SUBJ-001",
"visit_id": 42,
"form_id": 7,
"form_key": "BASELINE",
"record_status": "Complete",
"deleted": false,
"date_created": "2024-01-01T08:00:00Z",
"date_modified": "2024-01-15T12:00:00Z",
"record_data": { "var1": "value1", "var2": 99 },
"exported_at": "2024-01-15T12:00:00Z"
}
The _id field is a composite key <study_key>/<record_id> which
guarantees uniqueness within a collection and enables efficient upserts.
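The envelope above can be assembled with a few lines of plain Python. This is a minimal sketch that takes the record as a dict rather than a typed Record model; build_envelope is a hypothetical name, and the sink's own implementation may differ.

```python
from datetime import datetime, timezone
from typing import Any

ENVELOPE_FIELDS = (
    "record_id", "subject_id", "subject_key", "visit_id", "form_id", "form_key",
    "record_status", "deleted", "date_created", "date_modified", "record_data",
)


def build_envelope(study_key: str, record: dict[str, Any]) -> dict[str, Any]:
    """Wrap one record (given here as a plain dict) in the envelope layout shown above."""
    envelope: dict[str, Any] = {
        "_id": f"{study_key}/{record['record_id']}",  # composite key
        "study_key": study_key,
    }
    # Copy the remaining envelope fields from the record; missing ones become None.
    for field in ENVELOPE_FIELDS:
        envelope[field] = record.get(field)
    envelope["exported_at"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return envelope


doc = build_envelope(
    "MYSTUDY",
    {"record_id": 1234, "form_key": "BASELINE", "record_data": {"var1": "value1"}},
)
print(doc["_id"])  # MYSTUDY/1234
```

Because _id depends only on the study key and the record ID, exporting the same record twice yields the same key, which is what makes an upsert on _id safe to repeat.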
Optional dependency
Requires pymongo (install via pip install 'imednet[mongodb]').
The client is imported lazily at connection time.
Idempotency
When SinkConfig.idempotent is True (default) the sink uses
bulk_write with UpdateOne(..., upsert=True) operations so that
re-running an export for the same batch updates existing documents rather
than inserting duplicates.
Usage
from itertools import batched  # Python 3.12+

from imednet.integrations.document import MongoDbExportSink
from imednet_workflows.data_extraction import DataExtractionWorkflow

# sdk is an authenticated ImednetSDK instance created elsewhere.
records = DataExtractionWorkflow(sdk).extract_records_by_criteria(
    study_key="MYSTUDY",
)
with MongoDbExportSink(
    uri="******localhost:27017",
    database="imednet",
    collection="records",
    study_key="MYSTUDY",  # required positional argument of __init__
) as sink:
    for i, batch in enumerate(batched(records, 500)):
        sink.write_batch(batch, batch_id=f"MYSTUDY/all/{i}")
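The usage snippet above depends on a batched helper. itertools.batched is available from Python 3.12; for older interpreters a small stand-in such as the following may be enough. Both batched and make_batch_id are illustrative helpers here, not SDK functions.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    # islice pulls the next `size` items; an empty list ends the loop.
    while chunk := list(islice(iterator, size)):
        yield chunk


def make_batch_id(study_key: str, form_key: str, batch_number: int) -> str:
    """Format an idempotency key as <study_key>/<form_key>/<batch_number>."""
    return f"{study_key}/{form_key}/{batch_number}"


for i, batch in enumerate(batched(range(5), 2)):
    print(make_batch_id("MYSTUDY", "all", i), batch)
```

Deriving the batch number from a stable iteration order matters: if the record order changes between runs, the same batch_id would refer to different records.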
- class imednet.integrations.document.MongoDbExportSink[source]
Bases:
ExportSink
Export iMednet records as MongoDB document envelopes.
Parameters
- uri:
MongoDB connection URI (e.g. "******localhost:27017"). Credentials are never logged.
- database:
Target database name.
- collection:
Target collection name.
- study_key:
Study identifier embedded in every document and used to build the composite _id.
- config:
Optional SinkConfig.
Raises
- imednet.errors.ExportConfigurationError
When the client cannot connect to the server.
- ImportError
When the pymongo package is not installed.
- __init__(uri, database, collection, study_key, *, config=None)[source]
- Parameters:
uri (str) –
database (str) –
collection (str) –
study_key (str) –
config (SinkConfig | None) –
- Return type:
None
- write_batch(records, *, batch_id)[source]
Write records to MongoDB using upsert (idempotent) or insert.
- Return type:
int
- Parameters:
records (Sequence[Any]) –
batch_id (str) –
Parameters
- records:
Sequence of typed Record model instances.
- batch_id:
Idempotency key (e.g. "MYSTUDY/FORM1/0").
Returns
- int
Number of records written (upserted or inserted).
- imednet.integrations.document.export_to_mongodb(sdk, study_key, uri, database, collection, *, config=None)[source]
Export study records to MongoDB using MongoDbExportSink.
- Return type:
int
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
uri (str) –
database (str) –
collection (str) –
config (SinkConfig | None) –
imednet.integrations.export module
Export helpers built on top of RecordMapper.
- imednet.integrations.export.export_to_csv(sdk, study_key, path, *, use_labels_as_columns=False, **kwargs)[source]
Export study records to a CSV file.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
path (str) –
use_labels_as_columns (bool) –
kwargs (Any) –
Parameters
- use_labels_as_columns:
When
True, variable labels are used for column names instead of variable names.
- imednet.integrations.export.export_to_duckdb(sdk, study_key, db_path, table_name, *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None)[source]
Export study records to a DuckDB table using native DataFrame registration.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
db_path (str) –
table_name (str) –
use_labels_as_columns (bool) –
variable_whitelist (List[str] | None) –
form_whitelist (List[int] | None) –
Parameters
- sdk:
Authenticated SDK instance used to fetch study records.
- study_key:
Study identifier to export.
- db_path:
Path to the target .duckdb database file.
- table_name:
Name of the destination DuckDB table.
- use_labels_as_columns:
When True, variable labels are used for DataFrame column names.
- variable_whitelist:
Optional list of variable names to include.
- form_whitelist:
Optional list of form IDs to include.
Raises
- ImportError
If the optional duckdb dependency is not installed.
- imednet.integrations.export.export_to_duckdb_by_form(sdk, study_key, db_path, *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None)[source]
Export records to separate DuckDB tables for each form.
Each form is exported to a table named after form.form_key.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
db_path (str) –
use_labels_as_columns (bool) –
variable_whitelist (List[str] | None) –
form_whitelist (List[int] | None) –
Parameters
- sdk:
Authenticated SDK instance used to fetch study records.
- study_key:
Study identifier to export.
- db_path:
Path to the target .duckdb database file.
- use_labels_as_columns:
When True, variable labels are used for DataFrame column names.
- variable_whitelist:
Optional list of variable names to include.
- form_whitelist:
Optional list of form IDs to include.
Raises
- ImportError
If the optional duckdb dependency is not installed.
- imednet.integrations.export.export_to_excel(sdk, study_key, path, *, use_labels_as_columns=False, **kwargs)[source]
Export study records to an Excel workbook.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
path (str) –
use_labels_as_columns (bool) –
kwargs (Any) –
Parameters
- use_labels_as_columns:
When
True, variable labels are used for column names instead of variable names.
- imednet.integrations.export.export_to_json(sdk, study_key, path, *, use_labels_as_columns=False, **kwargs)[source]
Export study records to a JSON file.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
path (str) –
use_labels_as_columns (bool) –
kwargs (Any) –
Parameters
- use_labels_as_columns:
When
True, variable labels are used for column names instead of variable names.
- imednet.integrations.export.export_to_long_sql(sdk, study_key, table_name, conn_str, *, chunk_size=1000)[source]
Export records to a normalized long-format SQL table.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
table_name (str) –
conn_str (str) –
chunk_size (int) –
- imednet.integrations.export.export_to_parquet(sdk, study_key, path, *, use_labels_as_columns=False, **kwargs)[source]
Export study records to a Parquet file.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
path (str) –
use_labels_as_columns (bool) –
kwargs (Any) –
Parameters
- use_labels_as_columns:
When
True, variable labels are used for column names instead of variable names.
- imednet.integrations.export.export_to_sql(sdk, study_key, table, conn_str, if_exists='replace', *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None, **kwargs)[source]
Export study records to a SQL table.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
table (str) –
conn_str (str) –
if_exists (str) –
use_labels_as_columns (bool) –
variable_whitelist (List[str] | None) –
form_whitelist (List[int] | None) –
kwargs (Any) –
Parameters
- use_labels_as_columns:
When
True, variable labels are used for column names instead of variable names.
- imednet.integrations.export.export_to_sql_by_form(sdk, study_key, conn_str, if_exists='replace', *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None, **kwargs)[source]
Export records to separate SQL tables for each form.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
conn_str (str) –
if_exists (str) –
use_labels_as_columns (bool) –
variable_whitelist (List[str] | None) –
form_whitelist (List[int] | None) –
kwargs (Any) –
imednet.integrations.graph module
Neo4j graph export sink.
This module implements the structure-preserving export path for a Neo4j
property-graph destination. Records fetched via
DataExtractionWorkflow are written
as graph nodes and relationships that mirror the clinical data hierarchy.
Graph shape
Nodes
- (:Study {study_key})
- (:Subject {subject_key, study_key})
- (:Visit {visit_id, subject_key, study_key})
- (:Record {record_id, form_id, visit_id, subject_key, study_key, **record_data})
Relationships
- (:Study)-[:HAS_SUBJECT]->(:Subject)
- (:Subject)-[:HAS_VISIT]->(:Visit)
- (:Visit)-[:HAS_RECORD]->(:Record)
Optional dependency
Requires neo4j (install via pip install 'imednet[neo4j]').
The driver is imported lazily at connection time so that importing this
module never fails when neo4j is not installed.
Idempotency
When SinkConfig.idempotent is True (default) the sink uses
MERGE on the node’s primary key property so that re-running an export
for the same batch updates existing nodes rather than creating duplicates.
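The graph shape and MERGE-based idempotency described above could be expressed roughly as follows. This is a sketch only: the Cypher text the sink actually issues is not shown in this reference, the choice of key properties for each MERGE is an assumption, and MERGE_RECORD and record_params are hypothetical names.

```python
from typing import Any

# One parameterised statement per record. MERGE matches on the key properties
# and creates the node or relationship only when no match exists.
MERGE_RECORD = """
MERGE (st:Study {study_key: $study_key})
MERGE (su:Subject {subject_key: $subject_key, study_key: $study_key})
MERGE (v:Visit {visit_id: $visit_id, subject_key: $subject_key, study_key: $study_key})
MERGE (r:Record {record_id: $record_id, study_key: $study_key})
SET r += $props
MERGE (st)-[:HAS_SUBJECT]->(su)
MERGE (su)-[:HAS_VISIT]->(v)
MERGE (v)-[:HAS_RECORD]->(r)
""".strip()


def record_params(study_key: str, record: dict[str, Any]) -> dict[str, Any]:
    """Build the parameter map for MERGE_RECORD from a record given as a plain dict."""
    return {
        "study_key": study_key,
        "subject_key": record["subject_key"],
        "visit_id": record["visit_id"],
        "record_id": record["record_id"],
        # Remaining node properties, including the flattened record_data values.
        "props": {"form_id": record["form_id"], **record.get("record_data", {})},
    }


params = record_params(
    "MYSTUDY",
    {"subject_key": "SUBJ-001", "visit_id": 42, "record_id": 1234,
     "form_id": 7, "record_data": {"var1": "value1"}},
)
print(params["props"])  # {'form_id': 7, 'var1': 'value1'}
```

Passing values as query parameters rather than formatting them into the Cypher string avoids injection problems and lets the server reuse the query plan across records.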
Usage
from itertools import batched  # Python 3.12+

from imednet.integrations.graph import Neo4jExportSink, Neo4jSinkConfig
from imednet_workflows.data_extraction import DataExtractionWorkflow

# sdk is an authenticated ImednetSDK instance created elsewhere.
records = DataExtractionWorkflow(sdk).extract_records_by_criteria(
    study_key="MYSTUDY",
)
config = Neo4jSinkConfig(batch_size=200)
with Neo4jExportSink(
    uri="bolt://localhost:7687",
    auth=("neo4j", "password"),
    study_key="MYSTUDY",  # required positional argument of __init__
    config=config,
) as sink:
    for i, batch in enumerate(batched(records, config.batch_size)):
        sink.write_batch(batch, batch_id=f"MYSTUDY/all/{i}")
- class imednet.integrations.graph.Neo4jExportSink[source]
Bases:
ExportSink
Export iMednet records as Neo4j graph nodes and relationships.
Parameters
- uri:
Neo4j Bolt or HTTP URI (e.g. "bolt://localhost:7687" or "neo4j+s://xxxx.databases.neo4j.io").
- auth:
(username, password) tuple. Credentials are never logged.
- study_key:
Study identifier attached to every node for multi-study graphs.
- config:
Optional Neo4jSinkConfig (or plain SinkConfig). Defaults to Neo4jSinkConfig with all values at defaults.
Raises
- imednet.errors.ExportConfigurationError
When the driver cannot connect to the database.
- ImportError
When the neo4j package is not installed.
- __init__(uri, auth, study_key, *, config=None)[source]
- Parameters:
uri (str) –
auth (Tuple[str, str]) –
study_key (str) –
config (SinkConfig | None) –
- Return type:
None
- write_batch(records, *, batch_id)[source]
Write records to Neo4j using MERGE (idempotent) or CREATE.
- Return type:
int
- Parameters:
records (Sequence[Any]) –
batch_id (str) –
Parameters
- records:
Sequence of typed Record model instances.
- batch_id:
Idempotency key (e.g. "MYSTUDY/FORM1/0").
Returns
- int
Number of records written.
- class imednet.integrations.graph.Neo4jSinkConfig[source]
Bases:
SinkConfig
Extended SinkConfig for Neo4j.
Parameters
- database:
Target Neo4j database name (default "neo4j").
- __init__(batch_size=500, max_retries=3, retry_backoff=1.0, idempotent=True, extra=<factory>, database='neo4j')
- Parameters:
batch_size (int) –
max_retries (int) –
retry_backoff (float) –
idempotent (bool) –
extra (dict[str, Any]) –
database (str) –
- Return type:
None
- database: str = 'neo4j'
- extra: dict[str, Any]
- imednet.integrations.graph.export_to_neo4j(sdk, study_key, uri, auth, *, config=None)[source]
Export study records to Neo4j using Neo4jExportSink.
- Return type:
int
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
uri (str) –
auth (Tuple[str, str]) –
config (Neo4jSinkConfig | None) –
imednet.integrations.parquet module
Hive-partitioned Parquet integration helpers.
- imednet.integrations.parquet.export_to_hive_parquet(sdk, study_key, base_dir, *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None, chunk_size=5000)[source]
Export study records to a Hive-partitioned Parquet directory layout.
- Return type:
None
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
base_dir (str) –
use_labels_as_columns (bool) –
variable_whitelist (List[str] | None) –
form_whitelist (List[int] | None) –
chunk_size (int) –
imednet.integrations.parquet_engine module
Partitioned Parquet storage engines.
- class imednet.integrations.parquet_engine.PartitionedStorageEngine[source]
Bases:
Protocol
Interface for writing tables into a partitioned dataset.
- __init__(*args, **kwargs)
- class imednet.integrations.parquet_engine.PyArrowDatasetPartitionedStorageEngine[source]
Bases:
PartitionedStorageEngine
Partitioned storage engine powered by pyarrow.dataset.write_dataset.
- __init__(compression='snappy', use_dictionary=True, existing_data_behavior='overwrite_or_ignore', staging_dir_name='.imednet_staging')
- Parameters:
compression (str) –
use_dictionary (bool) –
existing_data_behavior (str) –
staging_dir_name (str) –
- Return type:
None
- compression: str = 'snappy'
- existing_data_behavior: str = 'overwrite_or_ignore'
- staging_dir_name: str = '.imednet_staging'
- use_dictionary: bool = True
imednet.integrations.sink_base module
Shared base classes and helpers for all export sinks.
Architecture decision
The SDK provides three export paths:
- Tabular path (RecordMapper + pandas.DataFrame)
Flattens record data into a wide DataFrame and writes it to CSV, Excel, JSON, SQL, DuckDB, or Parquet. All functions in imednet.integrations.export follow this path.
- Structure-preserving path (DataExtractionWorkflow + typed records)
Traverses the clinical data hierarchy Study → Subject → Visit → Record and materialises the relationships into a destination that can represent them natively: a property graph (Neo4j) or a nested document store (MongoDB).
- Warehouse path (staged Parquet + native bulk loader)
Writes one Parquet file per form to an intermediate staging area and then invokes the destination's native bulk-loading command (e.g. Snowflake COPY INTO).
All three paths share the contracts defined in this module: SinkConfig,
the ExportSink ABC, the import-guard helper _require_optional_dep(),
and the credential-redaction helper _redact_uri().
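The source names the credential-redaction helper _redact_uri() but does not show its body. A minimal sketch of the idea, using only the standard library and a hypothetical redact_uri function, might look like this; the real helper's masking format is not documented here.

```python
from urllib.parse import urlsplit, urlunsplit


def redact_uri(uri: str, mask: str = "***") -> str:
    """Replace the userinfo part of a URI with a mask, leaving the rest intact."""
    parts = urlsplit(uri)
    if parts.username is None and parts.password is None:
        return uri  # nothing to hide
    # Rebuild the network location from host and port only.
    host = parts.hostname or ""
    if parts.port is not None:
        host = f"{host}:{parts.port}"
    return urlunsplit((parts.scheme, f"{mask}@{host}", parts.path, parts.query, parts.fragment))


print(redact_uri("mongodb://user:secret@localhost:27017/imednet"))
print(redact_uri("bolt://localhost:7687"))
```

Redacting before the URI reaches a log call or an exception message is what backs the "credentials are never logged" guarantee stated for the sinks. Note that urlsplit lower-cases the host name, and this sketch does not restore brackets around IPv6 literals.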
Optional dependency conventions
- Each sink module calls _require_optional_dep() at connection time (not at import time) so that importing the module never fails due to a missing optional library.
- Extras keys follow the pattern imednet[<key>]:
pip install 'imednet[neo4j]'
pip install 'imednet[mongodb]'
pip install 'imednet[snowflake]'
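The lazy import guard can be sketched with importlib. The function below is a hypothetical stand-in for _require_optional_dep(), whose real signature and error message are not given in this reference.

```python
import importlib
from types import ModuleType


def require_optional_dep(module_name: str, extra: str) -> ModuleType:
    """Import module_name on demand, or raise ImportError naming the extra to install."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"{module_name!r} is required for this sink. "
            f"Install it with: pip install 'imednet[{extra}]'"
        ) from exc


# The json module stands in for an optional dependency that happens to be installed.
json_module = require_optional_dep("json", "neo4j")
print(json_module.__name__)  # json
```

Calling the guard inside the sink's connection code, rather than at the top of the module, is what keeps a plain import of the module safe on installations without the extra.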
Public-API exposure rules
- imednet.integrations re-exports only the tabular helpers by default (backward compatibility).
- The three new sink classes (Neo4jExportSink, MongoDbExportSink, SnowflakeExportSink) and SinkConfig are importable from their respective submodules and are also re-exported from imednet.integrations via explicit names.
- Airflow helpers in apache_airflow_providers_imednet.export wrap only the tabular path; graph/document/warehouse sinks are not wrapped there.
- class imednet.integrations.sink_base.ExportSink[source]
Bases:
ABC
Abstract base class for all export sinks.
Subclasses must implement write_batch(), flush(), and close(). The context-manager protocol is provided by this class.
Parameters
- config:
Shared sink configuration. Defaults to SinkConfig with all values at their defaults.
- __init__(config=None)[source]
- Parameters:
config (SinkConfig | None) –
- Return type:
None
- abstract close()[source]
Release all resources held by this sink (connections, file handles).
Implementations must be idempotent: calling close() on an already closed sink must not raise.
- Return type:
None
- abstract flush()[source]
Flush any internal buffers to the destination.
- Return type:
None
Raises
- imednet.errors.ExportError
On flush failure.
- abstract write_batch(records, *, batch_id)[source]
Write one batch of records to the destination.
- Return type:
int
- Parameters:
records (Sequence[Any]) –
batch_id (str) –
Parameters
- records:
Sequence of records to write. The concrete type depends on the export path:
- Tabular path – pandas.DataFrame rows or plain dicts produced by RecordMapper.
- Structure-preserving path – typed Record instances from DataExtractionWorkflow.
- Warehouse path – pyarrow.RecordBatch or dicts destined for a staged Parquet file.
- batch_id:
Caller-supplied idempotency key. Recommended format:
"<study_key>/<form_key>/<batch_number>".
Returns
- int
Number of records successfully written.
Raises
- imednet.errors.ExportBatchError
When the batch cannot be written after all retries.
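The ExportSink contract can be tried out without any database. The sketch below uses a local stand-in base class (SketchSink) that mirrors the three documented abstract methods, so that it runs without the SDK installed; it assumes the context manager calls close() on exit, which the source implies but does not state. InMemorySink is a hypothetical subclass.

```python
from abc import ABC, abstractmethod
from typing import Any, Sequence


class SketchSink(ABC):
    """Stand-in mirroring the documented ExportSink contract (not the real class)."""

    @abstractmethod
    def write_batch(self, records: Sequence[Any], *, batch_id: str) -> int: ...

    @abstractmethod
    def flush(self) -> None: ...

    @abstractmethod
    def close(self) -> None: ...

    def __enter__(self) -> "SketchSink":
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.close()  # assumption: leaving the with-block closes the sink


class InMemorySink(SketchSink):
    """Keeps batches in a dict keyed by batch_id, so replays overwrite instead of duplicating."""

    def __init__(self) -> None:
        self.batches: dict[str, list[Any]] = {}
        self.closed = False

    def write_batch(self, records: Sequence[Any], *, batch_id: str) -> int:
        self.batches[batch_id] = list(records)
        return len(records)

    def flush(self) -> None:
        pass  # nothing is buffered in this sketch

    def close(self) -> None:
        self.closed = True  # setting a flag is safe to repeat


with InMemorySink() as sink:
    sink.write_batch([{"a": 1}], batch_id="MYSTUDY/FORM1/0")
    sink.write_batch([{"a": 1}], batch_id="MYSTUDY/FORM1/0")  # replayed batch
sink.close()  # a second close must not raise
print(len(sink.batches))  # 1
```

A real subclass would derive from imednet.integrations.ExportSink instead and wrap the write in the retry policy taken from its SinkConfig.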
- class imednet.integrations.sink_base.SinkConfig[source]
Bases:
object
Shared configuration for all export sinks.
Parameters
- batch_size:
Number of records per ExportSink.write_batch() call.
- max_retries:
Maximum number of retry attempts on transient errors.
- retry_backoff:
Base delay in seconds between retry attempts. The actual delay grows exponentially: retry_backoff * 2 ** attempt.
- idempotent:
When True, sinks use upsert / replace semantics so that replaying a batch with the same batch_id produces no duplicate data.
- __init__(batch_size=500, max_retries=3, retry_backoff=1.0, idempotent=True, extra=<factory>)
- Parameters:
batch_size (int) –
max_retries (int) –
retry_backoff (float) –
idempotent (bool) –
extra (dict[str, Any]) –
- Return type:
None
- batch_size: int = 500
- extra: dict[str, Any]
- idempotent: bool = True
- max_retries: int = 3
- retry_backoff: float = 1.0
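To make the retry_backoff formula concrete, the sketch below computes the delay before each retry. It assumes that attempt is zero-based and that no jitter or cap is applied; neither detail is stated in this reference. `retry_delays` is a hypothetical helper, not part of the package.

```python
def retry_delays(max_retries: int = 3, retry_backoff: float = 1.0) -> list:
    """Delay in seconds before each retry: retry_backoff * 2 ** attempt."""
    # Assumption: attempts are numbered from 0, so the first delay equals retry_backoff.
    return [retry_backoff * 2 ** attempt for attempt in range(max_retries)]


delays = retry_delays()            # documented defaults: max_retries=3, retry_backoff=1.0
short_delays = retry_delays(2, 0.5)
print(delays)        # [1.0, 2.0, 4.0]
print(short_delays)  # [0.5, 1.0]
```

Under these assumptions, a batch that fails every attempt with the default settings waits about seven seconds in total before the error is raised.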
imednet.integrations.warehouse module
Snowflake warehouse export sink.
This module implements the warehouse export path for a Snowflake destination. Study records are:
1. Written to Parquet files in a local staging directory (one file per batch).
2. Uploaded to the configured Snowflake internal stage via PUT.
3. Bulk-loaded into the target table with COPY INTO ... FROM @<stage>.
This two-phase approach decouples data preparation from bulk ingestion, allows the Parquet files to be independently audited or re-uploaded, and leverages Snowflake’s native columnar loader for best throughput.
Manifest
After each successful COPY INTO, a manifest entry is appended to
SinkConfig.extra["manifest_path"] (if provided):
{
"batch_id": "MYSTUDY/FORM1/0",
"stage_path": "@MY_STAGE/imednet/MYSTUDY/FORM1/batch_0.parquet",
"row_count": 500,
"loaded_at": "2024-01-15T12:00:00Z"
}
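A manifest of this shape is easy to audit with the standard library. The sketch below appends one entry per line and reads the entries back, assuming the JSON-lines layout described for manifest_path. `append_manifest_entry` and `read_manifest` are hypothetical helpers written for illustration; the sink's own writer may differ.

```python
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def append_manifest_entry(manifest_path, *, batch_id, stage_path, row_count):
    """Append one JSON object per line, using the fields shown in the example entry."""
    entry = {
        "batch_id": batch_id,
        "stage_path": stage_path,
        "row_count": row_count,
        "loaded_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    with open(manifest_path, "a", encoding="utf-8") as fh:
        fh.write(json.dumps(entry) + "\n")
    return entry


def read_manifest(manifest_path):
    """Return all manifest entries, skipping blank lines."""
    with open(manifest_path, encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


manifest = Path(tempfile.mkdtemp()) / "manifest.jsonl"
append_manifest_entry(
    manifest,
    batch_id="MYSTUDY/FORM1/0",
    stage_path="@MY_STAGE/imednet/MYSTUDY/FORM1/batch_0.parquet",
    row_count=500,
)
entries = read_manifest(manifest)
total_rows = sum(e["row_count"] for e in entries)
```

Summing row_count over the manifest gives a quick cross-check against the row count in the target table.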
Optional dependencies
- snowflake-connector-python (pip install 'imednet[snowflake]')
- pyarrow (included in imednet[snowflake])
Both are imported lazily at connection / write time.
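Lazy importing of an optional dependency is commonly done along the following lines. This is a generic sketch of the pattern, not the module's actual code; `lazy_import` and its error message are hypothetical, and a standard-library module stands in for the real dependency so the snippet runs anywhere.

```python
import importlib


def lazy_import(module_name: str, extra: str):
    """Import a module on first use and point to the matching pip extra on failure."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"{module_name!r} is required for this sink; "
            f"install it with: pip install 'imednet[{extra}]'"
        ) from exc


# The standard-library json module stands in for an installed optional dependency.
json_mod = lazy_import("json", "snowflake")

try:
    lazy_import("module_that_does_not_exist_xyz", "snowflake")
    message = ""
except ImportError as err:
    message = str(err)
```

Deferring the import this way keeps `import imednet.integrations.warehouse` cheap and lets users without the extra installed import the package without error.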
Idempotency
When SinkConfig.idempotent is True (default) the sink uses
COPY INTO ... FORCE = FALSE so that Snowflake skips files that have
already been loaded, making re-runs safe. Set idempotent = False
to force re-ingestion of previously loaded files.
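The effect of the idempotent flag on the generated SQL can be sketched as below. The exact statements the sink issues are not shown in this reference, so `build_put_sql`, `build_copy_sql`, and the FILE_FORMAT, MATCH_BY_COLUMN_NAME, and AUTO_COMPRESS options are assumptions chosen to form valid Snowflake syntax.

```python
def build_put_sql(local_file: str, stage: str, stage_dir: str) -> str:
    """PUT statement that uploads one local Parquet file to an internal stage."""
    # AUTO_COMPRESS = FALSE because Parquet files are already compressed.
    return f"PUT file://{local_file} @{stage}/{stage_dir} AUTO_COMPRESS = FALSE"


def build_copy_sql(table: str, stage_path: str, *, idempotent: bool = True) -> str:
    """COPY INTO statement for one staged file."""
    # FORCE = FALSE lets Snowflake skip files already present in its load metadata;
    # FORCE = TRUE reloads them regardless.
    force = "FALSE" if idempotent else "TRUE"
    return (
        f"COPY INTO {table} FROM {stage_path} "
        "FILE_FORMAT = (TYPE = PARQUET) "
        "MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE "
        f"FORCE = {force}"
    )


# Identifiers are interpolated directly, so they must come from trusted configuration.
staged = "@MY_STAGE/imednet/MYSTUDY/FORM1/batch_0.parquet"
put_sql = build_put_sql(
    "/tmp/imednet_stage/batch_0.parquet", "MY_STAGE", "imednet/MYSTUDY/FORM1"
)
copy_sql = build_copy_sql("RECORDS", staged)
reload_sql = build_copy_sql("RECORDS", staged, idempotent=False)
```

Note that FORCE = FALSE is also Snowflake's default for COPY INTO, so stating it explicitly mainly documents the intent.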
Usage
import os
from itertools import batched  # Python 3.12+

from imednet.integrations.warehouse import SnowflakeExportSink, SnowflakeSinkConfig

config = SnowflakeSinkConfig(
    account="myorg-myaccount",
    user="loader",
    password=os.environ["SF_PASS"],  # keep credentials out of source code
    database="IMEDNET_DB",
    schema="PUBLIC",
    warehouse="COMPUTE_WH",
    stage="MY_STAGE",
    table="RECORDS",
    stage_prefix="imednet",
    local_staging_dir="/tmp/imednet_stage",
)

# `records` is the sequence of Record instances or dicts to export, obtained elsewhere.
with SnowflakeExportSink(config=config) as sink:
    for i, batch in enumerate(batched(records, config.batch_size)):
        sink.write_batch(batch, batch_id=f"MYSTUDY/FORM1/{i}")
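The usage example calls a batched helper; this is presumably itertools.batched, which is only available from Python 3.12. On older interpreters an equivalent can be sketched as follows. The function below is a local substitute written for illustration, not something imednet provides.

```python
from itertools import islice
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def batched(iterable: Iterable[T], n: int) -> Iterator[Tuple[T, ...]]:
    """Yield successive tuples of at most n items; the last one may be shorter."""
    if n < 1:
        raise ValueError("n must be at least one")
    it = iter(iterable)
    # islice pulls up to n items per pass; an empty tuple ends the loop.
    while chunk := tuple(islice(it, n)):
        yield chunk


chunks = list(batched(range(5), 2))
print(chunks)  # [(0, 1), (2, 3), (4,)]
```

Because each chunk is a tuple, it satisfies the Sequence[Any] type that write_batch() expects for records.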
- class imednet.integrations.warehouse.SnowflakeExportSink[source]
Bases: ExportSink
Stage Parquet files and bulk-load them into Snowflake.
Parameters
- config:
SnowflakeSinkConfig containing all connection details and staging paths.
Raises
- ~imednet.errors.ExportConfigurationError
When the Snowflake connector cannot be initialised or the required configuration values are missing.
- ImportError
When snowflake-connector-python or pyarrow is not installed.
- __init__(config=None)[source]
- Parameters:
config (SinkConfig | None) –
- Return type:
None
- close()[source]
Close the Snowflake connection and clean up temporary staging files.
- Return type:
None
- write_batch(records, *, batch_id)[source]
Write records to Snowflake via Parquet staging + COPY INTO.
- Return type:
int
- Parameters:
records (Sequence[Any]) –
batch_id (str) –
Parameters
- records:
Sequence of typed Record model instances or plain dicts.
- batch_id:
Idempotency key (e.g. "MYSTUDY/FORM1/0").
Returns
- int
Number of rows loaded.
- class imednet.integrations.warehouse.SnowflakeSinkConfig[source]
Bases: SinkConfig
Configuration for SnowflakeExportSink.
Parameters
- account:
Snowflake account identifier (<org>-<account> or legacy format).
- user:
Snowflake user name.
- password:
Snowflake password. Never logged.
- database:
Target database.
- schema:
Target schema.
- warehouse:
Virtual warehouse used for the COPY INTO command.
- stage:
Snowflake internal stage name (e.g. "MY_STAGE").
- table:
Destination table name inside database.schema.
- stage_prefix:
Path prefix inside the stage (default "imednet").
- local_staging_dir:
Local directory used to write Parquet files before PUT. Defaults to a temporary directory created by tempfile.
- manifest_path:
Optional path to a JSON-lines file where each loaded batch is recorded.
- __init__(batch_size=500, max_retries=3, retry_backoff=1.0, idempotent=True, extra=<factory>, account='', user='', password='', database='', schema='PUBLIC', warehouse='', stage='', table='', stage_prefix='imednet', local_staging_dir=None, manifest_path=None)
- Parameters:
batch_size (int) –
max_retries (int) –
retry_backoff (float) –
idempotent (bool) –
extra (dict[str, Any]) –
account (str) –
user (str) –
password (str) –
database (str) –
schema (str) –
warehouse (str) –
stage (str) –
table (str) –
stage_prefix (str) –
local_staging_dir (str | PathLike[str] | None) –
manifest_path (str | PathLike[str] | None) –
- Return type:
None
- account: str = ''
- database: str = ''
- extra: dict[str, Any]
- local_staging_dir: Union[str, PathLike[str], None] = None
- manifest_path: Union[str, PathLike[str], None] = None
- password: str = ''
- schema: str = 'PUBLIC'
- stage: str = ''
- stage_prefix: str = 'imednet'
- table: str = ''
- user: str = ''
- warehouse: str = ''
- imednet.integrations.warehouse.export_to_snowflake(sdk, study_key, *, config)[source]
Export study records to Snowflake using SnowflakeExportSink.
- Return type:
int
- Parameters:
sdk (ImednetSDK) –
study_key (str) –
config (SnowflakeSinkConfig) –