imednet.integrations package

Integration helpers for exporting study data.

class imednet.integrations.ExportSink

Bases: ABC

Abstract base class for all export sinks.

Subclasses must implement write_batch(), flush(), and close(). The context-manager protocol is provided by this class.

Parameters

config:

Shared sink configuration. Defaults to SinkConfig with all values at their defaults.

__init__(config=None)
Parameters:

config (SinkConfig | None) –

Return type:

None

abstract close()

Release all resources held by this sink (connections, file handles).

Implementations must be idempotent — calling close() on an already closed sink must not raise.

Return type:

None

abstract flush()

Flush any internal buffers to the destination.

Return type:

None

Raises

~imednet.errors.ExportError

On flush failure.

abstract write_batch(records, *, batch_id)

Write one batch of records to the destination.

Return type:

int

Parameters:
  • records (Sequence[Any]) –

  • batch_id (str) –

Parameters

records:

Sequence of records to write. The concrete type depends on the export path:

  • Tabular path – pandas.DataFrame rows or plain dicts produced by RecordMapper.

  • Structure-preserving path – typed Record instances from DataExtractionWorkflow.

  • Warehouse path – pyarrow.RecordBatch or dicts destined for a staged Parquet file.

batch_id:

Caller-supplied idempotency key. Recommended format: "<study_key>/<form_key>/<batch_number>".
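The recommended batch_id format can be produced while chunking the records. The sketch below uses a hypothetical helper, iter_batches, which is not part of the SDK; it assumes batch numbers count from 0 and that the input order is stable between runs, which is what keeps a replayed batch_id pointing at the same records.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")


def iter_batches(
    records: Iterable[T],
    *,
    study_key: str,
    form_key: str,
    batch_size: int = 500,
) -> Iterator[Tuple[str, List[T]]]:
    """Yield (batch_id, batch) pairs with ids like "MYSTUDY/FORM1/0"."""
    it = iter(records)
    batch_number = 0
    # islice pulls at most batch_size items; an empty list means the input is exhausted.
    while batch := list(islice(it, batch_size)):
        yield f"{study_key}/{form_key}/{batch_number}", batch
        batch_number += 1


for batch_id, batch in iter_batches(range(5), study_key="MYSTUDY", form_key="FORM1", batch_size=2):
    print(batch_id, batch)
# MYSTUDY/FORM1/0 [0, 1]
# MYSTUDY/FORM1/1 [2, 3]
# MYSTUDY/FORM1/2 [4]
```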

Returns

int

Number of records successfully written.

Raises

~imednet.errors.ExportBatchError

When the batch cannot be written after all retries.
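A minimal sketch of a custom sink that follows this contract. So that it runs without imednet installed, it subclasses ExportSinkLike, a hypothetical local stand-in for ExportSink; in real code you would subclass imednet.integrations.ExportSink instead. What the real context manager does on exit is not documented here, so the flush-then-close behaviour in __exit__ is an assumption.

```python
from abc import ABC, abstractmethod
from types import TracebackType
from typing import Any, Dict, List, Optional, Sequence, Type


class ExportSinkLike(ABC):
    """Hypothetical local stand-in mirroring the documented ExportSink contract."""

    def __enter__(self) -> "ExportSinkLike":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        # Assumption: flush only on a clean exit, but always release resources.
        try:
            if exc_type is None:
                self.flush()
        finally:
            self.close()

    @abstractmethod
    def write_batch(self, records: Sequence[Any], *, batch_id: str) -> int: ...

    @abstractmethod
    def flush(self) -> None: ...

    @abstractmethod
    def close(self) -> None: ...


class InMemorySink(ExportSinkLike):
    """Buffers batches in memory and commits them on flush()."""

    def __init__(self) -> None:
        self.committed: Dict[str, List[Any]] = {}
        self._buffer: Dict[str, List[Any]] = {}
        self.closed = False

    def write_batch(self, records: Sequence[Any], *, batch_id: str) -> int:
        if self.closed:
            raise RuntimeError("sink is closed")
        # Keyed by batch_id, so replaying a batch replaces it instead of appending.
        self._buffer[batch_id] = list(records)
        return len(records)

    def flush(self) -> None:
        self.committed.update(self._buffer)
        self._buffer.clear()

    def close(self) -> None:
        # Idempotent: calling close() on an already closed sink is harmless.
        self.closed = True


with InMemorySink() as sink:
    sink.write_batch([{"record_id": 1}], batch_id="MYSTUDY/FORM1/0")
    sink.write_batch([{"record_id": 1}], batch_id="MYSTUDY/FORM1/0")  # replay
print(sink.committed)  # {'MYSTUDY/FORM1/0': [{'record_id': 1}]}
```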

class imednet.integrations.MongoDbExportSink

Bases: ExportSink

Export iMednet records as MongoDB document envelopes.

Parameters

uri:

MongoDB connection URI (e.g. "******localhost:27017"). Credentials are never logged.

database:

Target database name.

collection:

Target collection name.

study_key:

Study identifier embedded in every document and used to build the composite _id.

config:

Optional SinkConfig.

Raises

~imednet.errors.ExportConfigurationError

When the client cannot connect to the server.

ImportError

When the pymongo package is not installed.

__init__(uri, database, collection, study_key, *, config=None)
Parameters:
  • uri (str) –

  • database (str) –

  • collection (str) –

  • study_key (str) –

  • config (SinkConfig | None) –

Return type:

None

close()

Close the underlying PyMongo client.

Return type:

None

flush()

No-op: MongoDB writes are committed per bulk operation.

Return type:

None

write_batch(records, *, batch_id)

Write records to MongoDB using upsert (idempotent) or insert.

Return type:

int

Parameters:
  • records (Sequence[Any]) –

  • batch_id (str) –

Parameters

records:

Sequence of typed Record model instances.

batch_id:

Idempotency key (e.g. "MYSTUDY/FORM1/0").

Returns

int

Number of records written (upserted or inserted).

class imednet.integrations.Neo4jExportSink

Bases: ExportSink

Export iMednet records as Neo4j graph nodes and relationships.

Parameters

uri:

Neo4j Bolt or HTTP URI (e.g. "bolt://localhost:7687" or "neo4j+s://xxxx.databases.neo4j.io").

auth:

(username, password) tuple. Credentials are never logged.

study_key:

Study identifier attached to every node for multi-study graphs.

config:

Optional Neo4jSinkConfig (or plain SinkConfig). Defaults to Neo4jSinkConfig with all values at defaults.

Raises

~imednet.errors.ExportConfigurationError

When the driver cannot connect to the database.

ImportError

When the neo4j package is not installed.

__init__(uri, auth, study_key, *, config=None)
Parameters:
  • uri (str) –

  • auth (Tuple[str, str]) –

  • study_key (str) –

  • config (SinkConfig | None) –

Return type:

None

close()

Close the underlying Neo4j driver connection.

Return type:

None

flush()

No-op: Neo4j writes are committed per transaction.

Return type:

None

write_batch(records, *, batch_id)

Write records to Neo4j using MERGE (idempotent) or CREATE.

Return type:

int

Parameters:
  • records (Sequence[Any]) –

  • batch_id (str) –

Parameters

records:

Sequence of typed Record model instances.

batch_id:

Idempotency key (e.g. "MYSTUDY/FORM1/0").

Returns

int

Number of records written.

class imednet.integrations.Neo4jSinkConfig

Bases: SinkConfig

Extended SinkConfig for Neo4j.

Parameters

database:

Target Neo4j database name (default "neo4j").

__init__(batch_size=500, max_retries=3, retry_backoff=1.0, idempotent=True, extra=<factory>, database='neo4j')
Parameters:
  • batch_size (int) –

  • max_retries (int) –

  • retry_backoff (float) –

  • idempotent (bool) –

  • extra (dict[str, Any]) –

  • database (str) –

Return type:

None

database: str = 'neo4j'

class imednet.integrations.PartitionedStorageEngine

Bases: Protocol

Interface for writing tables into a partitioned dataset.

__init__(*args, **kwargs)
write_form_table(table, *, base_dir, study_key, form_key)

Persist a form table into a partitioned dataset layout.

Return type:

None

Parameters:
  • table (Any) –

  • base_dir (str) –

  • study_key (str) –

  • form_key (str) –

class imednet.integrations.PyArrowDatasetPartitionedStorageEngine

Bases: PartitionedStorageEngine

Partitioned storage engine powered by pyarrow.dataset.write_dataset.

__init__(compression='snappy', use_dictionary=True, existing_data_behavior='overwrite_or_ignore', staging_dir_name='.imednet_staging')
Parameters:
  • compression (str) –

  • use_dictionary (bool) –

  • existing_data_behavior (str) –

  • staging_dir_name (str) –

Return type:

None

compression: str = 'snappy'
existing_data_behavior: str = 'overwrite_or_ignore'
staging_dir_name: str = '.imednet_staging'
use_dictionary: bool = True
write_form_table(table, *, base_dir, study_key, form_key)

Persist a form table into a partitioned dataset layout.

Return type:

None

Parameters:
  • table (Any) –

  • base_dir (str) –

  • study_key (str) –

  • form_key (str) –

class imednet.integrations.SinkConfig

Bases: object

Shared configuration for all export sinks.

Parameters

batch_size:

Number of records per ExportSink.write_batch() call.

max_retries:

Maximum number of retry attempts on transient errors.

retry_backoff:

Base delay in seconds between retry attempts. The actual delay grows exponentially: retry_backoff * 2 ** attempt.

idempotent:

When True, sinks use upsert / replace semantics so that replaying a batch with the same batch_id produces no duplicate data.
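The retry settings above can be sketched as a small wrapper around one write attempt. with_retries and BatchFailed are hypothetical (BatchFailed stands in for ~imednet.errors.ExportBatchError). The sketch assumes attempt counts from 0, so with the defaults the waits are 1, 2 and 4 seconds, and it treats ConnectionError and TimeoutError as the transient errors; anything else propagates immediately.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

R = TypeVar("R")


class BatchFailed(Exception):
    """Hypothetical stand-in for ExportBatchError; carries the failing batch_id."""

    def __init__(self, batch_id: str, cause: BaseException) -> None:
        super().__init__(f"batch {batch_id!r} failed: {cause}")
        self.batch_id = batch_id


def with_retries(
    write: Callable[[], R],
    *,
    batch_id: str,
    max_retries: int = 3,
    retry_backoff: float = 1.0,
    transient: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> R:
    """Run write(), retrying transient errors up to max_retries times."""
    for attempt in range(max_retries + 1):
        try:
            return write()
        except transient as exc:
            if attempt == max_retries:
                # Retries exhausted: surface a permanent error tagged with the batch.
                raise BatchFailed(batch_id, exc) from exc
            # Exponential backoff: retry_backoff * 2 ** attempt seconds.
            sleep(retry_backoff * 2 ** attempt)
    raise ValueError("max_retries must be >= 0")
```

Passing a fake sleep (for example a list's append method) makes the delay schedule easy to inspect in tests without actually waiting.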

__init__(batch_size=500, max_retries=3, retry_backoff=1.0, idempotent=True, extra=<factory>)
Parameters:
  • batch_size (int) –

  • max_retries (int) –

  • retry_backoff (float) –

  • idempotent (bool) –

  • extra (dict[str, Any]) –

Return type:

None

batch_size: int = 500
extra: dict[str, Any]
idempotent: bool = True
max_retries: int = 3
retry_backoff: float = 1.0

class imednet.integrations.SnowflakeExportSink

Bases: ExportSink

Stage Parquet files and bulk-load them into Snowflake.

Parameters

config:

SnowflakeSinkConfig containing all connection details and staging paths.

Raises

~imednet.errors.ExportConfigurationError

When the Snowflake connector cannot be initialised or the required configuration values are missing.

ImportError

When snowflake-connector-python or pyarrow is not installed.

__init__(config=None)
Parameters:

config (SinkConfig | None) –

Return type:

None

close()

Close the Snowflake connection and clean up temporary staging files.

Return type:

None

flush()

No-op: each batch is committed individually.

Return type:

None

write_batch(records, *, batch_id)

Write records to Snowflake via Parquet staging + COPY INTO.

Return type:

int

Parameters:
  • records (Sequence[Any]) –

  • batch_id (str) –

Parameters

records:

Sequence of typed Record model instances or plain dicts.

batch_id:

Idempotency key (e.g. "MYSTUDY/FORM1/0").

Returns

int

Number of rows loaded.
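The Parquet staging + COPY INTO flow can be illustrated by the two statements it needs per file. This is a sketch under assumptions: the statement options and the file naming in batch_file_name are illustrative and are not taken from SnowflakeExportSink's source, while PUT, COPY INTO, FILE_FORMAT = (TYPE = PARQUET) and MATCH_BY_COLUMN_NAME are standard Snowflake SQL.

```python
from pathlib import PurePosixPath
from typing import Tuple


def batch_file_name(batch_id: str) -> str:
    """Turn "MYSTUDY/FORM1/0" into a flat file name for the stage."""
    return batch_id.replace("/", "_") + ".parquet"


def staging_statements(
    local_file: PurePosixPath,
    *,
    database: str,
    schema: str,
    table: str,
    stage: str,
    stage_prefix: str = "imednet",
) -> Tuple[str, str]:
    """Return (PUT, COPY INTO) statements for one staged Parquet file.

    local_file must be absolute. Identifiers are interpolated as-is, so they
    must come from trusted configuration, never from record data.
    """
    stage_path = f"@{stage}/{stage_prefix}"
    # Upload the local file into the internal stage under stage_prefix.
    put_sql = (
        f"PUT 'file://{local_file.as_posix()}' {stage_path} "
        "AUTO_COMPRESS = FALSE OVERWRITE = TRUE"
    )
    # Bulk-load that single staged file into database.schema.table.
    copy_sql = (
        f"COPY INTO {database}.{schema}.{table} "
        f"FROM {stage_path}/{local_file.name} "
        "FILE_FORMAT = (TYPE = PARQUET) "
        "MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE"
    )
    return put_sql, copy_sql
```

Each string could then be executed with a snowflake-connector-python cursor, e.g. cursor.execute(put_sql) followed by cursor.execute(copy_sql).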

class imednet.integrations.SnowflakeSinkConfig

Bases: SinkConfig

Configuration for SnowflakeExportSink.

Parameters

account:

Snowflake account identifier (<org>-<account> or legacy format).

user:

Snowflake user name.

password:

Snowflake password. Never logged.

database:

Target database.

schema:

Target schema.

warehouse:

Virtual warehouse used for the COPY INTO command.

stage:

Snowflake internal stage name (e.g. "MY_STAGE").

table:

Destination table name inside database.schema.

stage_prefix:

Path prefix inside the stage (default "imednet").

local_staging_dir:

Local directory used to write Parquet files before PUT. Defaults to a temporary directory created by tempfile.

manifest_path:

Optional path to a JSON-lines file where each loaded batch is recorded.
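A JSON-lines manifest makes it cheap to see which batches have already been loaded, for example to skip them when resuming. The entry layout here (batch_id, rows_loaded, loaded_at) and both helper functions are hypothetical; the format the sink actually writes to manifest_path is not documented in this section.

```python
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Set


def append_manifest_entry(manifest_path: Path, *, batch_id: str, rows_loaded: int) -> Dict[str, Any]:
    """Append one JSON object, on its own line, describing a loaded batch."""
    entry = {
        "batch_id": batch_id,
        "rows_loaded": rows_loaded,
        "loaded_at": datetime.now(timezone.utc).isoformat(),
    }
    with manifest_path.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(entry) + "\n")
    return entry


def loaded_batch_ids(manifest_path: Path) -> Set[str]:
    """Read back the batch_ids already recorded, e.g. to skip them on resume."""
    if not manifest_path.exists():
        return set()
    with manifest_path.open(encoding="utf-8") as fh:
        return {json.loads(line)["batch_id"] for line in fh if line.strip()}
```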

__init__(batch_size=500, max_retries=3, retry_backoff=1.0, idempotent=True, extra=<factory>, account='', user='', password='', database='', schema='PUBLIC', warehouse='', stage='', table='', stage_prefix='imednet', local_staging_dir=None, manifest_path=None)
Parameters:
  • batch_size (int) –

  • max_retries (int) –

  • retry_backoff (float) –

  • idempotent (bool) –

  • extra (dict[str, Any]) –

  • account (str) –

  • user (str) –

  • password (str) –

  • database (str) –

  • schema (str) –

  • warehouse (str) –

  • stage (str) –

  • table (str) –

  • stage_prefix (str) –

  • local_staging_dir (str | PathLike[str] | None) –

  • manifest_path (str | PathLike[str] | None) –

Return type:

None

account: str = ''
database: str = ''
local_staging_dir: Union[str, PathLike[str], None] = None
manifest_path: Union[str, PathLike[str], None] = None
password: str = ''
schema: str = 'PUBLIC'
stage: str = ''
stage_prefix: str = 'imednet'
table: str = ''
user: str = ''
warehouse: str = ''

imednet.integrations.export_to_csv(sdk, study_key, path, *, use_labels_as_columns=False, **kwargs)

Export study records to a CSV file.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • path (str) –

  • use_labels_as_columns (bool) –

  • kwargs (Any) –

Parameters

use_labels_as_columns:

When True, variable labels are used for column names instead of variable names.
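Switching to labels is essentially a column rename from variable names to labels. relabel_columns below is a hypothetical sketch, not the SDK's logic; keeping the variable name when a label is missing and rejecting duplicate labels are assumptions, chosen because free-text labels are not guaranteed to be unique.

```python
from collections import Counter
from typing import Dict, List


def relabel_columns(columns: List[str], labels: Dict[str, str]) -> List[str]:
    """Replace variable names with labels, keeping the name when no label exists."""
    renamed = [labels.get(name, name) for name in columns]
    # Labels are free text, so two variables may share one; refuse rather than merge columns.
    dupes = sorted(col for col, n in Counter(renamed).items() if n > 1)
    if dupes:
        raise ValueError(f"duplicate column labels: {dupes}")
    return renamed
```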

imednet.integrations.export_to_duckdb(sdk, study_key, db_path, table_name, *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None)

Export study records to a DuckDB table using native DataFrame registration.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • db_path (str) –

  • table_name (str) –

  • use_labels_as_columns (bool) –

  • variable_whitelist (List[str] | None) –

  • form_whitelist (List[int] | None) –

Parameters

sdk:

Authenticated SDK instance used to fetch study records.

study_key:

Study identifier to export.

db_path:

Path to the target .duckdb database file.

table_name:

Name of the destination DuckDB table.

use_labels_as_columns:

When True, variable labels are used for DataFrame column names.

variable_whitelist:

Optional list of variable names to include.

form_whitelist:

Optional list of form IDs to include.

Raises

ImportError

If the optional duckdb dependency is not installed.
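The two whitelists can be read as independent filters: form_whitelist drops whole records, while variable_whitelist trims each remaining record's data. The sketch assumes records shaped like the MongoDB document envelope (a form_id plus a record_data dict) and that None means no filtering; apply_whitelists is hypothetical.

```python
from typing import Any, Dict, Iterable, List, Optional


def apply_whitelists(
    records: Iterable[Dict[str, Any]],
    *,
    variable_whitelist: Optional[List[str]] = None,
    form_whitelist: Optional[List[int]] = None,
) -> List[Dict[str, Any]]:
    """Keep records from whitelisted forms and, within them, whitelisted variables."""
    forms = set(form_whitelist) if form_whitelist is not None else None
    variables = set(variable_whitelist) if variable_whitelist is not None else None
    kept: List[Dict[str, Any]] = []
    for rec in records:
        if forms is not None and rec["form_id"] not in forms:
            continue  # whole record excluded by its form
        data = rec["record_data"]
        if variables is not None:
            data = {k: v for k, v in data.items() if k in variables}
        kept.append({**rec, "record_data": data})
    return kept
```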

imednet.integrations.export_to_duckdb_by_form(sdk, study_key, db_path, *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None)

Export records to separate DuckDB tables for each form.

Each form is exported to a table named after form.form_key.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • db_path (str) –

  • use_labels_as_columns (bool) –

  • variable_whitelist (List[str] | None) –

  • form_whitelist (List[int] | None) –

Parameters

sdk:

Authenticated SDK instance used to fetch study records.

study_key:

Study identifier to export.

db_path:

Path to the target .duckdb database file.

use_labels_as_columns:

When True, variable labels are used for DataFrame column names.

variable_whitelist:

Optional list of variable names to include.

form_whitelist:

Optional list of form IDs to include.

Raises

ImportError

If the optional duckdb dependency is not installed.
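Splitting by form amounts to grouping rows on form_key and writing one table per group. The sketch below is hypothetical: group_by_form_key assumes envelope-shaped records, and quote_ident shows the standard SQL double-quote escaping that form keys would need if they were used verbatim as DuckDB table names.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List


def group_by_form_key(records: Iterable[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
    """Collect each record's data under its form_key, one list per future table."""
    tables: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for rec in records:
        tables[rec["form_key"]].append(rec["record_data"])
    return dict(tables)


def quote_ident(name: str) -> str:
    """Quote a SQL identifier: wrap in double quotes and double any inner quotes."""
    return '"' + name.replace('"', '""') + '"'
```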

imednet.integrations.export_to_excel(sdk, study_key, path, *, use_labels_as_columns=False, **kwargs)

Export study records to an Excel workbook.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • path (str) –

  • use_labels_as_columns (bool) –

  • kwargs (Any) –

Parameters

use_labels_as_columns:

When True, variable labels are used for column names instead of variable names.

imednet.integrations.export_to_hive_parquet(sdk, study_key, base_dir, *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None, chunk_size=5000)

Export study records to a Hive-partitioned Parquet directory layout.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • base_dir (str) –

  • use_labels_as_columns (bool) –

  • variable_whitelist (List[str] | None) –

  • form_whitelist (List[int] | None) –

  • chunk_size (int) –
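Hive partitioning encodes partition values as key=value directory names. The sketch assumes the layout is partitioned by study_key and then form_key, which the arguments of PartitionedStorageEngine.write_form_table suggest but this section does not document; hive_partition_dir is hypothetical.

```python
from pathlib import PurePosixPath


def hive_partition_dir(base_dir: str, *, study_key: str, form_key: str) -> PurePosixPath:
    """Return base_dir/study_key=<study>/form_key=<form> for one form's files."""
    for value in (study_key, form_key):
        # A "/" or "=" would break the key=value directory convention.
        if "/" in value or "=" in value:
            raise ValueError(f"unsafe partition value: {value!r}")
    return PurePosixPath(base_dir) / f"study_key={study_key}" / f"form_key={form_key}"
```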

imednet.integrations.export_to_json(sdk, study_key, path, *, use_labels_as_columns=False, **kwargs)

Export study records to a JSON file.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • path (str) –

  • use_labels_as_columns (bool) –

  • kwargs (Any) –

Parameters

use_labels_as_columns:

When True, variable labels are used for column names instead of variable names.

imednet.integrations.export_to_long_sql(sdk, study_key, table_name, conn_str, *, chunk_size=1000)

Export records to a normalized long-format SQL table.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • table_name (str) –

  • conn_str (str) –

  • chunk_size (int) –
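In a long-format table each (record, variable) pair becomes its own row instead of its own column. The column order (record_id, variable name, value) and both helpers are assumptions for illustration; chunks shows how rows might be grouped into inserts of at most chunk_size rows.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List, Tuple

LongRow = Tuple[int, str, Any]


def to_long_rows(records: Iterable[Dict[str, Any]]) -> Iterator[LongRow]:
    """Yield one (record_id, variable_name, value) row per variable."""
    for rec in records:
        for name, value in rec["record_data"].items():
            yield rec["record_id"], name, value


def chunks(rows: Iterable[LongRow], chunk_size: int = 1000) -> Iterator[List[LongRow]]:
    """Group rows so that each INSERT carries at most chunk_size rows."""
    it = iter(rows)
    while chunk := list(islice(it, chunk_size)):
        yield chunk
```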

imednet.integrations.export_to_mongodb(sdk, study_key, uri, database, collection, *, config=None)

Export study records to MongoDB using MongoDbExportSink.

Return type:

int

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • uri (str) –

  • database (str) –

  • collection (str) –

  • config (SinkConfig | None) –

imednet.integrations.export_to_neo4j(sdk, study_key, uri, auth, *, config=None)

Export study records to Neo4j using Neo4jExportSink.

Return type:

int

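Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • uri (str) –

  • auth (Tuple[str, str]) –

  • config (SinkConfig | None) –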

imednet.integrations.export_to_parquet(sdk, study_key, path, *, use_labels_as_columns=False, **kwargs)

Export study records to a Parquet file.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • path (str) –

  • use_labels_as_columns (bool) –

  • kwargs (Any) –

Parameters

use_labels_as_columns:

When True, variable labels are used for column names instead of variable names.

imednet.integrations.export_to_snowflake(sdk, study_key, *, config)

Export study records to Snowflake using SnowflakeExportSink.

Return type:

int

Parameters:

imednet.integrations.export_to_sql(sdk, study_key, table, conn_str, if_exists='replace', *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None, **kwargs)

Export study records to a SQL table.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • table (str) –

  • conn_str (str) –

  • if_exists (str) –

  • use_labels_as_columns (bool) –

  • variable_whitelist (List[str] | None) –

  • form_whitelist (List[int] | None) –

  • kwargs (Any) –

Parameters

use_labels_as_columns:

When True, variable labels are used for column names instead of variable names.

imednet.integrations.export_to_sql_by_form(sdk, study_key, conn_str, if_exists='replace', *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None, **kwargs)

Export records to separate SQL tables for each form.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • conn_str (str) –

  • if_exists (str) –

  • use_labels_as_columns (bool) –

  • variable_whitelist (List[str] | None) –

  • form_whitelist (List[int] | None) –

  • kwargs (Any) –

imednet.integrations.hive_parquet_query(base_dir)

Return the DuckDB read_parquet query string for the given Hive base directory.

Return type:

str

Parameters:

base_dir (str) –
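A sketch of the kind of query string involved, assuming a recursive glob under base_dir; the exact string hive_parquet_query returns is not shown in this section. read_parquet and its hive_partitioning option are part of DuckDB, while hive_query_sketch is hypothetical.

```python
def hive_query_sketch(base_dir: str) -> str:
    """Build a DuckDB query that reads every Parquet file below base_dir."""
    pattern = f"{base_dir.rstrip('/')}/**/*.parquet"
    literal = pattern.replace("'", "''")  # escape quotes for a SQL string literal
    # hive_partitioning = true turns key=value directories into queryable columns.
    return f"SELECT * FROM read_parquet('{literal}', hive_partitioning = true)"
```

The resulting string can be passed to DuckDB, for example with duckdb.sql(query).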

Submodules

imednet.integrations.document module

MongoDB document-envelope export sink.

This module implements the structure-preserving export path for a MongoDB destination. Records fetched via DataExtractionWorkflow are wrapped in a consistent document envelope that preserves the clinical hierarchy.

Document envelope

Each document stored in MongoDB represents a single iMednet Record and carries the metadata needed to locate and de-duplicate it:

{
    "_id": "<study_key>/<record_id>",
    "study_key":     "MYSTUDY",
    "record_id":     1234,
    "subject_id":    99,
    "subject_key":   "SUBJ-001",
    "visit_id":      42,
    "form_id":       7,
    "form_key":      "BASELINE",
    "record_status": "Complete",
    "deleted":       false,
    "date_created":  "2024-01-01T08:00:00Z",
    "date_modified": "2024-01-15T12:00:00Z",
    "record_data":   { "var1": "value1", "var2": 99 },
    "exported_at":   "2024-01-15T12:00:00Z"
}

The _id field is a composite key <study_key>/<record_id> which guarantees uniqueness within a collection and enables efficient upserts.
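The envelope can be sketched as a plain function over a record dict. build_envelope is hypothetical: the real sink consumes typed Record instances, whereas this sketch assumes dict input with the same field names as the example document above.

```python
from datetime import datetime, timezone
from typing import Any, Dict

# Fields copied from the record, in the order shown in the example envelope.
ENVELOPE_FIELDS = (
    "record_id", "subject_id", "subject_key", "visit_id", "form_id", "form_key",
    "record_status", "deleted", "date_created", "date_modified", "record_data",
)


def build_envelope(record: Dict[str, Any], study_key: str) -> Dict[str, Any]:
    """Wrap one record dict in the document envelope."""
    doc: Dict[str, Any] = {
        # Composite key: unique per collection and stable across re-exports.
        "_id": f"{study_key}/{record['record_id']}",
        "study_key": study_key,
    }
    doc.update({name: record.get(name) for name in ENVELOPE_FIELDS})
    doc["exported_at"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return doc
```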

Optional dependency

Requires pymongo (install via pip install 'imednet[mongodb]'). The client is imported lazily at connection time.

Idempotency

When SinkConfig.idempotent is True (default) the sink uses bulk_write with UpdateOne(..., upsert=True) operations so that re-running an export for the same batch updates existing documents rather than inserting duplicates.
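The upsert operations can be prepared as (filter, update) pairs before they are handed to PyMongo. This is a sketch: whether the sink uses $set, as here, or a full document replacement is an assumption, and upsert_args is hypothetical. The _id is kept out of $set because the filter already matches on it.

```python
from typing import Any, Dict, List, Tuple

Filter = Dict[str, Any]
Update = Dict[str, Any]


def upsert_args(docs: List[Dict[str, Any]]) -> List[Tuple[Filter, Update]]:
    """Return (filter, update) pairs for UpdateOne(filter, update, upsert=True)."""
    pairs: List[Tuple[Filter, Update]] = []
    for doc in docs:
        body = {k: v for k, v in doc.items() if k != "_id"}
        pairs.append(({"_id": doc["_id"]}, {"$set": body}))
    return pairs
```

With PyMongo installed, the pairs map onto collection.bulk_write([UpdateOne(f, u, upsert=True) for f, u in pairs], ordered=False), where UpdateOne comes from pymongo.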

Usage

from itertools import batched  # Python 3.12+

from imednet.integrations.document import MongoDbExportSink
from imednet_workflows.data_extraction import DataExtractionWorkflow

# `sdk` is an authenticated ImednetSDK instance created earlier.
records = DataExtractionWorkflow(sdk).extract_records_by_criteria(
    study_key="MYSTUDY",
)

with MongoDbExportSink(
    uri="******localhost:27017",
    database="imednet",
    collection="records",
    study_key="MYSTUDY",  # required: used to build each document's _id
) as sink:
    for i, batch in enumerate(batched(records, 500)):
        sink.write_batch(batch, batch_id=f"MYSTUDY/all/{i}")

class imednet.integrations.document.MongoDbExportSink

Bases: ExportSink

Export iMednet records as MongoDB document envelopes.

Parameters

uri:

MongoDB connection URI (e.g. "******localhost:27017"). Credentials are never logged.

database:

Target database name.

collection:

Target collection name.

study_key:

Study identifier embedded in every document and used to build the composite _id.

config:

Optional SinkConfig.

Raises

~imednet.errors.ExportConfigurationError

When the client cannot connect to the server.

ImportError

When the pymongo package is not installed.

__init__(uri, database, collection, study_key, *, config=None)
Parameters:
  • uri (str) –

  • database (str) –

  • collection (str) –

  • study_key (str) –

  • config (SinkConfig | None) –

Return type:

None

close()

Close the underlying PyMongo client.

Return type:

None

flush()

No-op: MongoDB writes are committed per bulk operation.

Return type:

None

write_batch(records, *, batch_id)

Write records to MongoDB using upsert (idempotent) or insert.

Return type:

int

Parameters:
  • records (Sequence[Any]) –

  • batch_id (str) –

Parameters

records:

Sequence of typed Record model instances.

batch_id:

Idempotency key (e.g. "MYSTUDY/FORM1/0").

Returns

int

Number of records written (upserted or inserted).

imednet.integrations.document.export_to_mongodb(sdk, study_key, uri, database, collection, *, config=None)

Export study records to MongoDB using MongoDbExportSink.

Return type:

int

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • uri (str) –

  • database (str) –

  • collection (str) –

  • config (SinkConfig | None) –

imednet.integrations.export module

Export helpers built on top of RecordMapper.

imednet.integrations.export.export_to_csv(sdk, study_key, path, *, use_labels_as_columns=False, **kwargs)

Export study records to a CSV file.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • path (str) –

  • use_labels_as_columns (bool) –

  • kwargs (Any) –

Parameters

use_labels_as_columns:

When True, variable labels are used for column names instead of variable names.

imednet.integrations.export.export_to_duckdb(sdk, study_key, db_path, table_name, *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None)

Export study records to a DuckDB table using native DataFrame registration.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • db_path (str) –

  • table_name (str) –

  • use_labels_as_columns (bool) –

  • variable_whitelist (List[str] | None) –

  • form_whitelist (List[int] | None) –

Parameters

sdk:

Authenticated SDK instance used to fetch study records.

study_key:

Study identifier to export.

db_path:

Path to the target .duckdb database file.

table_name:

Name of the destination DuckDB table.

use_labels_as_columns:

When True, variable labels are used for DataFrame column names.

variable_whitelist:

Optional list of variable names to include.

form_whitelist:

Optional list of form IDs to include.

Raises

ImportError

If the optional duckdb dependency is not installed.

imednet.integrations.export.export_to_duckdb_by_form(sdk, study_key, db_path, *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None)

Export records to separate DuckDB tables for each form.

Each form is exported to a table named after form.form_key.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • db_path (str) –

  • use_labels_as_columns (bool) –

  • variable_whitelist (List[str] | None) –

  • form_whitelist (List[int] | None) –

Parameters

sdk:

Authenticated SDK instance used to fetch study records.

study_key:

Study identifier to export.

db_path:

Path to the target .duckdb database file.

use_labels_as_columns:

When True, variable labels are used for DataFrame column names.

variable_whitelist:

Optional list of variable names to include.

form_whitelist:

Optional list of form IDs to include.

Raises

ImportError

If the optional duckdb dependency is not installed.

imednet.integrations.export.export_to_excel(sdk, study_key, path, *, use_labels_as_columns=False, **kwargs)

Export study records to an Excel workbook.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • path (str) –

  • use_labels_as_columns (bool) –

  • kwargs (Any) –

Parameters

use_labels_as_columns:

When True, variable labels are used for column names instead of variable names.

imednet.integrations.export.export_to_json(sdk, study_key, path, *, use_labels_as_columns=False, **kwargs)

Export study records to a JSON file.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • path (str) –

  • use_labels_as_columns (bool) –

  • kwargs (Any) –

Parameters

use_labels_as_columns:

When True, variable labels are used for column names instead of variable names.

imednet.integrations.export.export_to_long_sql(sdk, study_key, table_name, conn_str, *, chunk_size=1000)

Export records to a normalized long-format SQL table.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • table_name (str) –

  • conn_str (str) –

  • chunk_size (int) –

imednet.integrations.export.export_to_parquet(sdk, study_key, path, *, use_labels_as_columns=False, **kwargs)

Export study records to a Parquet file.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • path (str) –

  • use_labels_as_columns (bool) –

  • kwargs (Any) –

Parameters

use_labels_as_columns:

When True, variable labels are used for column names instead of variable names.

imednet.integrations.export.export_to_sql(sdk, study_key, table, conn_str, if_exists='replace', *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None, **kwargs)

Export study records to a SQL table.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • table (str) –

  • conn_str (str) –

  • if_exists (str) –

  • use_labels_as_columns (bool) –

  • variable_whitelist (List[str] | None) –

  • form_whitelist (List[int] | None) –

  • kwargs (Any) –

Parameters

use_labels_as_columns:

When True, variable labels are used for column names instead of variable names.

imednet.integrations.export.export_to_sql_by_form(sdk, study_key, conn_str, if_exists='replace', *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None, **kwargs)

Export records to separate SQL tables for each form.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • conn_str (str) –

  • if_exists (str) –

  • use_labels_as_columns (bool) –

  • variable_whitelist (List[str] | None) –

  • form_whitelist (List[int] | None) –

  • kwargs (Any) –

imednet.integrations.graph module

Neo4j graph export sink.

This module implements the structure-preserving export path for a Neo4j property-graph destination. Records fetched via DataExtractionWorkflow are written as graph nodes and relationships that mirror the clinical data hierarchy.

Graph shape

Nodes

  • (:Study   {study_key})

  • (:Subject {subject_key, study_key})

  • (:Visit   {visit_id, subject_key, study_key})

  • (:Record  {record_id, form_id, visit_id, subject_key, study_key, **record_data})

Relationships

  • (:Study)-[:HAS_SUBJECT]->(:Subject)

  • (:Subject)-[:HAS_VISIT]->(:Visit)

  • (:Visit)-[:HAS_RECORD]->(:Record)
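One way to derive the node properties above from a single record. node_properties is hypothetical and assumes dict input. It spreads record_data before the identifier keys so that a form variable named, say, record_id cannot overwrite the node's key. Neo4j property values must be primitives or arrays of primitives, so nested record_data values would need flattening first.

```python
from typing import Any, Dict


def node_properties(record: Dict[str, Any], study_key: str) -> Dict[str, Dict[str, Any]]:
    """Split one record into property maps for the Study, Subject, Visit and Record nodes."""
    subject_key = record["subject_key"]
    visit_id = record["visit_id"]
    return {
        "Study": {"study_key": study_key},
        "Subject": {"subject_key": subject_key, "study_key": study_key},
        "Visit": {"visit_id": visit_id, "subject_key": subject_key, "study_key": study_key},
        "Record": {
            # Form variables first, so the identifier keys below always win.
            **record["record_data"],
            "record_id": record["record_id"],
            "form_id": record["form_id"],
            "visit_id": visit_id,
            "subject_key": subject_key,
            "study_key": study_key,
        },
    }
```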

Optional dependency

Requires neo4j (install via pip install 'imednet[neo4j]'). The driver is imported lazily at connection time so that importing this module never fails when neo4j is not installed.

Idempotency

When SinkConfig.idempotent is True (default) the sink uses MERGE on the node’s primary key property so that re-running an export for the same batch updates existing nodes rather than creating duplicates.
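The MERGE/CREATE switch can be sketched as one parameterised Cypher statement per batch. This is not the sink's actual query: record_write_query is hypothetical, it keys Record nodes on record_id plus study_key, and it keeps MERGE for Study, Subject and Visit even when idempotent is False so that shared parent nodes are not duplicated on every row. Each row is expected to carry study_key, subject_key, visit_id, record_id and a props map.

```python
def record_write_query(idempotent: bool = True) -> str:
    """Build one parameterised Cypher statement that writes a batch passed as $rows."""
    verb = "MERGE" if idempotent else "CREATE"
    return "\n".join(
        [
            "UNWIND $rows AS row",
            # Parent nodes are always merged so they are shared across rows.
            "MERGE (st:Study {study_key: row.study_key})",
            "MERGE (su:Subject {subject_key: row.subject_key, study_key: row.study_key})",
            "MERGE (st)-[:HAS_SUBJECT]->(su)",
            "MERGE (v:Visit {visit_id: row.visit_id, subject_key: row.subject_key, study_key: row.study_key})",
            "MERGE (su)-[:HAS_VISIT]->(v)",
            # Only the Record node and its edge switch between MERGE and CREATE.
            f"{verb} (r:Record {{record_id: row.record_id, study_key: row.study_key}})",
            "SET r += row.props",
            f"{verb} (v)-[:HAS_RECORD]->(r)",
        ]
    )
```

With the official neo4j Python driver, the statement could be run as session.run(query, rows=rows).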

Usage

from itertools import batched  # Python 3.12+

from imednet.integrations.graph import Neo4jExportSink, Neo4jSinkConfig
from imednet_workflows.data_extraction import DataExtractionWorkflow

# `sdk` is an authenticated ImednetSDK instance created earlier.
records = DataExtractionWorkflow(sdk).extract_records_by_criteria(
    study_key="MYSTUDY",
)

config = Neo4jSinkConfig(batch_size=200)
with Neo4jExportSink(
    uri="bolt://localhost:7687",
    auth=("neo4j", "password"),  # placeholder credentials
    study_key="MYSTUDY",  # required: attached to every node
    config=config,
) as sink:
    for i, batch in enumerate(batched(records, config.batch_size)):
        sink.write_batch(batch, batch_id=f"MYSTUDY/all/{i}")

class imednet.integrations.graph.Neo4jExportSink

Bases: ExportSink

Export iMednet records as Neo4j graph nodes and relationships.

Parameters

uri:

Neo4j Bolt or HTTP URI (e.g. "bolt://localhost:7687" or "neo4j+s://xxxx.databases.neo4j.io").

auth:

(username, password) tuple. Credentials are never logged.

study_key:

Study identifier attached to every node for multi-study graphs.

config:

Optional Neo4jSinkConfig (or plain SinkConfig). Defaults to Neo4jSinkConfig with all values at defaults.

Raises

~imednet.errors.ExportConfigurationError

When the driver cannot connect to the database.

ImportError

When the neo4j package is not installed.

__init__(uri, auth, study_key, *, config=None)
Parameters:
  • uri (str) –

  • auth (Tuple[str, str]) –

  • study_key (str) –

  • config (SinkConfig | None) –

Return type:

None

close()

Close the underlying Neo4j driver connection.

Return type:

None

flush()

No-op: Neo4j writes are committed per transaction.

Return type:

None

write_batch(records, *, batch_id)

Write records to Neo4j using MERGE (idempotent) or CREATE.

Return type:

int

Parameters:
  • records (Sequence[Any]) –

  • batch_id (str) –

Parameters

records:

Sequence of typed Record model instances.

batch_id:

Idempotency key (e.g. "MYSTUDY/FORM1/0").

Returns

int

Number of records written.

class imednet.integrations.graph.Neo4jSinkConfig

Bases: SinkConfig

Extended SinkConfig for Neo4j.

Parameters

database:

Target Neo4j database name (default "neo4j").

__init__(batch_size=500, max_retries=3, retry_backoff=1.0, idempotent=True, extra=<factory>, database='neo4j')
Parameters:
  • batch_size (int) –

  • max_retries (int) –

  • retry_backoff (float) –

  • idempotent (bool) –

  • extra (dict[str, Any]) –

  • database (str) –

Return type:

None

database: str = 'neo4j'
extra: dict[str, Any]

imednet.integrations.graph.export_to_neo4j(sdk, study_key, uri, auth, *, config=None)

Export study records to Neo4j using Neo4jExportSink.

Return type:

int

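Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • uri (str) –

  • auth (Tuple[str, str]) –

  • config (SinkConfig | None) –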

imednet.integrations.parquet module

Hive-partitioned Parquet integration helpers.

imednet.integrations.parquet.export_to_hive_parquet(sdk, study_key, base_dir, *, use_labels_as_columns=False, variable_whitelist=None, form_whitelist=None, chunk_size=5000)

Export study records to a Hive-partitioned Parquet directory layout.

Return type:

None

Parameters:
  • sdk (ImednetSDK) –

  • study_key (str) –

  • base_dir (str) –

  • use_labels_as_columns (bool) –

  • variable_whitelist (List[str] | None) –

  • form_whitelist (List[int] | None) –

  • chunk_size (int) –

imednet.integrations.parquet.hive_parquet_query(base_dir)

Return the DuckDB read_parquet query string for the given Hive base directory.

Return type:

str

Parameters:

base_dir (str) –

imednet.integrations.parquet_engine module

Partitioned Parquet storage engines.

class imednet.integrations.parquet_engine.PartitionedStorageEngine

Bases: Protocol

Interface for writing tables into a partitioned dataset.

__init__(*args, **kwargs)
write_form_table(table, *, base_dir, study_key, form_key)

Persist a form table into a partitioned dataset layout.

Return type:

None

Parameters:
  • table (Any) –

  • base_dir (str) –

  • study_key (str) –

  • form_key (str) –

class imednet.integrations.parquet_engine.PyArrowDatasetPartitionedStorageEngine

Bases: PartitionedStorageEngine

Partitioned storage engine powered by pyarrow.dataset.write_dataset.

__init__(compression='snappy', use_dictionary=True, existing_data_behavior='overwrite_or_ignore', staging_dir_name='.imednet_staging')
Parameters:
  • compression (str) –

  • use_dictionary (bool) –

  • existing_data_behavior (str) –

  • staging_dir_name (str) –

Return type:

None

compression: str = 'snappy'
existing_data_behavior: str = 'overwrite_or_ignore'
staging_dir_name: str = '.imednet_staging'
use_dictionary: bool = True
write_form_table(table, *, base_dir, study_key, form_key)

Persist a form table into a partitioned dataset layout.

Return type:

None

Parameters:
  • table (Any) –

  • base_dir (str) –

  • study_key (str) –

  • form_key (str) –

imednet.integrations.sink_base module

Shared base classes and helpers for all export sinks.

Architecture decision

The SDK provides three export paths:

Tabular path (RecordMapper + pandas.DataFrame)

Flattens record data into a wide DataFrame and writes it to CSV, Excel, JSON, SQL, DuckDB, or Parquet. All functions in imednet.integrations.export follow this path.

Structure-preserving path (DataExtractionWorkflow + typed records)

Traverses the clinical data hierarchy Study → Subject → Visit → Record and materialises the relationships into a destination that can represent them natively — a property graph (Neo4j) or a nested document store (MongoDB).

Warehouse path (staged Parquet + native bulk loader)

Writes one Parquet file per form to an intermediate staging area and then invokes the destination’s native bulk-loading command (e.g. Snowflake COPY INTO).

All three paths share the contracts defined in this module: SinkConfig, the ExportSink ABC, the import-guard helper _require_optional_dep(), and the credential-redaction helper _redact_uri().

Shared contracts

  • Batching – callers split records into batches and call ExportSink.write_batch() once per batch. The batch_id parameter is a caller-supplied idempotency key (e.g. "<study_key>/<form_key>/<n>").

  • Chunk sizing – SinkConfig.batch_size controls the number of records per batch (default 500).

  • Retries – sinks must honour SinkConfig.max_retries and use SinkConfig.retry_backoff as the base delay between attempts.

  • Idempotent writes – when SinkConfig.idempotent is True (default) sinks must use upsert semantics or CREATE OR REPLACE so that replaying a batch with the same batch_id produces no duplicate data.

  • Error propagation – transient errors are retried; permanent errors raise ExportBatchError (includes batch_id) or ExportConfigurationError.

  • Logging – sinks use logging.getLogger(__name__) and must not log raw credentials or full URIs. Pass URIs through _redact_uri() before logging.
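The retry and error-propagation contracts can be sketched as a small wrapper: transient failures are retried up to max_retries times with a delay of retry_backoff * 2 ** attempt, and anything else becomes an ExportBatchError carrying the batch_id. This is a minimal sketch, not the SDK's code. ExportBatchError is a local stand-in whose constructor is assumed, TransientError is a hypothetical marker for retryable failures, write_with_retries is a hypothetical name, and attempt is assumed to count from 0.

```python
import time
from typing import Any, Callable, Sequence


class ExportBatchError(Exception):
    """Local stand-in for imednet.errors.ExportBatchError (shape assumed)."""

    def __init__(self, message: str, batch_id: str) -> None:
        super().__init__(message)
        self.batch_id = batch_id


class TransientError(Exception):
    """Hypothetical marker for retryable failures (timeouts, throttling)."""


def write_with_retries(
    write: Callable[[Sequence[Any]], int],
    records: Sequence[Any],
    *,
    batch_id: str,
    max_retries: int = 3,
    retry_backoff: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call write(records), retrying transient errors with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return write(records)
        except TransientError as exc:
            if attempt == max_retries:
                raise ExportBatchError(
                    f"batch {batch_id!r} failed after {max_retries} retries", batch_id
                ) from exc
            # Delay doubles each time: retry_backoff * 2 ** attempt.
            sleep(retry_backoff * 2 ** attempt)
        except Exception as exc:
            # Permanent error: no retry; surface it with the batch_id attached.
            raise ExportBatchError(f"batch {batch_id!r} failed: {exc}", batch_id) from exc
    # Only reached when max_retries < 0 (the loop body never runs).
    raise ValueError("max_retries must be >= 0")
```

Injecting sleep keeps the backoff schedule testable without real waiting.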

Optional dependency conventions

  • Each sink module calls _require_optional_dep() at connection time (not at import time) so that importing the module never fails due to a missing optional library.

  • Extras keys follow the pattern imednet[<key>]:

    pip install 'imednet[neo4j]'
    pip install 'imednet[mongodb]'
    pip install 'imednet[snowflake]'
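A connection-time import guard along the lines of _require_optional_dep() might look like the sketch below; the real helper's signature is not documented here, so require_optional_dep(module_name, extra) is a hypothetical shape. A sink would call it inside its connect step (for example pymongo = require_optional_dep("pymongo", "mongodb")) rather than at module top level, so importing the sink module never fails.

```python
import importlib
from types import ModuleType


def require_optional_dep(module_name: str, extra: str) -> ModuleType:
    """Hypothetical import guard: import lazily, fail with an install hint."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:  # ModuleNotFoundError is a subclass
        raise ImportError(
            f"{module_name!r} is required for this sink; "
            f"install it with: pip install 'imednet[{extra}]'"
        ) from exc
```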
    

Public-API exposure rules

  • imednet.integrations re-exports only the tabular helpers by default (backward compatibility).

  • The three new sink classes (Neo4jExportSink, MongoDbExportSink, SnowflakeExportSink) and SinkConfig are importable from their respective submodules and are also re-exported from imednet.integrations via explicit names.

  • Airflow helpers in apache_airflow_providers_imednet.export wrap only the tabular path; graph/document/warehouse sinks are not wrapped there.

class imednet.integrations.sink_base.ExportSink

Bases: ABC

Abstract base class for all export sinks.

Subclasses must implement write_batch(), flush(), and close(). The context-manager protocol is provided by this class.

Parameters

config:

Shared sink configuration. Defaults to SinkConfig with all values at their defaults.

__init__(config=None)
Parameters:

config (SinkConfig | None) –

Return type:

None

abstract close()

Release all resources held by this sink (connections, file handles).

Implementations must be idempotent — calling close() on an already closed sink must not raise.

Return type:

None

abstract flush()

Flush any internal buffers to the destination.

Return type:

None

Raises

imednet.errors.ExportError

On flush failure.

abstract write_batch(records, *, batch_id)

Write one batch of records to the destination.

Return type:

int

Parameters:
  • records (Sequence[Any]) –

  • batch_id (str) –

Parameters

records:

Sequence of records to write. The concrete type depends on the export path:

  • Tabular path – pandas.DataFrame rows or plain dicts produced by RecordMapper.

  • Structure-preserving path – typed Record instances from DataExtractionWorkflow.

  • Warehouse path – pyarrow.RecordBatch or dicts destined for a staged Parquet file.

batch_id:

Caller-supplied idempotency key. Recommended format: "<study_key>/<form_key>/<batch_number>".

Returns

int

Number of records successfully written.

Raises

imednet.errors.ExportBatchError

When the batch cannot be written after all retries.
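A concrete sink only has to supply write_batch(), flush() and close(). The sketch below uses a local stand-in base class because the exact context-manager behaviour of ExportSink is not spelled out here; flushing on a clean exit and always closing afterwards is an assumption. InMemorySink is a hypothetical toy that honours the idempotency contract by keying batches on batch_id.

```python
from abc import ABC, abstractmethod
from typing import Any, Sequence


class ExportSinkStandIn(ABC):
    """Local stand-in for ExportSink; the __exit__ behaviour is assumed."""

    def __enter__(self) -> "ExportSinkStandIn":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        try:
            if exc_type is None:
                self.flush()  # assumption: flush only on a clean exit
        finally:
            self.close()

    @abstractmethod
    def write_batch(self, records: Sequence[Any], *, batch_id: str) -> int: ...

    @abstractmethod
    def flush(self) -> None: ...

    @abstractmethod
    def close(self) -> None: ...


class InMemorySink(ExportSinkStandIn):
    """Toy sink: buffers batches until flush(), idempotent per batch_id."""

    def __init__(self) -> None:
        self._pending: dict[str, list[Any]] = {}
        self.committed: dict[str, list[Any]] = {}
        self.closed = False

    def write_batch(self, records: Sequence[Any], *, batch_id: str) -> int:
        # Keyed by batch_id, so replaying a batch replaces it instead of duplicating it.
        self._pending[batch_id] = list(records)
        return len(records)

    def flush(self) -> None:
        self.committed.update(self._pending)
        self._pending.clear()

    def close(self) -> None:
        self.closed = True  # setting a flag twice is harmless, so close() is idempotent
```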

class imednet.integrations.sink_base.SinkConfig

Bases: object

Shared configuration for all export sinks.

Parameters

batch_size:

Number of records per ExportSink.write_batch() call.

max_retries:

Maximum number of retry attempts on transient errors.

retry_backoff:

Base delay in seconds between retry attempts. The actual delay grows exponentially: retry_backoff * 2 ** attempt.

idempotent:

When True, sinks use upsert / replace semantics so that replaying a batch with the same batch_id produces no duplicate data.

__init__(batch_size=500, max_retries=3, retry_backoff=1.0, idempotent=True, extra=<factory>)
Parameters:
  • batch_size (int) –

  • max_retries (int) –

  • retry_backoff (float) –

  • idempotent (bool) –

  • extra (dict[str, Any]) –

Return type:

None

batch_size: int = 500
extra: dict[str, Any]
idempotent: bool = True
max_retries: int = 3
retry_backoff: float = 1.0
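The extra=<factory> default in the signature suggests SinkConfig is a dataclass whose extra field uses a default factory, so every instance gets its own dict. The sketch below mirrors the documented fields under that assumption and spells out the backoff schedule; SinkConfigSketch and retry_delays are hypothetical names, and counting attempt from 0 is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SinkConfigSketch:
    """Mirrors the documented SinkConfig fields; not the SDK's source."""

    batch_size: int = 500
    max_retries: int = 3
    retry_backoff: float = 1.0
    idempotent: bool = True
    # A mutable default needs a factory so instances don't share one dict.
    extra: dict[str, Any] = field(default_factory=dict)

    def retry_delays(self) -> list[float]:
        # retry_backoff * 2 ** attempt, with attempt counted from 0 (assumption).
        return [self.retry_backoff * 2 ** attempt for attempt in range(self.max_retries)]
```

With the defaults, the three retries would wait 1, 2 and 4 seconds.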
imednet.integrations.sink_base.iter_batches(records, batch_size)

Yield records in chunks of batch_size.

Return type:

Iterator[Sequence[Any]]

Parameters:
  • records (Sequence[Any]) –

  • batch_size (int) –
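Chunking a sequence like this is usually done by slicing; the sketch below is one way to do it and is not necessarily how iter_batches is implemented. The name iter_batches_sketch and the ValueError for a non-positive batch_size are assumptions.

```python
from typing import Any, Iterator, Sequence


def iter_batches_sketch(records: Sequence[Any], batch_size: int) -> Iterator[Sequence[Any]]:
    """Yield consecutive slices of records; the final slice may be shorter."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]
```

Paired with enumerate(), the chunk index gives the <batch_number> part of the recommended batch_id format, e.g. f"{study_key}/{form_key}/{n}".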

imednet.integrations.warehouse module

Snowflake warehouse export sink.

This module implements the warehouse export path for a Snowflake destination. Study records are:

  1. Written to Parquet files in a local staging directory (one file per batch).

  2. Uploaded to the configured Snowflake internal stage via PUT.

  3. Bulk-loaded into the target table with COPY INTO ... FROM @<stage>.

This two-phase approach decouples data preparation from bulk ingestion, lets the staged Parquet files be audited or re-uploaded independently, and loads the data with Snowflake’s native bulk loader (COPY INTO), which is typically much faster for large volumes than row-by-row inserts.

Manifest

After each successful COPY INTO, a manifest entry is appended to SinkConfig.extra["manifest_path"] (if provided):

{
    "batch_id":   "MYSTUDY/FORM1/0",
    "stage_path": "@MY_STAGE/imednet/MYSTUDY/FORM1/batch_0.parquet",
    "row_count":  500,
    "loaded_at":  "2024-01-15T12:00:00Z"
}
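A manifest file in JSON Lines form holds one such object per line. The sketch below appends an entry in the shape shown above, with loaded_at as a UTC timestamp ending in "Z"; it takes the manifest path as an argument, and the function name append_manifest_entry is hypothetical.

```python
import json
import os
from datetime import datetime, timezone
from typing import Any, Union


def append_manifest_entry(
    manifest_path: Union[str, "os.PathLike[str]"],
    *,
    batch_id: str,
    stage_path: str,
    row_count: int,
) -> dict[str, Any]:
    """Append one JSON object per line after a successful COPY INTO."""
    entry = {
        "batch_id": batch_id,
        "stage_path": stage_path,
        "row_count": row_count,
        # UTC timestamp with a trailing "Z", as in the example entry.
        "loaded_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    # Append mode keeps earlier entries; one json.dumps() per line = JSON Lines.
    with open(manifest_path, "a", encoding="utf-8") as fh:
        fh.write(json.dumps(entry) + "\n")
    return entry
```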

Optional dependencies

  • snowflake-connector-python (pip install 'imednet[snowflake]')

  • pyarrow (included in imednet[snowflake])

Both are imported lazily at connection / write time.

Idempotency

When SinkConfig.idempotent is True (default) the sink uses COPY INTO ... FORCE = FALSE so that Snowflake skips files that have already been loaded, making re-runs safe. Set idempotent = False to force re-ingestion of previously loaded files.
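The PUT and COPY INTO steps for a single batch can be pictured as two SQL strings. In the sketch below the stage directory layout is inferred from the manifest example (@MY_STAGE/imednet/MYSTUDY/FORM1/batch_0.parquet), and the AUTO_COMPRESS and MATCH_BY_COLUMN_NAME choices are assumptions; build_load_statements is a hypothetical name, and identifiers are neither quoted nor validated.

```python
from pathlib import Path


def build_load_statements(
    local_file: str,
    *,
    stage: str,
    stage_prefix: str,
    batch_id: str,
    table: str,
    idempotent: bool = True,
) -> tuple[str, str]:
    """Hypothetical PUT / COPY INTO text for one staged Parquet batch."""
    # Assumes batch_id is exactly "<study_key>/<form_key>/<batch_number>".
    study_key, form_key, _batch_no = batch_id.split("/")
    stage_dir = f"@{stage}/{stage_prefix}/{study_key}/{form_key}"
    path = Path(local_file)
    # Quoted file URI tolerates spaces; Parquet is already compressed, so no gzip.
    put_sql = f"PUT 'file://{path.as_posix()}' {stage_dir} AUTO_COMPRESS = FALSE"
    copy_sql = (
        f"COPY INTO {table} FROM {stage_dir} "
        f"FILES = ('{path.name}') "
        "FILE_FORMAT = (TYPE = PARQUET) "
        "MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE "
        # FORCE = FALSE lets Snowflake skip files it has already loaded.
        f"FORCE = {'FALSE' if idempotent else 'TRUE'}"
    )
    return put_sql, copy_sql
```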

Usage

import os

from imednet.integrations.sink_base import iter_batches
from imednet.integrations.warehouse import SnowflakeExportSink, SnowflakeSinkConfig

config = SnowflakeSinkConfig(
    account="myorg-myaccount",
    user="loader",
    password=os.environ["SF_PASS"],  # keep credentials out of source code
    database="IMEDNET_DB",
    schema="PUBLIC",
    warehouse="COMPUTE_WH",
    stage="MY_STAGE",
    table="RECORDS",
    stage_prefix="imednet",
    local_staging_dir="/tmp/imednet_stage",
)

# `records` is the sequence to export (typed Record instances or plain dicts).
with SnowflakeExportSink(config=config) as sink:
    for i, batch in enumerate(iter_batches(records, config.batch_size)):
        sink.write_batch(batch, batch_id=f"MYSTUDY/FORM1/{i}")

class imednet.integrations.warehouse.SnowflakeExportSink

Bases: ExportSink

Stage Parquet files and bulk-load them into Snowflake.

Parameters

config:

SnowflakeSinkConfig containing all connection details and staging paths.

Raises

imednet.errors.ExportConfigurationError

When the Snowflake connector cannot be initialised or the required configuration values are missing.

ImportError

When snowflake-connector-python or pyarrow is not installed.

__init__(config=None)
Parameters:

config (SinkConfig | None) –

Return type:

None

close()

Close the Snowflake connection and clean up temporary staging files.

Return type:

None

flush()

No-op: each batch is committed individually.

Return type:

None

write_batch(records, *, batch_id)

Write records to Snowflake via Parquet staging + COPY INTO.

Return type:

int

Parameters:
  • records (Sequence[Any]) –

  • batch_id (str) –

Parameters

records:

Sequence of typed Record model instances or plain dicts.

batch_id:

Idempotency key (e.g. "MYSTUDY/FORM1/0").

Returns

int

Number of rows loaded.

class imednet.integrations.warehouse.SnowflakeSinkConfig

Bases: SinkConfig

Configuration for SnowflakeExportSink.

Parameters

account:

Snowflake account identifier (<org>-<account> or legacy format).

user:

Snowflake user name.

password:

Snowflake password. Never logged.

database:

Target database.

schema:

Target schema.

warehouse:

Virtual warehouse used for the COPY INTO command.

stage:

Snowflake internal stage name (e.g. "MY_STAGE").

table:

Destination table name inside database.schema.

stage_prefix:

Path prefix inside the stage (default "imednet").

local_staging_dir:

Local directory used to write Parquet files before PUT. Defaults to a temporary directory created by tempfile.

manifest_path:

Optional path to a JSON-lines file where each loaded batch is recorded.

__init__(batch_size=500, max_retries=3, retry_backoff=1.0, idempotent=True, extra=<factory>, account='', user='', password='', database='', schema='PUBLIC', warehouse='', stage='', table='', stage_prefix='imednet', local_staging_dir=None, manifest_path=None)
Parameters:
  • batch_size (int) –

  • max_retries (int) –

  • retry_backoff (float) –

  • idempotent (bool) –

  • extra (dict[str, Any]) –

  • account (str) –

  • user (str) –

  • password (str) –

  • database (str) –

  • schema (str) –

  • warehouse (str) –

  • stage (str) –

  • table (str) –

  • stage_prefix (str) –

  • local_staging_dir (str | PathLike[str] | None) –

  • manifest_path (str | PathLike[str] | None) –

Return type:

None

account: str = ''
database: str = ''
extra: dict[str, Any]
local_staging_dir: Union[str, PathLike[str], None] = None
manifest_path: Union[str, PathLike[str], None] = None
password: str = ''
schema: str = 'PUBLIC'
stage: str = ''
stage_prefix: str = 'imednet'
table: str = ''
user: str = ''
warehouse: str = ''
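Most connection fields default to an empty string, and SnowflakeExportSink is documented to raise ExportConfigurationError when required values are missing. A check of that kind could look like the sketch below. Which fields count as required is an assumption (every field whose default is ''), and ExportConfigurationError is a local stand-in; SnowflakeSettingsSketch and validate_snowflake_settings are hypothetical names.

```python
from dataclasses import dataclass


class ExportConfigurationError(Exception):
    """Local stand-in for imednet.errors.ExportConfigurationError."""


# Assumption: every field whose documented default is '' must be provided.
REQUIRED_FIELDS = ("account", "user", "password", "database", "warehouse", "stage", "table")


@dataclass
class SnowflakeSettingsSketch:
    account: str = ""
    user: str = ""
    password: str = ""
    database: str = ""
    schema: str = "PUBLIC"
    warehouse: str = ""
    stage: str = ""
    table: str = ""


def validate_snowflake_settings(cfg: SnowflakeSettingsSketch) -> None:
    missing = [name for name in REQUIRED_FIELDS if not getattr(cfg, name)]
    if missing:
        # Report field names only; never echo values such as the password.
        raise ExportConfigurationError("missing Snowflake settings: " + ", ".join(missing))
```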
imednet.integrations.warehouse.export_to_snowflake(sdk, study_key, *, config)

Export study records to Snowflake using SnowflakeExportSink.

Return type:

int

Parameters: