from __future__ import annotations
import re
import sqlite3
import time
import uuid
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from threading import RLock
from typing import Callable, Iterator, Optional
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
from imednet.models.triage import TriageAnnotation, TriageHistoryEntry, TriageItem, TriageStatus
# --- Identifier validation ---------------------------------------------------

# Whitelist for stored triage item ids: letters, digits and the characters
# ``.``, ``_``, ``:`` and ``-``.
_SAFE_SQL_IDENTIFIER = re.compile(r"^[A-Za-z0-9._:-]+$")

# --- Credential redaction ----------------------------------------------------

# Exact (lower-cased) query-string keys whose values are masked when a
# database target is echoed back in an error message.
_SENSITIVE_QUERY_KEYS = {"token", "security_key", "secret", "password", "api_key"}

# Regex fragments naming credential-like keys in free-form error text;
# ``[_-]?`` also catches the ``apikey`` / ``api-key`` spellings.
_SENSITIVE_PATTERN_KEYS = (
    "api[_-]?key",
    "password",
    "secret",
    "security[_-]?key",
    "token",
)
_SENSITIVE_KEY_ALTERNATION = "|".join(_SENSITIVE_PATTERN_KEYS)

# ``key = "value"`` / ``key: 'value'``: group 3 captures the opening quote and
# the ``\3`` back-reference requires the same quote to close the value.
_SENSITIVE_QUOTED_PATTERN = re.compile(
    r"(?i)\b(" + _SENSITIVE_KEY_ALTERNATION + r")\b(\s*[:=]\s*)([\"'])(.*?)\3"
)
# ``key=value``: the value runs until whitespace, a comma or a semicolon.
_SENSITIVE_UNQUOTED_PATTERN = re.compile(
    r"(?i)\b(" + _SENSITIVE_KEY_ALTERNATION + r")\b(\s*[:=]\s*)([^\s,;]+)"
)

# --- Persistence tuning ------------------------------------------------------

