Source code for aiohttp_client_cache.backends.mongo

import pickle
from typing import Iterable

from motor.motor_asyncio import AsyncIOMotorClient

from aiohttp_client_cache.backends import BaseCache, CacheBackend, ResponseOrKey
from aiohttp_client_cache.forge_utils import extend_signature


class MongoDBBackend(CacheBackend):
    """MongoDB cache backend

    Args:
        connection: Optional client object to use instead of creating a new one

    See :py:class:`.CacheBackend` for additional args.
    """

    @extend_signature(CacheBackend.__init__)
    def __init__(
        self,
        cache_name: str = 'aiohttp-cache',
        connection: AsyncIOMotorClient = None,
        **kwargs,
    ):
        super().__init__(cache_name=cache_name, **kwargs)
        self.responses = MongoDBPickleCache(cache_name, 'responses', connection)
        self.keys_map = MongoDBCache(cache_name, 'redirects', self.responses.connection)
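

# Illustrative usage sketch (not part of the original module), assuming the
# package's documented CachedSession API: a MongoDBBackend instance is passed
# as the session's cache. The cache name and URL below are placeholder values.
async def backend_usage_example():
    # Imported inside the function to keep the sketch self-contained and avoid
    # a circular import at module level
    from aiohttp_client_cache import CachedSession

    async with CachedSession(cache=MongoDBBackend(cache_name='demo-cache')) as session:
        await session.get('https://example.com')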


class MongoDBCache(BaseCache):
    """An async-compatible interface for caching objects in MongoDB

    Args:
        db_name: database name (be careful with production databases)
        collection_name: collection name
        connection: MongoDB connection instance to use instead of creating a new one
    """

    def __init__(self, db_name, collection_name: str, connection: AsyncIOMotorClient = None):
        self.connection = connection or AsyncIOMotorClient()
        self.db = self.connection[db_name]
        self.collection = self.db[collection_name]

    async def clear(self):
        await self.collection.drop()

    async def contains(self, key: str) -> bool:
        return bool(await self.collection.find_one({'_id': key}))

    async def delete(self, key: str):
        spec = {'_id': key}
        if hasattr(self.collection, "find_one_and_delete"):
            await self.collection.find_one_and_delete(spec, {'_id': True})
        else:
            await self.collection.find_and_modify(spec, remove=True, fields={'_id': True})

    async def keys(self) -> Iterable[str]:
        return [d['_id'] for d in await self.collection.find({}, {'_id': True}).to_list(None)]

    async def read(self, key: str) -> ResponseOrKey:
        result = await self.collection.find_one({'_id': key})
        return result['data'] if result else None

    async def size(self) -> int:
        return await self.collection.count_documents({})

    async def values(self) -> Iterable[ResponseOrKey]:
        results = await self.collection.find({'data': {'$exists': True}}).to_list(None)
        return [x['data'] for x in results]

    async def write(self, key: str, item: ResponseOrKey):
        doc = {'_id': key, 'data': item}
        await self.collection.replace_one({'_id': key}, doc, upsert=True)
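

# Minimal sketch of using MongoDBCache directly (illustrative only; the
# database and collection names below are placeholders). Every operation is a
# coroutine and must be awaited.
async def cache_roundtrip_example():
    cache = MongoDBCache('demo_db', 'demo_collection')
    await cache.write('key', 'value')  # upserts {'_id': 'key', 'data': 'value'}
    assert await cache.read('key') == 'value'
    print(await cache.size(), list(await cache.keys()))
    await cache.clear()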


class MongoDBPickleCache(MongoDBCache):
    """Same as :py:class:`MongoDBCache`, but pickles values before saving"""

    async def read(self, key):
        # Return None for a missing key instead of raising TypeError on bytes(None)
        result = await super().read(key)
        return self.unpickle(bytes(result)) if result else None

    async def write(self, key, item):
        await super().write(key, pickle.dumps(item, protocol=-1))
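

# Illustrative sketch: because MongoDBPickleCache pickles values before
# writing, arbitrary Python objects (not only BSON-compatible types) can be
# round-tripped. The database/collection names are placeholder values.
async def pickle_roundtrip_example():
    cache = MongoDBPickleCache('demo_db', 'demo_pickled')
    await cache.write('key', {'nested': ('python', 'objects', 1, 2)})
    print(await cache.read('key'))  # -> {'nested': ('python', 'objects', 1, 2)}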