"""
HTTP request execution with retries and monitoring.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional
import httpx
from tenacity import (
AsyncRetrying,
RetryCallState,
RetryError,
Retrying,
stop_after_attempt,
wait_exponential,
)
from imednet.core.http.handlers import handle_response
from imednet.core.http.monitor import RequestMonitor
from imednet.core.retry import DefaultRetryPolicy, RetryPolicy, RetryState
if TYPE_CHECKING:
from opentelemetry.trace import Tracer
else:
Tracer = Any
[docs]class BaseRequestExecutor(ABC):
"""Abstract base for request executors."""
def __init__(
self,
send: Any,
retries: int,
backoff_factor: float,
tracer: Optional[Tracer] = None,
retry_policy: RetryPolicy | None = None,
) -> None:
self.send = send
self.retries = retries
self.backoff_factor = backoff_factor
self.tracer = tracer
self.retry_policy = retry_policy or DefaultRetryPolicy()
def _get_retry_predicate(self, method: str) -> Callable[[RetryCallState], bool]:
"""Return a retry predicate that includes the HTTP method in state."""
policy = self.retry_policy
def should_retry(retry_state: RetryCallState) -> bool:
state = RetryState(
attempt_number=retry_state.attempt_number,
exception=(
retry_state.outcome.exception()
if retry_state.outcome and retry_state.outcome.failed
else None
),
result=(
retry_state.outcome.result()
if retry_state.outcome and not retry_state.outcome.failed
else None
),
method=method,
)
return policy.should_retry(state)
return should_retry
def _process_result(
self, response: Optional[httpx.Response], monitor: RequestMonitor
) -> httpx.Response:
"""Process successful response or raise error if None."""
if response is not None:
monitor.on_success(response)
return handle_response(response)
raise RuntimeError("Request failed without response or exception")
def _process_retry_error(self, e: RetryError, monitor: RequestMonitor) -> httpx.Response:
"""Handle RetryError, extracting successful result if present, else escalate."""
if e.last_attempt and not e.last_attempt.failed:
response: httpx.Response = e.last_attempt.result()
monitor.on_success(response)
return handle_response(response)
monitor.on_retry_error(e)
raise RuntimeError("Request failed without response or exception") # Unreachable
@abstractmethod
def __call__(self, method: str, url: str, **kwargs: Any) -> Any:
"""Execute the request."""
[docs]class SyncRequestExecutor(BaseRequestExecutor):
"""Execute synchronous HTTP requests with retry and error handling."""
def __init__(
self,
send: Callable[..., httpx.Response],
retries: int,
backoff_factor: float,
tracer: Optional[Tracer] = None,
retry_policy: RetryPolicy | None = None,
) -> None:
super().__init__(send, retries, backoff_factor, tracer, retry_policy)
# self.send is set in super
def __call__(self, method: str, url: str, **kwargs: Any) -> httpx.Response:
def send_fn() -> httpx.Response:
return self.send(method, url, **kwargs)
retryer = Retrying(
stop=stop_after_attempt(self.retries),
wait=wait_exponential(multiplier=self.backoff_factor),
retry=self._get_retry_predicate(method),
reraise=False,
)
with RequestMonitor(self.tracer, method, url) as monitor:
try:
response: Optional[httpx.Response] = retryer(send_fn)
return self._process_result(response, monitor)
except RetryError as e:
return self._process_retry_error(e, monitor)
[docs]class AsyncRequestExecutor(BaseRequestExecutor):
"""Execute asynchronous HTTP requests with retry and error handling."""
def __init__(
self,
send: Callable[..., Awaitable[httpx.Response]],
retries: int,
backoff_factor: float,
tracer: Optional[Tracer] = None,
retry_policy: RetryPolicy | None = None,
) -> None:
super().__init__(send, retries, backoff_factor, tracer, retry_policy)
# self.send is set in super
async def __call__(self, method: str, url: str, **kwargs: Any) -> httpx.Response:
async def send_fn() -> httpx.Response:
return await self.send(method, url, **kwargs)
retryer = AsyncRetrying(
stop=stop_after_attempt(self.retries),
wait=wait_exponential(multiplier=self.backoff_factor),
retry=self._get_retry_predicate(method),
reraise=False,
)
async with RequestMonitor(self.tracer, method, url) as monitor:
try:
response: Optional[httpx.Response] = await retryer(send_fn)
return self._process_result(response, monitor)
except RetryError as e:
return self._process_retry_error(e, monitor)