_LATEST_SCHEMA_VERSION = 1  # stamped into PRAGMA user_version once migrated
_SQLITE_BUSY_TIMEOUT_MS = 30 * 1000  # 30 seconds, in milliseconds
_RETRY_BASE_DELAY_SECONDS = 0.05  # linear back-off step between write retries
class TriageStore:
    """Thread-safe SQLite-backed triage queue and decision store.

    Every public method opens its own short-lived connection. Writes go
    through ``_execute_write``, which serializes them within this process with
    an ``RLock``, runs each one as a single ``BEGIN IMMEDIATE`` transaction and
    retries on ``sqlite3.OperationalError``. Reads do not take the lock.
    """

    def __init__(
        self,
        db_path: str | Path,
        *,
        timeout: float = 30.0,
        retry_attempts: int = 3,
    ) -> None:
        """Open the store at *db_path*, creating directories and schema as needed.

        Args:
            db_path: Path of the SQLite database file; ``~`` is expanded and
                missing parent directories are created.
            timeout: Seconds a connection waits on a locked database before
                failing; used for both ``sqlite3.connect`` and
                ``PRAGMA busy_timeout``.
            retry_attempts: Total attempts per write operation. Values below 1
                behave like 1, so a write is always attempted at least once.
        """
        self.db_path = Path(db_path).expanduser()
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._timeout = timeout
        self._retry_attempts = retry_attempts
        self._lock = RLock()
        self._initialize_schema()

    @contextmanager
    def _connection(self) -> Iterator[sqlite3.Connection]:
        """Yield a configured connection and always close it afterwards.

        Rows are returned as ``sqlite3.Row``; WAL journaling, ``synchronous=NORMAL``
        and foreign-key enforcement are requested on every connection.

        Raises:
            sqlite3.OperationalError: If the database cannot be opened (the
                message has credentials redacted).
        """
        try:
            conn = sqlite3.connect(self.db_path, timeout=self._timeout)
        except sqlite3.Error as exc:
            raise sqlite3.OperationalError(
                f"Unable to open triage store at {self._redact_sqlite_target(str(self.db_path))}: "
                f"{self._redact_error_text(str(exc))}"
            ) from exc
        # The configuration PRAGMAs sit inside the try so a failure there
        # (e.g. "database is locked" while switching to WAL) still closes conn.
        try:
            conn.row_factory = sqlite3.Row
            # Derive the busy timeout from the caller's ``timeout`` instead of a
            # fixed constant, which used to silently override it.
            busy_timeout_ms = max(0, int(self._timeout * 1000))
            conn.execute(f"PRAGMA busy_timeout = {busy_timeout_ms};")
            conn.execute("PRAGMA journal_mode=WAL;")
            conn.execute("PRAGMA synchronous=NORMAL;")
            conn.execute("PRAGMA foreign_keys=ON;")
            yield conn
        finally:
            conn.close()

    def _initialize_schema(self) -> None:
        """Create tables and indexes if missing, then migrate older schemas.

        ``_migrate_schema`` runs only while ``PRAGMA user_version`` is below
        ``_LATEST_SCHEMA_VERSION``; the version is stamped afterwards.
        """
        with self._connection() as conn:
            current_version = int(conn.execute("PRAGMA user_version").fetchone()[0])
            conn.execute("""
                CREATE TABLE IF NOT EXISTS triage_items (
                    item_id TEXT PRIMARY KEY,
                    study_key TEXT NOT NULL,
                    status TEXT NOT NULL,
                    assignee TEXT,
                    severity TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS triage_annotations (
                    annotation_id TEXT PRIMARY KEY,
                    item_id TEXT NOT NULL,
                    user_id TEXT NOT NULL,
                    comment TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    FOREIGN KEY(item_id) REFERENCES triage_items(item_id) ON DELETE CASCADE
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS triage_history (
                    transition_id TEXT PRIMARY KEY,
                    item_id TEXT NOT NULL,
                    from_status TEXT NOT NULL,
                    to_status TEXT NOT NULL,
                    user_id TEXT NOT NULL,
                    comment TEXT,
                    timestamp TEXT NOT NULL,
                    FOREIGN KEY(item_id) REFERENCES triage_items(item_id) ON DELETE CASCADE
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_triage_items_study_status
                ON triage_items(study_key, status)
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_triage_annotations_item
                ON triage_annotations(item_id, timestamp)
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_triage_history_item
                ON triage_history(item_id, timestamp)
            """)
            if current_version < _LATEST_SCHEMA_VERSION:
                self._migrate_schema(conn)
                conn.execute(f"PRAGMA user_version = {_LATEST_SCHEMA_VERSION}")
            conn.commit()

    def get_journal_mode(self) -> str:
        """Return the current ``PRAGMA journal_mode`` (connections request WAL)."""
        with self._connection() as conn:
            value = conn.execute("PRAGMA journal_mode").fetchone()[0]
        return str(value)

    def _execute_write(self, callback: Callable[[sqlite3.Connection], None]) -> None:
        """Run *callback* in one write transaction, retrying on ``OperationalError``.

        The transaction is opened with ``BEGIN IMMEDIATE``, so the database
        write lock is held before the callback reads anything. Read-then-write
        callbacks (e.g. ``update_status``) therefore act on a state that no
        other connection can change before they commit. ``with conn`` commits
        on success and rolls back on any exception, so a failed attempt leaves
        nothing behind and can be retried safely.

        Raises:
            sqlite3.OperationalError: If every attempt fails with an
                ``OperationalError`` (message redacted, original chained).
            Exception: Any other exception from *callback* propagates unchanged
                after the rollback.
        """
        # At least one attempt: with retry_attempts <= 0 the write used to be
        # skipped silently.
        attempts = max(1, self._retry_attempts)
        last_error: sqlite3.OperationalError | None = None
        for attempt in range(attempts):
            try:
                with self._lock:
                    with self._connection() as conn:
                        with conn:
                            conn.execute("BEGIN IMMEDIATE")
                            callback(conn)
                return
            except sqlite3.OperationalError as exc:
                last_error = exc
                if attempt < attempts - 1:
                    # Sleep outside the lock so other writers can proceed.
                    time.sleep(_RETRY_BASE_DELAY_SECONDS * (attempt + 1))
        raise sqlite3.OperationalError(
            f"SQLite write failed for triage store at "
            f"{self._redact_sqlite_target(str(self.db_path))}: "
            f"{self._redact_error_text(str(last_error))}"
        ) from last_error

    def upsert_item(self, item: TriageItem) -> None:
        """Insert *item*, or overwrite the stored item with the same ``item_id``.

        The item's annotations and history replace whatever was stored for it
        before. ``created_at`` is written only on first insert; ``updated_at``
        is set to the current UTC time on every call.
        """
        now = datetime.now(timezone.utc).isoformat()

        def _write(conn: sqlite3.Connection) -> None:
            conn.execute(
                """
                INSERT INTO triage_items (
                    item_id, study_key, status, assignee, severity, created_at, updated_at
                )
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(item_id) DO UPDATE SET
                    study_key=excluded.study_key,
                    status=excluded.status,
                    assignee=excluded.assignee,
                    severity=excluded.severity,
                    updated_at=excluded.updated_at
                """,
                (
                    item.item_id,
                    item.study_key,
                    item.status.value,
                    item.assignee,
                    item.severity,
                    now,
                    now,
                ),
            )
            conn.execute("DELETE FROM triage_annotations WHERE item_id = ?", (item.item_id,))
            conn.executemany(
                """
                INSERT INTO triage_annotations (annotation_id, item_id, user_id, comment, timestamp)
                VALUES (?, ?, ?, ?, ?)
                """,
                [
                    (
                        annotation.annotation_id,
                        item.item_id,
                        annotation.user_id,
                        annotation.comment,
                        annotation.timestamp.isoformat(),
                    )
                    for annotation in item.annotations
                ],
            )
            conn.execute("DELETE FROM triage_history WHERE item_id = ?", (item.item_id,))
            conn.executemany(
                """
                INSERT INTO triage_history (
                    transition_id, item_id, from_status, to_status, user_id, comment, timestamp
                )
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                [
                    (
                        entry.transition_id,
                        item.item_id,
                        entry.from_status.value,
                        entry.to_status.value,
                        entry.user_id,
                        entry.comment,
                        entry.timestamp.isoformat(),
                    )
                    for entry in item.history
                ],
            )

        self._execute_write(_write)

    def get_triage_item(self, item_id: str) -> Optional[TriageItem]:
        """Return the item stored under *item_id*, or ``None`` if there is none.

        The item carries its annotations and history, each ordered by
        timestamp (oldest first).
        """
        with self._connection() as conn:
            row = conn.execute(
                """
                SELECT item_id, study_key, status, assignee, severity
                FROM triage_items
                WHERE item_id = ?
                """,
                (item_id,),
            ).fetchone()
            if row is None:
                return None
            return self._row_to_item(
                row,
                self._get_annotations(conn, item_id),
                self._get_history(conn, item_id),
            )

    def get_queue(self, study_key: str, status: Optional[TriageStatus] = None) -> list[TriageItem]:
        """Return the items of *study_key*, least recently updated first.

        Args:
            study_key: Study whose items are returned.
            status: When given, only items currently in this status are returned.

        Returns:
            Fully populated items (annotations and history included); an empty
            list when nothing matches.

        Raises:
            ValueError: If a matching stored ``item_id`` contains characters
                outside ``[A-Za-z0-9._:-]``.
        """
        with self._connection() as conn:
            if status is None:
                rows = conn.execute(
                    """
                    SELECT item_id, study_key, status, assignee, severity
                    FROM triage_items
                    WHERE study_key = ?
                    ORDER BY updated_at ASC
                    """,
                    (study_key,),
                ).fetchall()
            else:
                rows = conn.execute(
                    """
                    SELECT item_id, study_key, status, assignee, severity
                    FROM triage_items
                    WHERE study_key = ? AND status = ?
                    ORDER BY updated_at ASC
                    """,
                    (study_key, status.value),
                ).fetchall()
            if not rows:
                return []
            item_ids = [str(row["item_id"]) for row in rows]
            # NOTE(review): the ids are bound as parameters below, so this check
            # is not needed for SQL safety, and upsert_item accepts ids it
            # rejects. Kept for compatibility; confirm the restriction is intended.
            invalid_item_id = next(
                (item_id for item_id in item_ids if not _SAFE_SQL_IDENTIFIER.fullmatch(item_id)),
                None,
            )
            if invalid_item_id is not None:
                raise ValueError(
                    f"Invalid triage item identifier: {invalid_item_id}. "
                    "Only alphanumeric characters and ._:- are allowed."
                )
            # Stage the ids in a connection-local temp table so both child-table
            # queries avoid a dynamically sized ``IN (?, ?, ...)`` list. The
            # ``temp.`` qualifier keeps the DROP from ever resolving to a
            # main-schema table that happens to share the name.
            conn.execute("DROP TABLE IF EXISTS temp.temp_item_ids")
            conn.execute("CREATE TEMP TABLE temp_item_ids (item_id TEXT PRIMARY KEY)")
            conn.executemany(
                "INSERT INTO temp_item_ids (item_id) VALUES (?)",
                [(item_id,) for item_id in item_ids],
            )
            annotation_rows = conn.execute(
                """
                SELECT item_id, annotation_id, user_id, comment, timestamp
                FROM triage_annotations
                WHERE item_id IN (SELECT item_id FROM temp_item_ids)
                ORDER BY timestamp ASC
                """,
            ).fetchall()
            history_rows = conn.execute(
                """
                SELECT item_id, transition_id, from_status, to_status, user_id, comment, timestamp
                FROM triage_history
                WHERE item_id IN (SELECT item_id FROM temp_item_ids)
                ORDER BY timestamp ASC
                """,
            ).fetchall()

            # Children arrive globally ordered by timestamp, so appending keeps
            # each item's list in timestamp order too.
            annotations_by_item: dict[str, list[TriageAnnotation]] = {
                item_id: [] for item_id in item_ids
            }
            for annotation_row in annotation_rows:
                annotations_by_item[str(annotation_row["item_id"])].append(
                    self._row_to_annotation(annotation_row)
                )
            history_by_item: dict[str, list[TriageHistoryEntry]] = {
                item_id: [] for item_id in item_ids
            }
            for history_row in history_rows:
                history_by_item[str(history_row["item_id"])].append(
                    self._row_to_history_entry(history_row)
                )
            return [
                self._row_to_item(
                    row,
                    annotations_by_item.get(str(row["item_id"]), []),
                    history_by_item.get(str(row["item_id"]), []),
                )
                for row in rows
            ]

    def assign_item(self, item_id: str, assignee: str) -> None:
        """Set the (whitespace-stripped) assignee of *item_id* and bump ``updated_at``.

        Raises:
            ValueError: If no item with *item_id* exists.
        """

        def _write(conn: sqlite3.Connection) -> None:
            cursor = conn.execute(
                """
                UPDATE triage_items
                SET assignee = ?, updated_at = ?
                WHERE item_id = ?
                """,
                (assignee.strip(), datetime.now(timezone.utc).isoformat(), item_id),
            )
            if cursor.rowcount == 0:
                raise ValueError(f"Unknown triage item: {item_id}")

        self._execute_write(_write)

    def update_status(
        self,
        item_id: str,
        to_status: TriageStatus,
        user_id: str,
        comment: Optional[str],
    ) -> None:
        """Move *item_id* to *to_status* and append a history entry.

        The entry records the previous status, *user_id* and *comment*
        (stripped; empty or ``None`` is stored as NULL).

        Raises:
            ValueError: If no item with *item_id* exists.
        """
        normalized_comment = comment.strip() if comment else None
        timestamp = datetime.now(timezone.utc).isoformat()

        def _write(conn: sqlite3.Connection) -> None:
            # Runs inside _execute_write's BEGIN IMMEDIATE transaction, so the
            # status read here cannot change before the UPDATE below commits.
            row = conn.execute(
                "SELECT status FROM triage_items WHERE item_id = ?",
                (item_id,),
            ).fetchone()
            if row is None:
                raise ValueError(f"Unknown triage item: {item_id}")
            from_status = TriageStatus(str(row["status"]))
            conn.execute(
                """
                UPDATE triage_items
                SET status = ?, updated_at = ?
                WHERE item_id = ?
                """,
                (to_status.value, timestamp, item_id),
            )
            conn.execute(
                """
                INSERT INTO triage_history (
                    transition_id, item_id, from_status, to_status, user_id, comment, timestamp
                )
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    str(uuid.uuid4()),
                    item_id,
                    from_status.value,
                    to_status.value,
                    user_id,
                    normalized_comment,
                    timestamp,
                ),
            )

        self._execute_write(_write)

    def add_annotation(self, item_id: str, user_id: str, comment: str) -> None:
        """Attach a (whitespace-stripped) comment to *item_id* and bump ``updated_at``.

        Raises:
            ValueError: If the stripped comment is empty or no item with
                *item_id* exists.
        """
        cleaned_comment = comment.strip()
        if not cleaned_comment:
            raise ValueError("Annotation comment must not be empty")
        timestamp = datetime.now(timezone.utc).isoformat()

        def _write(conn: sqlite3.Connection) -> None:
            row = conn.execute(
                "SELECT item_id FROM triage_items WHERE item_id = ?",
                (item_id,),
            ).fetchone()
            if row is None:
                raise ValueError(f"Unknown triage item: {item_id}")
            conn.execute(
                """
                INSERT INTO triage_annotations (annotation_id, item_id, user_id, comment, timestamp)
                VALUES (?, ?, ?, ?, ?)
                """,
                (str(uuid.uuid4()), item_id, user_id, cleaned_comment, timestamp),
            )
            conn.execute(
                "UPDATE triage_items SET updated_at = ? WHERE item_id = ?",
                (timestamp, item_id),
            )

        self._execute_write(_write)

    def get_item_last_updated(self, item_id: str) -> Optional[datetime]:
        """Return ``updated_at`` of *item_id*, or ``None`` if unknown or unset."""
        with self._connection() as conn:
            row = conn.execute(
                "SELECT updated_at FROM triage_items WHERE item_id = ?",
                (item_id,),
            ).fetchone()
        if row is None or row["updated_at"] is None:
            return None
        return datetime.fromisoformat(str(row["updated_at"]))

    def _get_annotations(self, conn: sqlite3.Connection, item_id: str) -> list[TriageAnnotation]:
        """Return the annotations of *item_id*, oldest first."""
        rows = conn.execute(
            """
            SELECT annotation_id, user_id, comment, timestamp
            FROM triage_annotations
            WHERE item_id = ?
            ORDER BY timestamp ASC
            """,
            (item_id,),
        ).fetchall()
        return [self._row_to_annotation(row) for row in rows]

    def _get_history(self, conn: sqlite3.Connection, item_id: str) -> list[TriageHistoryEntry]:
        """Return the status transitions of *item_id*, oldest first."""
        rows = conn.execute(
            """
            SELECT transition_id, from_status, to_status, user_id, comment, timestamp
            FROM triage_history
            WHERE item_id = ?
            ORDER BY timestamp ASC
            """,
            (item_id,),
        ).fetchall()
        return [self._row_to_history_entry(row) for row in rows]

    @staticmethod
    def _row_to_annotation(row: sqlite3.Row) -> TriageAnnotation:
        """Build a ``TriageAnnotation`` from a ``triage_annotations`` row."""
        return TriageAnnotation(
            annotation_id=str(row["annotation_id"]),
            user_id=str(row["user_id"]),
            comment=str(row["comment"]),
            timestamp=datetime.fromisoformat(str(row["timestamp"])),
        )

    @staticmethod
    def _row_to_history_entry(row: sqlite3.Row) -> TriageHistoryEntry:
        """Build a ``TriageHistoryEntry`` from a ``triage_history`` row (blank comment -> None)."""
        return TriageHistoryEntry(
            transition_id=str(row["transition_id"]),
            from_status=TriageStatus(str(row["from_status"])),
            to_status=TriageStatus(str(row["to_status"])),
            user_id=str(row["user_id"]),
            comment=str(row["comment"]).strip() if row["comment"] else None,
            timestamp=datetime.fromisoformat(str(row["timestamp"])),
        )

    @staticmethod
    def _row_to_item(
        row: sqlite3.Row,
        annotations: list[TriageAnnotation],
        history: list[TriageHistoryEntry],
    ) -> TriageItem:
        """Build a ``TriageItem`` from a ``triage_items`` row plus its children."""
        return TriageItem(
            item_id=str(row["item_id"]),
            study_key=str(row["study_key"]),
            status=TriageStatus(str(row["status"])),
            assignee=str(row["assignee"]).strip() if row["assignee"] else None,
            severity=str(row["severity"]),
            annotations=annotations,
            history=history,
        )

    def _migrate_schema(self, conn: sqlite3.Connection) -> None:
        """Add missing ``created_at``/``updated_at`` columns and backfill NULLs with now (UTC)."""
        columns = {
            str(row["name"]) for row in conn.execute("PRAGMA table_info(triage_items)").fetchall()
        }
        now = datetime.now(timezone.utc).isoformat()
        if "created_at" not in columns:
            conn.execute("ALTER TABLE triage_items ADD COLUMN created_at TEXT")
        conn.execute(
            """
            UPDATE triage_items
            SET created_at = ?
            WHERE created_at IS NULL
            """,
            (now,),
        )
        if "updated_at" not in columns:
            conn.execute("ALTER TABLE triage_items ADD COLUMN updated_at TEXT")
        conn.execute(
            """
            UPDATE triage_items
            SET updated_at = ?
            WHERE updated_at IS NULL
            """,
            (now,),
        )

    def _redact_sqlite_target(self, target: str) -> str:
        """Mask credentials in a URI-style database target for error messages.

        Plain filesystem paths (no ``://`` and no ``file:`` prefix) are returned
        unchanged. For URIs, a ``userinfo@`` part becomes ``***@`` and the
        values of credential-like query parameters become a literal ``***``.
        """
        if "://" not in target and not target.startswith("file:"):
            return target
        split = urlsplit(target)
        netloc = split.netloc
        if "@" in netloc:
            _, host = netloc.rsplit("@", 1)
            netloc = f"***@{host}"
        redacted_query = urlencode(
            [
                (key, "***" if self._is_sensitive_query_key(key) else value)
                for key, value in parse_qsl(split.query, keep_blank_values=True)
            ],
            doseq=True,
            # Keep the mask readable; default quoting would emit "%2A%2A%2A".
            safe="*",
        )
        return urlunsplit((split.scheme, netloc, split.path, redacted_query, split.fragment))

    @staticmethod
    def _is_sensitive_query_key(key: str) -> bool:
        """Return True if query-string *key* names a credential.

        Matches the exact names in ``_SENSITIVE_QUERY_KEYS`` as well as the
        spelling variants (``apiKey``, ``api-key``, ...) that
        ``_redact_error_text`` masks in free-form text.
        """
        lowered = key.lower()
        return lowered in _SENSITIVE_QUERY_KEYS or any(
            re.fullmatch(pattern, lowered) for pattern in _SENSITIVE_PATTERN_KEYS
        )

    def _redact_error_text(self, message: str) -> str:
        """Replace credential values in *message* with ``***``.

        Quoted values (``key="..."``/``key: '...'``) are masked first, then
        bare ``key=value`` tokens; the key and separator are kept.
        """
        redacted = _SENSITIVE_QUOTED_PATTERN.sub(
            lambda match: f"{match.group(1)}{match.group(2)}***",
            message,
        )
        return _SENSITIVE_UNQUOTED_PATTERN.sub(
            lambda match: f"{match.group(1)}{match.group(2)}***",
            redacted,
        )