Source code for aiohttp_client_cache.response

import json
from datetime import datetime, timedelta
from http.cookies import SimpleCookie
from typing import Any, Dict, Iterable, Mapping, Optional, Union

import attr
from aiohttp import ClientResponse, ClientResponseError
from aiohttp.client_reqrep import ContentDisposition
from aiohttp.typedefs import StrOrURL

# CachedResponse attributes to not copy directly from ClientResponse
EXCLUDE_ATTRS = {
    '_body',
    'created_at',
    'encoding',
    'expires',
    'history',
    'is_expired',
    'request_info',
}
JsonResponse = Optional[Dict[str, Any]]


[docs]@attr.s(auto_attribs=True, slots=True) class RequestInfo: """A picklable version of aiohttp.client_reqrep.RequestInfo""" url: str method: str headers: dict real_url: str
[docs] @classmethod def from_object(cls, request_info): return cls( url=str(request_info.url), method=request_info.method, headers=dict(request_info.headers), real_url=str(request_info.real_url), )
[docs]@attr.s(slots=True) class CachedResponse: """A dataclass containing cached response information, used for serialization. It will mostly behave the same as a :py:class:`aiohttp.ClientResponse` that has been read, with some additional cache-related info. """ method: str = attr.ib() reason: str = attr.ib() status: int = attr.ib() url: StrOrURL = attr.ib() version: str = attr.ib() _body: Any = attr.ib(default=None) content_disposition: ContentDisposition = attr.ib(default=None) cookies: SimpleCookie = attr.ib(default=None) created_at: datetime = attr.ib(factory=datetime.utcnow) encoding: str = attr.ib(default=None) expires: datetime = attr.ib(default=None) headers: Mapping = attr.ib(factory=dict) history: Iterable = attr.ib(factory=tuple) request_info: RequestInfo = attr.ib(default=None)
[docs] @classmethod async def from_client_response( cls, client_response: ClientResponse, expire_after: timedelta = None ): """Convert a ClientResponse into a CachedReponse""" # Response may not have been read yet, if fetched by something other than CachedSession if not client_response._released: await client_response.read() # Copy most attributes over as is copy_attrs = set(attr.fields_dict(cls).keys()) - EXCLUDE_ATTRS response = cls(**{k: getattr(client_response, k) for k in copy_attrs}) # Set some remaining attributes individually response._body = client_response._body response.headers = dict(client_response.headers) # Set expiration time if expire_after: response.expires = datetime.utcnow() + expire_after # The encoding may be unset even if the response has been read try: response.encoding = client_response.get_encoding() except RuntimeError: pass response.request_info = RequestInfo.from_object(client_response.request_info) response.url = str(client_response.url) if client_response.history: response.history = ( *[await cls.from_client_response(r) for r in client_response.history], ) return response
@property def ok(self) -> bool: """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not""" try: self.raise_for_status() return True except ClientResponseError: return False
[docs] def get_encoding(self): return self.encoding
@property def is_expired(self) -> bool: """Determine if this cached response is expired""" return bool(self.expires) and datetime.utcnow() > self.expires
[docs] async def json(self, encoding: Optional[str] = None, **kwargs) -> Optional[Dict[str, Any]]: """Read and decode JSON response""" stripped = self._body.strip() if not stripped: return None return json.loads(stripped.decode(encoding or self.encoding))
[docs] def raise_for_status(self) -> None: if self.status >= 400: raise ClientResponseError( self.request_info, # type: ignore # These types are interchangeable tuple(), status=self.status, message=self.reason, headers=self.headers, )
[docs] def read(self): """No-op function for compatibility with ClientResponse"""
[docs] def release(self): """No-op function for compatibility with ClientResponse"""
[docs] async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str: """Read response payload and decode""" return self._body.decode(encoding or self.encoding, errors=errors)
AnyResponse = Union[ClientResponse, CachedResponse]