Source code for aiohttp_client_cache.backends.sqlite

import asyncio
import pickle
import sqlite3
from contextlib import asynccontextmanager
from os.path import splitext
from typing import AsyncIterator, Iterable, Union

import aiosqlite

from aiohttp_client_cache.backends import BaseCache, CacheBackend, ResponseOrKey
from aiohttp_client_cache.forge_utils import extend_signature


class SQLiteBackend(CacheBackend):
    """An async SQLite cache backend.

    Reading is fast, saving is a bit slower. It can store a large amount of data
    with low memory usage. The path to the database file will be
    ``<cache_name>.sqlite``, or just ``<cache_name>`` if a different file
    extension is specified.

    Args:
        cache_name: Database filename

    See :py:class:`.CacheBackend` for additional args.
    """

    @extend_signature(CacheBackend.__init__)
    def __init__(self, cache_name: str = 'aiohttp-cache', **kwargs):
        super().__init__(cache_name=cache_name, **kwargs)
        path, ext = splitext(cache_name)
        cache_path = f'{path}{ext or ".sqlite"}'
        self.responses = SQLitePickleCache(cache_path, 'responses')
        self.redirects = SQLiteCache(cache_path, 'redirects')

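A minimal usage sketch (not part of the module source), assuming the library's
top-level exports of ``CachedSession`` and ``SQLiteBackend``; the cache name and
URL are illustrative only:

import asyncio
from aiohttp_client_cache import CachedSession, SQLiteBackend

async def demo():
    # No file extension given, so responses are stored in 'demo_cache.sqlite'
    backend = SQLiteBackend(cache_name='demo_cache')
    async with CachedSession(cache=backend) as session:
        await session.get('https://example.com')  # fetched over the network
        await session.get('https://example.com')  # served from the cache

asyncio.run(demo())
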
class SQLiteCache(BaseCache):
    """An async interface for caching objects in a SQLite database.

    Example:
        >>> # Store data in two tables under the 'testdb' database
        >>> d1 = SQLiteCache('testdb', 'table1')
        >>> d2 = SQLiteCache('testdb', 'table2')

    Args:
        filename: filename for database (without extension)
        table_name: table name
    """

    def __init__(self, filename: str, table_name: str):
        self.filename = filename
        self.table_name = table_name
        self._can_commit = True  # Transactions can be committed if this is set to `True`
        self._bulk_commit = False
        self._initialized = False
        self._connection = None
        self._lock = asyncio.Lock()

    @asynccontextmanager
    async def get_connection(self, autocommit: bool = False) -> AsyncIterator[aiosqlite.Connection]:
        async with self._lock:
            db = self._connection if self._connection else await aiosqlite.connect(self.filename)
            try:
                yield await self._init_db(db)
                if autocommit and self._can_commit:
                    await db.commit()
            finally:
                if not self._bulk_commit:
                    await db.close()

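For illustration, a hedged sketch of calling ``get_connection`` directly, the
same way the methods below do; the filename and table name are hypothetical:

async def count_rows():
    cache = SQLiteCache('example.sqlite', 'responses')
    # _init_db runs before the yield, so the table exists even on a fresh file
    async with cache.get_connection() as db:
        cur = await db.execute(f'SELECT COUNT(*) FROM `{cache.table_name}`')
        print(await cur.fetchone())  # e.g. (0,) for an empty table
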
    async def _init_db(self, db: aiosqlite.Connection):
        """Create table if this is the first connection opened, and set fast save if possible"""
        if not self._bulk_commit:
            # synchronous = 0 (OFF) skips fsync for faster writes; losing cached
            # data on a crash is an acceptable trade-off for a cache
            await db.execute('PRAGMA synchronous = 0;')
        if not self._initialized:
            await db.execute(
                f'CREATE TABLE IF NOT EXISTS `{self.table_name}` (key PRIMARY KEY, value)'
            )
            self._initialized = True
        return db

    @asynccontextmanager
    async def bulk_commit(self):
        """Context manager used to speed up insertion of a large number of records

        Example:
            >>> cache = SQLiteCache('test', 'table1')
            >>> async with cache.bulk_commit():
            ...     for i in range(1000):
            ...         await cache.write(f'key_{i}', str(i * 2))
        """
        self._bulk_commit = True
        self._can_commit = False
        self._connection = await aiosqlite.connect(self.filename)
        try:
            yield
            await self._connection.commit()
        finally:
            self._bulk_commit = False
            self._can_commit = True
            await self._connection.close()
            self._connection = None

    async def clear(self):
        async with self.get_connection(autocommit=True) as db:
            await db.execute(f'DROP TABLE `{self.table_name}`')
            await db.execute(f'CREATE TABLE `{self.table_name}` (key PRIMARY KEY, value)')
            await db.execute('VACUUM')

    async def contains(self, key: str) -> bool:
        async with self.get_connection() as db:
            cur = await db.execute(f'SELECT COUNT(*) FROM `{self.table_name}` WHERE key=?', (key,))
            row = await cur.fetchone()
            return bool(row[0]) if row else False

    async def delete(self, key: str):
        async with self.get_connection(autocommit=True) as db:
            await db.execute(f'DELETE FROM `{self.table_name}` WHERE key=?', (key,))

    async def keys(self) -> Iterable[str]:
        async with self.get_connection() as db:
            cur = await db.execute(f'SELECT key FROM `{self.table_name}`')
            return [row[0] for row in await cur.fetchall()]

    async def read(self, key: str) -> ResponseOrKey:
        async with self.get_connection() as db:
            cur = await db.execute(f'SELECT value FROM `{self.table_name}` WHERE key=?', (key,))
            row = await cur.fetchone()
            return row[0] if row else None

    async def size(self) -> int:
        async with self.get_connection() as db:
            cur = await db.execute(f'SELECT COUNT(key) FROM `{self.table_name}`')
            row = await cur.fetchone()
            return row[0] if row else 0

    async def values(self) -> Iterable[ResponseOrKey]:
        async with self.get_connection() as db:
            cur = await db.execute(f'SELECT value FROM `{self.table_name}`')
            return [row[0] for row in await cur.fetchall()]

    async def write(self, key: str, item: Union[ResponseOrKey, sqlite3.Binary]):
        async with self.get_connection(autocommit=True) as db:
            await db.execute(
                f'INSERT OR REPLACE INTO `{self.table_name}` (key,value) VALUES (?,?)',
                (key, item),
            )

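Taken together, the methods above form a small async key-value API. A round-trip
sketch under hypothetical names; note that this class stores values as-is, so
they should be SQLite-native types (str, bytes, numbers):

async def roundtrip():
    cache = SQLiteCache('testdb.sqlite', 'table1')
    await cache.write('greeting', 'hello')  # INSERT OR REPLACE, then commit
    assert await cache.contains('greeting')
    print(await cache.read('greeting'))     # -> 'hello'
    print(await cache.size())               # -> 1
    await cache.delete('greeting')
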
class SQLitePickleCache(SQLiteCache):
    """Same as :py:class:`SQLiteCache`, but pickles values before saving"""

    async def read(self, key: str) -> ResponseOrKey:
        item = await super().read(key)
        return pickle.loads(bytes(item)) if item else None  # type: ignore

    async def values(self) -> Iterable[ResponseOrKey]:
        async with self.get_connection() as db:
            cur = await db.execute(f'SELECT value FROM `{self.table_name}`')
            return [self.unpickle(row[0]) for row in await cur.fetchall()]

    async def write(self, key, item):
        binary_item = sqlite3.Binary(pickle.dumps(item, protocol=-1))
        await super().write(key, binary_item)

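Because values are pickled on write and unpickled on read, this subclass can
round-trip arbitrary Python objects rather than only SQLite-native types. A
short illustrative sketch, again with hypothetical names:

async def pickle_roundtrip():
    cache = SQLitePickleCache('testdb.sqlite', 'table2')
    await cache.write('numbers', {'values': [1, 2, 3]})  # stored as a pickled BLOB
    print(await cache.read('numbers'))  # -> {'values': [1, 2, 3]}
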