"""MongoDB GridFS backend (aiohttp_client_cache.backends.gridfs)"""

import pickle
from typing import Iterable, Optional

from gridfs import GridFS
from pymongo import MongoClient

from aiohttp_client_cache.backends import BaseCache, CacheBackend, ResponseOrKey
from aiohttp_client_cache.backends.mongo import MongoDBCache
from aiohttp_client_cache.forge_utils import extend_signature


class GridFSBackend(CacheBackend):
    """An async-compatible interface for caching objects in MongoDB GridFS.
    Use this if you need to support documents greater than 16MB.

    Args:
        connection: Optional client object to use instead of creating a new one

    See :py:class:`.CacheBackend` for additional args.
    """

    @extend_signature(CacheBackend.__init__)
    def __init__(
        self,
        cache_name: str = 'aiohttp-cache',
        connection: Optional[MongoClient] = None,
        **kwargs,
    ):
        super().__init__(cache_name=cache_name, **kwargs)
        self.responses = GridFSCache(cache_name, connection)
        self.keys_map = MongoDBCache(cache_name, 'redirects', self.responses.connection)
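
# Example usage (a hedged sketch, not part of the original module; assumes
# CachedSession accepts a backend instance via its ``cache`` argument):
#
#     from aiohttp_client_cache import CachedSession
#
#     async with CachedSession(cache=GridFSBackend(cache_name='my-cache')) as session:
#         await session.get('http://example.com/large-response')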


# TODO: Incomplete/untested
# TODO: Fully async implementation. The current implementation uses blocking
#   operations; methods are defined as async only for compatibility with the
#   BaseCache API.
class GridFSCache(BaseCache):
    """A dictionary-like interface for MongoDB GridFS

    Args:
        db_name: Database name (be careful with production databases)
        connection: Optional MongoDB connection instance to use instead of creating a new one
    """

    def __init__(self, db_name: str, connection: Optional[MongoClient] = None):
        self.connection = connection or MongoClient()
        self.db = self.connection[db_name]
        self.fs = GridFS(self.db)

    # TODO
    async def contains(self, key: str) -> bool:
        raise NotImplementedError
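
    # A possible implementation (an untested sketch, not part of the original
    # module): GridFS.exists() accepts a filter document, so membership can be
    # checked without reading file contents:
    #
    #     async def contains(self, key: str) -> bool:
    #         return self.fs.exists({'_id': key})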

    async def clear(self):
        self.db['fs.files'].drop()
        self.db['fs.chunks'].drop()

    async def delete(self, key: str):
        res = self.fs.find_one({'_id': key})
        if res is not None:
            self.fs.delete(res._id)

    async def keys(self) -> Iterable[str]:
        return [d._id for d in self.fs.find()]

    async def read(self, key: str) -> ResponseOrKey:
        result = self.fs.find_one({'_id': key})
        if result is None:
            raise KeyError(key)
        return self.unpickle(bytes(result.read()))

    async def size(self) -> int:
        # Collection.count() is deprecated (and removed in PyMongo 4);
        # count_documents() is the supported replacement
        return self.db['fs.files'].count_documents({})

    # TODO
    async def values(self) -> Iterable[ResponseOrKey]:
        raise NotImplementedError
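
    # A possible implementation (an untested sketch, not part of the original
    # module), reading and unpickling every stored file, mirroring read():
    #
    #     async def values(self) -> Iterable[ResponseOrKey]:
    #         return [self.unpickle(bytes(f.read())) for f in self.fs.find()]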

    async def write(self, key: str, item: ResponseOrKey):
        # GridFS files are immutable and '_id' is unique, so remove any
        # existing file for this key before writing a new one
        await self.delete(key)
        self.fs.put(pickle.dumps(item, protocol=-1), _id=key)
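

# Round-trip sketch (hypothetical, not part of the original module; assumes a
# MongoDB server reachable at the default localhost address, and a throwaway
# database name):
#
#     import asyncio
#
#     async def demo():
#         cache = GridFSCache('gridfs-cache-demo')
#         payload = b'x' * (20 * 2**20)  # >16MB, beyond the single-document limit
#         await cache.write('example-key', payload)
#         assert await cache.read('example-key') == payload
#         print(await cache.size())  # -> 1
#
#     asyncio.run(demo())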