"""
HTTP request execution with retries and monitoring.
"""
from __future__ import annotations

import logging
import math
import threading
from abc import ABC, abstractmethod
from contextlib import contextmanager
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Iterator, Optional

import httpx
from tenacity import (
    AsyncRetrying,
    RetryCallState,
    RetryError,
    Retrying,
    stop_after_attempt,
    wait_random_exponential,
)

from imednet.core.http.handlers import handle_response
from imednet.core.http.monitor import RequestMonitor
from imednet.core.retry import DefaultRetryPolicy, RetryPolicy, RetryState
# One step above CRITICAL: a logger set to this level lets none of the
# standard-severity records through.
_SUPPRESSED_LOG_LEVEL = logging.CRITICAL + 1

if TYPE_CHECKING:
    # Imported for static analysis only, so this module does not import
    # opentelemetry at runtime.
    from opentelemetry.trace import Tracer
else:
    # Runtime stand-in so the name ``Tracer`` still exists for annotations.
    Tracer = Any
class BaseRequestExecutor(ABC):
    """Abstract base for request executors.

    Bundles what the synchronous and asynchronous executors share: the retry
    configuration, the retry predicate and wait strategy handed to tenacity,
    ``Retry-After`` parsing, and post-processing of the final outcome.
    Subclasses implement ``__call__`` to actually send the request.
    """

    # Logger levels are process-wide state, while suppression contexts can
    # overlap (concurrent asyncio tasks or threads). Only the outermost context
    # may save and restore the levels, so the nesting depth is tracked here,
    # shared by every executor instance and guarded by a lock.
    _SUPPRESSED_LOGGER_NAMES = ("httpx", "httpcore")
    _suppress_lock = threading.Lock()
    _suppress_depth = 0
    _suppress_saved_levels: dict[str, int] = {}

    def __init__(
        self,
        send: Any,
        retries: int,
        backoff_factor: float,
        tracer: Optional[Tracer] = None,
        retry_policy: RetryPolicy | None = None,
    ) -> None:
        """Store the transport callable and the retry configuration.

        Args:
            send: Callable that performs the request; invoked by subclasses as
                ``send(method, url, **kwargs)``.
            retries: Attempt limit; subclasses pass it to ``stop_after_attempt``.
            backoff_factor: Multiplier for the randomized exponential backoff.
            tracer: Optional tracer that subclasses hand to ``RequestMonitor``.
            retry_policy: Decides whether an attempt is retried. A
                ``DefaultRetryPolicy`` is used when omitted.
        """
        self.send = send
        self.retries = retries
        self.backoff_factor = backoff_factor
        self.tracer = tracer
        self.retry_policy = retry_policy or DefaultRetryPolicy()
        # Fallback wait used whenever the server gives no usable Retry-After.
        self._jitter_wait = wait_random_exponential(multiplier=self.backoff_factor)

    @staticmethod
    @contextmanager
    def _suppress_httpx_request_logging() -> Iterator[None]:
        """Silence the ``httpx`` and ``httpcore`` loggers inside the ``with`` block.

        The contexts may overlap or nest. The first one entered records the
        loggers' current levels and raises them to ``_SUPPRESSED_LOG_LEVEL``;
        the last one to exit puts the recorded levels back. Saving and
        restoring per context would let an overlapping context record the
        already-suppressed level and "restore" it last, leaving the loggers
        muted for the rest of the process.
        """
        shared = BaseRequestExecutor
        with shared._suppress_lock:
            if shared._suppress_depth == 0:
                for name in shared._SUPPRESSED_LOGGER_NAMES:
                    logger = logging.getLogger(name)
                    shared._suppress_saved_levels[name] = logger.level
                    logger.setLevel(_SUPPRESSED_LOG_LEVEL)
            shared._suppress_depth += 1
        try:
            yield
        finally:
            with shared._suppress_lock:
                shared._suppress_depth -= 1
                if shared._suppress_depth == 0:
                    for name, level in shared._suppress_saved_levels.items():
                        logging.getLogger(name).setLevel(level)
                    shared._suppress_saved_levels.clear()

    def _get_retry_predicate(self, method: str) -> Callable[[RetryCallState], bool]:
        """Return a retry predicate that includes the HTTP method in state.

        Args:
            method: HTTP method of the request being executed.

        Returns:
            A callable for tenacity's ``retry=`` argument. It converts the
            tenacity call state into a ``RetryState`` and delegates the
            decision to ``self.retry_policy.should_retry``.
        """
        policy = self.retry_policy

        def should_retry(retry_state: RetryCallState) -> bool:
            outcome = retry_state.outcome
            error = None
            value = None
            # Exactly one of exception/result is populated, depending on
            # whether the attempt raised.
            if outcome:
                if outcome.failed:
                    error = outcome.exception()
                else:
                    value = outcome.result()
            state = RetryState(
                attempt_number=retry_state.attempt_number,
                exception=error,
                result=value,
                method=method,
            )
            return policy.should_retry(state)

        return should_retry

    def _process_result(
        self, response: Optional[httpx.Response], monitor: RequestMonitor
    ) -> httpx.Response:
        """Process successful response or raise error if None.

        Args:
            response: Value returned by the retry loop.
            monitor: Monitor notified of the success.

        Returns:
            Whatever ``handle_response`` returns for the response.

        Raises:
            RuntimeError: If the retry loop produced no response.
        """
        if response is None:
            raise RuntimeError("Request failed without response or exception")
        monitor.on_success(response)
        return handle_response(response)

    def _process_retry_error(self, e: RetryError, monitor: RequestMonitor) -> httpx.Response:
        """Handle RetryError, extracting successful result if present, else escalate.

        Args:
            e: Error raised by the retry loop once it stopped retrying.
            monitor: Monitor notified of the success or of the retry failure.

        Returns:
            Whatever ``handle_response`` returns when the last attempt did
            produce a response (the policy still wanted a retry but the
            attempt limit was reached).

        Raises:
            RuntimeError: Safeguard if ``monitor.on_retry_error`` returns
                instead of raising; chained to ``e``.
        """
        last_attempt = e.last_attempt
        if last_attempt and not last_attempt.failed:
            response: httpx.Response = last_attempt.result()
            monitor.on_success(response)
            return handle_response(response)
        # NOTE(review): on_retry_error is presumably expected to raise, making
        # the line below unreachable in practice - confirm against RequestMonitor.
        monitor.on_retry_error(e)
        raise RuntimeError("Request failed without response or exception") from e

    @staticmethod
    def _parse_retry_after_seconds(response: httpx.Response) -> Optional[float]:
        """Return the delay requested by the ``Retry-After`` header, in seconds.

        Both header forms are understood: a number of seconds and an HTTP-date.
        Negative delays (including dates in the past) are clamped to ``0.0``.

        Returns:
            The delay in seconds, or ``None`` when the header is absent, empty,
            unparseable, or not a finite number (``nan``/``inf``), so that the
            caller falls back to its own backoff instead of sleeping on a
            meaningless value.
        """
        raw = response.headers.get("Retry-After")
        if not raw:
            return None

        try:
            seconds = float(raw)
        except ValueError:
            pass  # Not numeric; try the HTTP-date form below.
        else:
            # float() also accepts "nan" and "inf", which are not valid delays.
            return max(seconds, 0.0) if math.isfinite(seconds) else None

        try:
            retry_time = parsedate_to_datetime(raw)
            if retry_time.tzinfo is None:
                # A date without zone information is interpreted as UTC.
                retry_time = retry_time.replace(tzinfo=timezone.utc)
            delay = (retry_time - datetime.now(timezone.utc)).total_seconds()
            return max(delay, 0.0)
        except (TypeError, ValueError, OverflowError):
            return None

    def _wait_strategy(self, retry_state: RetryCallState) -> float:
        """Return how long to wait before the next attempt, in seconds.

        A usable ``Retry-After`` header on a returned response takes
        precedence; otherwise the randomized exponential backoff is used.
        """
        outcome = retry_state.outcome
        if outcome and not outcome.failed:
            result = outcome.result()
            if isinstance(result, httpx.Response):
                retry_after_seconds = self._parse_retry_after_seconds(result)
                if retry_after_seconds is not None:
                    return retry_after_seconds
        return float(self._jitter_wait(retry_state))

    @abstractmethod
    def __call__(self, method: str, url: str, **kwargs: Any) -> Any:
        """Execute the request."""
