# Source code for imednet.integrations.parquet_engine

"""Partitioned Parquet storage engines."""

from __future__ import annotations

import os
import shutil
from dataclasses import dataclass
from datetime import datetime, timezone
from errno import ENOENT, ENOTEMPTY
from importlib import import_module
from pathlib import Path
from typing import Any, Protocol, runtime_checkable
from uuid import uuid4


def _import_pyarrow() -> tuple[Any, Any]:
    try:
        pyarrow_module = import_module("pyarrow")
        dataset_module = import_module("pyarrow.dataset")
    except ImportError as error:
        raise ImportError(
            "PyArrow is required for partitioned Parquet storage. "
            "Install with \"pip install 'imednet[export]'\"."
        ) from error
    return pyarrow_module, dataset_module


@runtime_checkable
class PartitionedStorageEngine(Protocol):
    """Structural interface for engines that write tables into a partitioned dataset.

    The protocol is runtime-checkable, so ``isinstance`` checks succeed for any
    object that exposes a ``write_form_table`` attribute.
    """

    def write_form_table(
        self,
        table: Any,
        *,
        base_dir: str,
        study_key: str,
        form_key: str,
    ) -> None:
        """Persist a form table into a partitioned dataset layout.

        Args:
            table: The table to persist.
            base_dir: Root directory of the partitioned dataset.
            study_key: Study identifier used for partitioning.
            form_key: Form identifier used for partitioning.
        """
        ...
