Source code for imednet.core.http.executor

"""
HTTP request execution with retries and monitoring.
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional

import httpx
from tenacity import (
    AsyncRetrying,
    RetryCallState,
    RetryError,
    Retrying,
    stop_after_attempt,
    wait_exponential,
)

from imednet.core.http.handlers import handle_response
from imednet.core.http.monitor import RequestMonitor
from imednet.core.retry import DefaultRetryPolicy, RetryPolicy, RetryState

if TYPE_CHECKING:
    from opentelemetry.trace import Tracer
else:
    Tracer = Any


[docs]class BaseRequestExecutor(ABC): """Abstract base for request executors.""" def __init__( self, send: Any, retries: int, backoff_factor: float, tracer: Optional[Tracer] = None, retry_policy: RetryPolicy | None = None, ) -> None: self.send = send self.retries = retries self.backoff_factor = backoff_factor self.tracer = tracer self.retry_policy = retry_policy or DefaultRetryPolicy() def _get_retry_predicate(self, method: str) -> Callable[[RetryCallState], bool]: """Return a retry predicate that includes the HTTP method in state.""" policy = self.retry_policy def should_retry(retry_state: RetryCallState) -> bool: state = RetryState( attempt_number=retry_state.attempt_number, exception=( retry_state.outcome.exception() if retry_state.outcome and retry_state.outcome.failed else None ), result=( retry_state.outcome.result() if retry_state.outcome and not retry_state.outcome.failed else None ), method=method, ) return policy.should_retry(state) return should_retry def _process_result( self, response: Optional[httpx.Response], monitor: RequestMonitor ) -> httpx.Response: """Process successful response or raise error if None.""" if response is not None: monitor.on_success(response) return handle_response(response) raise RuntimeError("Request failed without response or exception") def _process_retry_error(self, e: RetryError, monitor: RequestMonitor) -> httpx.Response: """Handle RetryError, extracting successful result if present, else escalate.""" if e.last_attempt and not e.last_attempt.failed: response: httpx.Response = e.last_attempt.result() monitor.on_success(response) return handle_response(response) monitor.on_retry_error(e) raise RuntimeError("Request failed without response or exception") # Unreachable @abstractmethod def __call__(self, method: str, url: str, **kwargs: Any) -> Any: """Execute the request."""
[docs]class SyncRequestExecutor(BaseRequestExecutor): """Execute synchronous HTTP requests with retry and error handling.""" def __init__( self, send: Callable[..., httpx.Response], retries: int, backoff_factor: float, tracer: Optional[Tracer] = None, retry_policy: RetryPolicy | None = None, ) -> None: super().__init__(send, retries, backoff_factor, tracer, retry_policy) # self.send is set in super def __call__(self, method: str, url: str, **kwargs: Any) -> httpx.Response: def send_fn() -> httpx.Response: return self.send(method, url, **kwargs) retryer = Retrying( stop=stop_after_attempt(self.retries), wait=wait_exponential(multiplier=self.backoff_factor), retry=self._get_retry_predicate(method), reraise=False, ) with RequestMonitor(self.tracer, method, url) as monitor: try: response: Optional[httpx.Response] = retryer(send_fn) return self._process_result(response, monitor) except RetryError as e: return self._process_retry_error(e, monitor)
[docs]class AsyncRequestExecutor(BaseRequestExecutor): """Execute asynchronous HTTP requests with retry and error handling.""" def __init__( self, send: Callable[..., Awaitable[httpx.Response]], retries: int, backoff_factor: float, tracer: Optional[Tracer] = None, retry_policy: RetryPolicy | None = None, ) -> None: super().__init__(send, retries, backoff_factor, tracer, retry_policy) # self.send is set in super async def __call__(self, method: str, url: str, **kwargs: Any) -> httpx.Response: async def send_fn() -> httpx.Response: return await self.send(method, url, **kwargs) retryer = AsyncRetrying( stop=stop_after_attempt(self.retries), wait=wait_exponential(multiplier=self.backoff_factor), retry=self._get_retry_predicate(method), reraise=False, ) async with RequestMonitor(self.tracer, method, url) as monitor: try: response: Optional[httpx.Response] = await retryer(send_fn) return self._process_result(response, monitor) except RetryError as e: return self._process_retry_error(e, monitor)