class SyncRequestExecutor(BaseRequestExecutor):
    """Blocking executor that sends a request through tenacity's ``Retrying`` loop."""

    def __init__(
        self,
        send: Callable[..., httpx.Response],
        retries: int,
        backoff_factor: float,
        tracer: Optional[Tracer] = None,
        retry_policy: RetryPolicy | None = None,
    ) -> None:
        """Configure the executor.

        Args:
            send: Synchronous callable invoked as ``send(method, url, **kwargs)``.
            retries: Attempt limit given to ``stop_after_attempt``.
            backoff_factor: Backoff multiplier forwarded to the base class.
            tracer: Optional tracer handed to ``RequestMonitor``.
            retry_policy: Optional retry policy forwarded to the base class.
        """
        # The base initializer stores ``send`` along with everything else;
        # this override only narrows the type of ``send``.
        super().__init__(
            send=send,
            retries=retries,
            backoff_factor=backoff_factor,
            tracer=tracer,
            retry_policy=retry_policy,
        )

    def __call__(self, method: str, url: str, **kwargs: Any) -> httpx.Response:
        """Send the request, retrying as dictated by the retry predicate.

        Args:
            method: HTTP method, also made available to the retry predicate.
            url: Target URL.
            **kwargs: Passed through unchanged to ``self.send``.

        Returns:
            The value produced by ``_process_result`` or, once the retry loop
            gives up with a ``RetryError``, by ``_process_retry_error``.
        """

        def attempt() -> httpx.Response:
            # Each individual attempt runs with httpx/httpcore logging silenced.
            with self._suppress_httpx_request_logging():
                return self.send(method, url, **kwargs)

        retry_loop = Retrying(
            stop=stop_after_attempt(self.retries),
            wait=self._wait_strategy,
            retry=self._get_retry_predicate(method),
            # Surface exhaustion as RetryError so it can be handled below.
            reraise=False,
        )

        with RequestMonitor(self.tracer, method, url) as monitor:
            try:
                outcome: Optional[httpx.Response] = retry_loop(attempt)
                return self._process_result(outcome, monitor)
            except RetryError as exhausted:
                return self._process_retry_error(exhausted, monitor)
