Source code for imednet.models.standards

from __future__ import annotations

from typing import Any

from pydantic import BaseModel

from imednet.utils.validators import is_missing_value


# One finding produced when a record is checked against a standards profile.
class ValidationViolation(BaseModel):
    # Name of the record field the finding refers to.
    field: str
    # Human-readable description of the problem.
    message: str
    # Severity label; the profiles in this module emit "ERROR" or "WARNING".
    severity: str  # ERROR, WARNING
class StandardsProfile:
    """Field expectations and value constraints for one named profile.

    The field mappings are keyed by domain code. Lookups upper-case the
    requested domain first, so the mapping keys are expected to be upper
    case already.
    """

    def __init__(
        self,
        *,
        profile_name: str,
        required_fields: dict[str, list[str]] | None = None,
        recommended_fields: dict[str, list[str]] | None = None,
        optional_fields: dict[str, list[str]] | None = None,
        value_constraints: dict[str, list[Any]] | None = None,
    ) -> None:
        """Store the profile name and its field/constraint tables.

        Args:
            profile_name: Identifier of the profile.
            required_fields: Domain -> fields whose absence is an error.
            recommended_fields: Domain -> fields whose absence is a warning.
            optional_fields: Domain -> fields that are merely recorded here;
                this class does not read them during validation.
            value_constraints: ``"DOMAIN.field"`` or bare ``"field"`` ->
                list of permitted values.

        Any mapping that is omitted (or empty) is replaced by a new empty
        dict.
        """
        self.profile_name = profile_name
        self.required_fields = required_fields if required_fields else {}
        self.recommended_fields = recommended_fields if recommended_fields else {}
        self.optional_fields = optional_fields if optional_fields else {}
        self.value_constraints = value_constraints if value_constraints else {}

    def expected_fields(self, domain: str) -> list[str]:
        """Return required fields followed by recommended fields for ``domain``.

        The domain is matched after upper-casing. Unknown domains yield an
        empty list.
        """
        domain_key = domain.upper()
        expected = list(self.required_fields.get(domain_key, []))
        expected.extend(self.recommended_fields.get(domain_key, []))
        return expected

    def validate(self, domain: str, data: dict[str, Any]) -> list[ValidationViolation]:
        """Check ``data`` against this profile's rules for ``domain``.

        Args:
            domain: Domain code; compared after upper-casing.
            data: Record to check, keyed by field name.

        Returns:
            Violations in this order: missing required fields (ERROR),
            missing recommended fields (WARNING), then value-constraint
            failures (ERROR). "Missing" is whatever ``is_missing_value``
            reports for ``data.get(field)``.
        """
        domain_key = domain.upper()
        violations: list[ValidationViolation] = []

        # Required fields that are absent are hard errors.
        violations.extend(
            ValidationViolation(
                field=field_name,
                message=(
                    f"{field_name} is required for {domain_key} in {self.profile_name}."
                ),
                severity="ERROR",
            )
            for field_name in self.required_fields.get(domain_key, [])
            if is_missing_value(data.get(field_name))
        )

        # Recommended fields that are absent only produce warnings.
        violations.extend(
            ValidationViolation(
                field=field_name,
                message=(
                    f"{field_name} is recommended for {domain_key} in {self.profile_name}."
                ),
                severity="WARNING",
            )
            for field_name in self.recommended_fields.get(domain_key, [])
            if is_missing_value(data.get(field_name))
        )

        for constraint_key, allowed_values in self.value_constraints.items():
            if "." not in constraint_key:
                # A bare field name applies to every domain.
                field_name = constraint_key
            else:
                # "DOMAIN.field" applies only when the domain matches.
                constraint_domain, field_name = constraint_key.split(".", 1)
                if constraint_domain.upper() != domain_key:
                    continue

            value = data.get(field_name)
            # Missing values are not constraint failures; they are handled
            # (if at all) by the required/recommended checks above.
            if is_missing_value(value) or value in allowed_values:
                continue
            violations.append(
                ValidationViolation(
                    field=field_name,
                    message=f"{field_name} must be one of {allowed_values}.",
                    severity="ERROR",
                )
            )

        return violations
class GeneralClinicalProfile(StandardsProfile):
    """Profile registered under the name ``"general"``.

    Requires core AE and PD fields, recommends a few more, and lists the DD
    fields as optional. It defines no value constraints.
    """

    def __init__(self) -> None:
        """Build the fixed field tables and hand them to the base class."""
        required = {
            "AE": ["subjectKey", "aeTerm", "aeSeverity"],
            "PD": ["subjectKey", "dvTerm", "dvCategory", "dvSeverity", "dvDate"],
        }
        recommended = {
            "AE": ["aeDecod", "aeRelationship"],
            "PD": ["dvStatus"],
        }
        optional = {
            "DD": ["subjectKey", "ddTerm", "ddCategory", "ddDate", "ddSerious"],
        }
        super().__init__(
            profile_name="general",
            required_fields=required,
            recommended_fields=recommended,
            optional_fields=optional,
        )