@dataclass(frozen=True)
class PyArrowDatasetPartitionedStorageEngine(PartitionedStorageEngine):
    """Partitioned storage engine powered by ``pyarrow.dataset.write_dataset``.

    Each :meth:`write_form_table` call writes into a private staging directory
    ``<base_dir>/<staging_dir_name>/<commit_id>`` and then moves the finished
    partition to
    ``<base_dir>/study_key=<study_key>/form_key=<form_key>/_batch_<commit_id>``.
    """

    # Compression codec forwarded to ``ParquetFileFormat.make_write_options``.
    compression: str = "snappy"
    # Forwarded to ``make_write_options`` as ``use_dictionary``.
    use_dictionary: bool = True
    # Forwarded verbatim to ``write_dataset(existing_data_behavior=...)``.
    existing_data_behavior: str = "overwrite_or_ignore"
    # Name of the directory under ``base_dir`` that holds per-commit staging dirs.
    staging_dir_name: str = ".imednet_staging"

    def _table_with_metadata(
        self,
        table: Any,
        *,
        study_key: str,
        form_key: str,
        commit_id: str,
    ) -> Any:
        """Return ``table`` with iMednet provenance keys merged into its schema metadata.

        Objects that do not expose ``replace_schema_metadata`` are returned
        unchanged. Existing metadata entries are kept; the ``imednet.*`` keys
        written here overwrite entries with the same key.

        Args:
            table: Table-like object to annotate.
            study_key: Study identifier recorded under ``imednet.study_key``.
            form_key: Form identifier recorded under ``imednet.form_key``.
            commit_id: Commit identifier recorded under ``imednet.commit_id``.

        Returns:
            The result of ``table.replace_schema_metadata(...)``, or ``table``
            itself when that method is unavailable.
        """
        if not hasattr(table, "replace_schema_metadata"):
            return table

        schema = getattr(table, "schema", None)
        # ``schema.metadata`` may be missing or ``None``; normalise to a fresh dict.
        existing_metadata = dict(getattr(schema, "metadata", {}) or {})
        existing_metadata.update(
            {
                b"imednet.writer": b"pyarrow.dataset",
                b"imednet.commit_id": commit_id.encode("utf-8"),
                b"imednet.study_key": study_key.encode("utf-8"),
                b"imednet.form_key": form_key.encode("utf-8"),
                b"imednet.written_at_utc": datetime.now(timezone.utc).isoformat().encode("utf-8"),
            }
        )
        return table.replace_schema_metadata(existing_metadata)

    def write_form_table(
        self,
        table: Any,
        *,
        base_dir: str,
        study_key: str,
        form_key: str,
    ) -> None:
        """Write ``table`` as a new batch under its study/form partition.

        The table is written to a per-commit staging directory and the staged
        partition directory is then moved into place with ``os.replace``. The
        staging directory is always removed afterwards; on failure, empty
        partition directories under ``base_dir`` are removed as well.

        Args:
            table: Table to persist. It must provide ``append_column`` and
                ``num_rows``.
            base_dir: Root directory of the partitioned dataset.
            study_key: Value of the ``study_key`` partition column.
            form_key: Value of the ``form_key`` partition column.

        Raises:
            ImportError: If PyArrow is not installed.
            FileExistsError: If the per-commit staging directory already exists.
            RuntimeError: If the write produced no staged partition directory.
            OSError: If the move or the directory cleanup fails with an errno
                other than ``ENOENT``/``ENOTEMPTY``.
        """
        pyarrow_module, dataset_module = _import_pyarrow()

        commit_id = uuid4().hex
        base_path = Path(base_dir)
        staging_root = base_path / self.staging_dir_name
        staging_base_dir = staging_root / commit_id
        staged_partition_dir = staging_base_dir / f"study_key={study_key}" / f"form_key={form_key}"
        final_partition_dir = base_path / f"study_key={study_key}" / f"form_key={form_key}"
        committed_batch_dir = final_partition_dir / f"_batch_{commit_id}"

        # All in-memory preparation happens BEFORE the staging directory is
        # created. If any of these steps raises (e.g. ``table`` is not a
        # PyArrow table, or a write option is invalid) nothing has been created
        # on disk yet, so no staging directory is leaked.
        partition_schema = pyarrow_module.schema(
            [
                ("study_key", pyarrow_module.string()),
                ("form_key", pyarrow_module.string()),
            ]
        )
        partitioned_table = table.append_column(
            "study_key",
            pyarrow_module.array([study_key] * table.num_rows, type=pyarrow_module.string()),
        ).append_column(
            "form_key",
            pyarrow_module.array([form_key] * table.num_rows, type=pyarrow_module.string()),
        )
        partitioned_table = self._table_with_metadata(
            partitioned_table,
            study_key=study_key,
            form_key=form_key,
            commit_id=commit_id,
        )
        parquet_format = dataset_module.ParquetFileFormat()
        parquet_options = parquet_format.make_write_options(
            compression=self.compression,
            use_dictionary=self.use_dictionary,
        )

        commit_succeeded = False
        # Created immediately before the ``try`` so the ``finally`` cleanup
        # below covers every code path that runs after the directory exists.
        staging_base_dir.mkdir(parents=True, exist_ok=False)
        try:
            dataset_module.write_dataset(
                partitioned_table,
                base_dir=str(staging_base_dir),
                basename_template=f"{commit_id}-{{i}}.parquet",
                partitioning=dataset_module.partitioning(flavor="hive", schema=partition_schema),
                format=parquet_format,
                file_options=parquet_options,
                existing_data_behavior=self.existing_data_behavior,
            )
            if not staged_partition_dir.exists():
                raise RuntimeError(
                    "Partition write did not produce staged output for "
                    f"study_key={study_key!r}, form_key={form_key!r}, "
                    f"commit_id={commit_id!r}, staged_partition_dir={staged_partition_dir!s}."
                )

            final_partition_dir.mkdir(parents=True, exist_ok=True)
            # Where the filesystem supports atomic rename (e.g., local POSIX),
            # os.replace ensures readers observe a fully committed batch dir.
            os.replace(staged_partition_dir, committed_batch_dir)
            commit_succeeded = True
        finally:
            shutil.rmtree(staging_base_dir, ignore_errors=True)
            if not commit_succeeded:
                # Roll back any empty, reader-visible partition directories that
                # may have been created before the atomic move failed. Walk
                # upwards and stop at the first missing or non-empty directory.
                current_path = final_partition_dir
                while current_path != base_path:
                    try:
                        current_path.rmdir()
                    except OSError as error:
                        if error.errno in (ENOENT, ENOTEMPTY):
                            break
                        raise
                    current_path = current_path.parent
            # Remove the shared staging root only when no other commit is
            # still staging inside it.
            if staging_root.exists():
                try:
                    staging_root.rmdir()
                except OSError as error:
                    if error.errno not in (ENOENT, ENOTEMPTY):
                        raise
# Public API of this module.
__all__ = [
    "PartitionedStorageEngine",
    "PyArrowDatasetPartitionedStorageEngine",
]