# Source code for jupyterlite_pyodide_lock.lockers.handlers.cacher
"""A ``tornado`` handler for proxying remote CDN files with a cache."""
# Copyright (c) jupyterlite-pyodide-lock contributors.
# Distributed under the terms of the BSD-3-Clause License.
from __future__ import annotations
import asyncio
import re
from collections.abc import Callable
from pathlib import Path
from typing import TYPE_CHECKING, Any
from tornado.httpclient import AsyncHTTPClient, HTTPError
from tornado.simple_httpclient import HTTPTimeoutError
from .mime import ExtraMimeFiles
# a replacement: either literal bytes to substitute for a marker, or a
# callable that receives the whole response body and returns a new body
TReplacer = bytes | Callable[[bytes], bytes]
# a single rewrite rule: (marker bytes to look for, replacement to apply)
TRouteRewrite = tuple[str, TReplacer]
# maps a URL regex pattern to the list of rewrite rules for matching paths
TRewriteMap = dict[str, list[TRouteRewrite]]
class CachingRemoteFiles(ExtraMimeFiles):
    """A handler which serves files from a cache, downloading them as needed.

    On a cache miss, the requested path is fetched from ``remote`` (with
    retries and exponential backoff on timeouts), optionally rewritten
    according to ``rewrites``, written to the local cache, and then served
    by the base static-file handler.
    """

    #: remote URL root
    remote: str
    #: HTTP client used to fetch remote files
    client: AsyncHTTPClient
    #: URL patterns that should have text replaced
    rewrites: TRewriteMap

    def initialize(self, *args: Any, **kwargs: Any) -> None:
        """Extend the base initialize with instance members.

        ``remote`` (required) and ``rewrites`` (optional) are popped from
        ``kwargs`` before delegating to the base class.
        """
        remote: str = kwargs.pop("remote")
        rewrites: TRewriteMap | None = kwargs.pop("rewrites", None)
        super().initialize(*args, **kwargs)
        self.remote = remote
        self.client = AsyncHTTPClient()
        self.rewrites = rewrites or {}

    async def get(self, path: str, include_body: bool = True) -> None:  # noqa: FBT002, FBT001
        """Serve ``path``, downloading it into the cache first if absent."""
        cache_path = Path(self.root) / path
        if cache_path.exists():  # pragma: no cover
            # refresh the mtime so age-based cache cleanup sees the file as live
            cache_path.touch()
        else:
            await self.cache_file(path, cache_path)
        return await super().get(path, include_body=include_body)

    async def cache_file(self, path: str, cache_path: Path) -> None:
        """Fetch ``path`` from the remote, apply any rewrites, and cache it.

        Each pattern in ``self.rewrites`` that matches ``path`` contributes
        (marker, replacement) pairs; a ``bytes`` replacement substitutes the
        marker in place, while a callable replacement transforms the whole body.
        """
        if not cache_path.parent.exists():  # pragma: no cover
            cache_path.parent.mkdir(parents=True)
        url = f"{self.remote}/{path}"
        body = await self.fetch_body_with_retries(url)
        for url_pattern, replacements in self.rewrites.items():
            if re.search(url_pattern, path) is None:  # pragma: no cover
                self.log.debug("[cacher] %s is not %s", url, url_pattern)
                continue
            for marker, replacement in replacements:
                if marker not in body:  # pragma: no cover
                    self.log.debug("[cacher] %s does not contain %s", url, marker)
                    continue
                self.log.debug("[cacher] %s contains %s", url, marker)
                if isinstance(replacement, bytes):
                    body = body.replace(marker, replacement)
                elif callable(replacement):
                    body = replacement(body)
                else:  # pragma: no cover
                    msg = f"Don't know what to do with {type(replacement)}"
                    raise NotImplementedError(msg)
        # offload the blocking disk write so the event loop stays responsive
        await asyncio.get_running_loop().run_in_executor(
            None, cache_path.write_bytes, body
        )

    async def fetch_body_with_retries(self, fetch_url: str, retries: int = 5) -> bytes:
        """Fetch the raw bytes of a URL, retrying on timeout.

        After a timeout, waits an exponentially increasing number of
        seconds (2, 4, 8, ...) before the next attempt; the first attempt
        is made immediately. Raises the last ``HTTPTimeoutError`` if all
        ``retries`` attempts time out.
        """
        self.log.debug("[cacher] fetching: %s", fetch_url)
        last_error: HTTPError | None = None
        for attempt in range(retries):
            if attempt:
                # back off only after a failed attempt: the original slept
                # 2**0 == 1s before the very first fetch, delaying every request
                await asyncio.sleep(2**attempt)
            try:
                res = await self.client.fetch(fetch_url)
            except HTTPTimeoutError as err:  # pragma: no cover
                last_error = err
                continue
            else:
                return res.body
        if TYPE_CHECKING:
            # narrows last_error for the type checker; no runtime effect
            assert last_error
        raise last_error  # pragma: no cover