class DrugSafetyProfile(StandardsProfile):
    """Profile registered under the name ``"drug"``.

    Compared with the AE fields ``aeTerm``/``aeSeverity``/``subjectKey``, this
    profile additionally requires ``aeDecod`` and ``aeRelationship``, and it
    restricts ``aeSeverity`` in the AE domain to 1-5 (as int or as string).
    """

    def __init__(self) -> None:
        """Build the fixed field tables and hand them to the base class."""
        required = {
            "AE": ["subjectKey", "aeTerm", "aeSeverity", "aeDecod", "aeRelationship"],
            "PD": ["subjectKey", "dvTerm", "dvCategory", "dvSeverity", "dvDate"],
        }
        recommended = {"AE": ["aeActionTaken", "aeOutcome"]}
        optional = {"DD": ["subjectKey", "ddTerm", "ddCategory", "ddDate", "ddSerious"]}
        # Both the numeric and the string spelling of each grade are accepted.
        constraints = {"AE.aeSeverity": [1, 2, 3, 4, 5, "1", "2", "3", "4", "5"]}
        super().__init__(
            profile_name="drug",
            required_fields=required,
            recommended_fields=recommended,
            optional_fields=optional,
            value_constraints=constraints,
        )
class DeviceSafetyProfile(StandardsProfile):
    """Profile registered under the name ``"device"``.

    Requires fields in the AE, PD and DD domains and adds two DD-specific
    checks on top of the base-class validation.
    """

    def __init__(self) -> None:
        """Build the fixed field tables and hand them to the base class."""
        required = {
            "AE": ["subjectKey", "aeTerm", "aeSeverity"],
            "PD": ["subjectKey", "dvTerm", "dvCategory", "dvSeverity", "dvDate"],
            "DD": ["subjectKey", "ddTerm", "ddCategory", "ddDate", "ddSerious"],
        }
        recommended = {"DD": ["ddRelationship", "ddActionTaken"]}
        super().__init__(
            profile_name="device",
            required_fields=required,
            recommended_fields=recommended,
            optional_fields={},
        )

    def validate(self, domain: str, data: dict[str, Any]) -> list[ValidationViolation]:
        """Run the base validation, then the DD-only checks.

        For any domain other than DD (compared after upper-casing) the
        base-class result is returned unchanged. For DD, two further errors
        may be appended:

        * ``ddOccurred`` is exactly ``True`` while ``ddTerm``, ``ddCategory``
          and ``ddDate`` are all missing;
        * ``ddSerious`` is present but is not a ``bool``.
        """
        violations = super().validate(domain=domain, data=data)
        if domain.upper() != "DD":
            return violations

        # Identity check: only a real ``True`` triggers this rule, not other
        # truthy values such as 1 or "true".
        if data.get("ddOccurred") is True:
            has_any_detail = any(
                not is_missing_value(data.get(field_name))
                for field_name in ("ddTerm", "ddCategory", "ddDate")
            )
            if not has_any_detail:
                violations.append(
                    ValidationViolation(
                        field="ddOccurred",
                        message="DD record details are required when ddOccurred is true.",
                        severity="ERROR",
                    )
                )

        dd_serious = data.get("ddSerious")
        if not (is_missing_value(dd_serious) or isinstance(dd_serious, bool)):
            violations.append(
                ValidationViolation(
                    field="ddSerious",
                    message="ddSerious must be a boolean value for device studies.",
                    severity="ERROR",
                )
            )

        return violations
class StandardsProfileRegistry:
    """In-memory lookup table of standards profiles keyed by ``profile_name``."""

    def __init__(self) -> None:
        """Create an empty registry."""
        self._profiles: dict[str, StandardsProfile] = {}

    def register(self, profile: StandardsProfile) -> None:
        """Store ``profile`` under its ``profile_name``.

        Registering another profile with the same name replaces the earlier
        entry.
        """
        self._profiles[profile.profile_name] = profile

    def get(self, profile_name: str) -> StandardsProfile:
        """Return the profile registered as ``profile_name``.

        Raises:
            KeyError: If no profile with that name is registered. The message
                names the requested profile and lists the registered ones.
        """
        try:
            return self._profiles[profile_name]
        except KeyError:
            # A bare ``KeyError('x')`` gives callers no hint about valid
            # names, so re-raise the same exception type with context.
            # ``repr`` keeps the join safe regardless of key type.
            available = ", ".join(sorted(map(repr, self._profiles))) or "<none>"
            raise KeyError(
                f"Unknown standards profile {profile_name!r}. "
                f"Available profiles: {available}."
            ) from None

    def list_profiles(self) -> list[str]:
        """Return the registered profile names in sorted order."""
        return sorted(self._profiles)
# Module-level registry shared by importers, populated at import time with
# one instance of each of the three built-in profiles ("general", "drug",
# "device").
PROFILE_REGISTRY = StandardsProfileRegistry()
PROFILE_REGISTRY.register(GeneralClinicalProfile())
PROFILE_REGISTRY.register(DrugSafetyProfile())
PROFILE_REGISTRY.register(DeviceSafetyProfile())

# Names exported by ``from imednet.models.standards import *``.
__all__ = [
    "ValidationViolation",
    "StandardsProfile",
    "GeneralClinicalProfile",
    "DrugSafetyProfile",
    "DeviceSafetyProfile",
    "StandardsProfileRegistry",
    "PROFILE_REGISTRY",
]