class AsyncRequestExecutor(BaseRequestExecutor):
    """Awaitable executor that sends a request through tenacity's ``AsyncRetrying`` loop."""

    def __init__(
        self,
        send: Callable[..., Awaitable[httpx.Response]],
        retries: int,
        backoff_factor: float,
        tracer: Optional[Tracer] = None,
        retry_policy: RetryPolicy | None = None,
    ) -> None:
        """Configure the executor.

        Args:
            send: Coroutine function awaited as ``send(method, url, **kwargs)``.
            retries: Attempt limit given to ``stop_after_attempt``.
            backoff_factor: Backoff multiplier forwarded to the base class.
            tracer: Optional tracer handed to ``RequestMonitor``.
            retry_policy: Optional retry policy forwarded to the base class.
        """
        # The base initializer stores ``send`` along with everything else;
        # this override only narrows the type of ``send``.
        super().__init__(
            send=send,
            retries=retries,
            backoff_factor=backoff_factor,
            tracer=tracer,
            retry_policy=retry_policy,
        )

    async def __call__(self, method: str, url: str, **kwargs: Any) -> httpx.Response:
        """Send the request, retrying as dictated by the retry predicate.

        Args:
            method: HTTP method, also made available to the retry predicate.
            url: Target URL.
            **kwargs: Passed through unchanged to ``self.send``.

        Returns:
            The value produced by ``_process_result`` or, once the retry loop
            gives up with a ``RetryError``, by ``_process_retry_error``.
        """

        async def attempt() -> httpx.Response:
            # Logging stays silenced for the whole awaited send, i.e. the
            # suppression context is held across the ``await``.
            with self._suppress_httpx_request_logging():
                return await self.send(method, url, **kwargs)

        retry_loop = AsyncRetrying(
            stop=stop_after_attempt(self.retries),
            wait=self._wait_strategy,
            retry=self._get_retry_predicate(method),
            # Surface exhaustion as RetryError so it can be handled below.
            reraise=False,
        )

        async with RequestMonitor(self.tracer, method, url) as monitor:
            try:
                outcome: Optional[httpx.Response] = await retry_loop(attempt)
                return self._process_result(outcome, monitor)
            except RetryError as exhausted:
                return self._process_retry_error(exhausted, monitor)