moment/src/backend/media_cache.py

359 lines
10 KiB
Python
Raw Normal View History

2019-12-19 22:46:16 +11:00
# SPDX-License-Identifier: LGPL-3.0-or-later
"""Matrix media downloading, caching and retrieval."""
2019-12-19 04:43:31 +11:00
import asyncio
import functools
import io
import re
import shutil
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, DefaultDict, Dict, Optional
from urllib.parse import urlparse
from PIL import Image as PILImage
2020-03-13 19:35:51 +11:00
import nio
from .utils import Size, atomic_write
if TYPE_CHECKING:
from .backend import Backend
if sys.version_info < (3, 8):
import pyfastcopy # noqa
CryptDict = Optional[Dict[str, Any]]
CONCURRENT_DOWNLOADS_LIMIT = asyncio.BoundedSemaphore(8)
ACCESS_LOCKS: DefaultDict[str, asyncio.Lock] = DefaultDict(asyncio.Lock)
2019-12-19 04:43:31 +11:00
@dataclass
class MediaCache:
"""Matrix downloaded media cache."""
2019-12-19 04:43:31 +11:00
backend: "Backend" = field()
base_dir: Path = field()
2019-12-19 04:43:31 +11:00
def __post_init__(self) -> None:
self.thumbs_dir = self.base_dir / "thumbnails"
self.downloads_dir = self.base_dir / "downloads"
self.thumbs_dir.mkdir(parents=True, exist_ok=True)
self.downloads_dir.mkdir(parents=True, exist_ok=True)
async def get_media(
self,
client_user_id: str,
mxc: str,
title: str,
crypt_dict: CryptDict = None,
) -> Path:
"""Return `Media.get()`'s result. Intended for QML."""
2019-12-19 04:43:31 +11:00
return await Media(self, client_user_id, mxc, title, crypt_dict).get()
2019-12-19 04:43:31 +11:00
async def get_thumbnail(
self,
client_user_id: str,
mxc: str,
title: str,
width: int,
height: int,
crypt_dict: CryptDict = None,
2019-12-19 04:43:31 +11:00
) -> Path:
"""Return `Thumbnail.get()`'s result. Intended for QML."""
2019-12-19 04:43:31 +11:00
# QML sometimes pass float sizes, which matrix API doesn't like.
size = (round(width), round(height))
thumb = Thumbnail(self, client_user_id, mxc, title, crypt_dict, size)
2019-12-19 04:43:31 +11:00
return await thumb.get()
@dataclass
class Media:
2019-12-19 04:43:31 +11:00
"""A matrix media file."""
cache: "MediaCache" = field()
client_user_id: str = field()
mxc: str = field()
title: str = field()
crypt_dict: CryptDict = field(repr=False)
def __post_init__(self) -> None:
self.mxc = re.sub(r"#auto$", "", self.mxc)
if not re.match(r"^mxc://.+/.+", self.mxc):
raise ValueError(f"Invalid mxc URI: {self.mxc}")
@property
def local_path(self) -> Path:
2020-03-13 05:41:00 +11:00
"""The path where the file either exists or should be downloaded.
The returned paths are in this form:
```
<base download folder>/<homeserver domain>/
<file title>_<mxc id>.<file extension>`
```
e.g. `~/.cache/mirage/downloads/matrix.org/foo_Hm24ar11i768b0el.png`.
"""
2019-12-19 04:43:31 +11:00
parsed = urlparse(self.mxc)
mxc_id = parsed.path.lstrip("/")
title = Path(self.title)
filename = f"{title.stem}_{mxc_id}{title.suffix}"
return self.cache.downloads_dir / parsed.netloc / filename
async def get(self) -> Path:
2019-12-19 04:43:31 +11:00
"""Return the cached file's path, downloading it first if needed."""
async with ACCESS_LOCKS[self.mxc]:
try:
return await self.get_local()
except FileNotFoundError:
return await self.create()
async def get_local(self) -> Path:
"""Return a cached local existing path for this media or raise."""
2019-12-19 04:43:31 +11:00
if not self.local_path.exists():
raise FileNotFoundError()
return self.local_path
async def create(self) -> Path:
2019-12-19 04:43:31 +11:00
"""Download and cache the media file to disk."""
async with CONCURRENT_DOWNLOADS_LIMIT:
data = await self._get_remote_data()
self.local_path.parent.mkdir(parents=True, exist_ok=True)
async with atomic_write(self.local_path, binary=True) as (file, done):
await file.write(data)
done()
if type(self) is Media:
for event in self.cache.backend.mxc_events[self.mxc]:
event.media_local_path = self.local_path
return self.local_path
async def _get_remote_data(self) -> bytes:
2019-12-19 04:43:31 +11:00
"""Return the file's data from the matrix server, decrypt if needed."""
2019-11-05 01:29:18 +11:00
parsed = urlparse(self.mxc)
resp = await self.cache.backend.clients[self.client_user_id].download(
2019-11-05 01:29:18 +11:00
server_name = parsed.netloc,
media_id = parsed.path.lstrip("/"),
)
return await self._decrypt(resp.body)
async def _decrypt(self, data: bytes) -> bytes:
2019-12-19 04:43:31 +11:00
"""Decrypt an encrypted file's data."""
if not self.crypt_dict:
return data
func = functools.partial(
nio.crypto.attachments.decrypt_attachment,
data,
self.crypt_dict["key"]["k"],
self.crypt_dict["hashes"]["sha256"],
self.crypt_dict["iv"],
)
# Run in a separate thread
return await asyncio.get_event_loop().run_in_executor(None, func)
@classmethod
async def from_existing_file(
cls,
cache: "MediaCache",
client_user_id: str,
mxc: str,
existing: Path,
overwrite: bool = False,
**kwargs,
) -> "Media":
2019-12-19 04:43:31 +11:00
"""Copy an existing file to cache and return a `Media` for it."""
media = cls(
cache, client_user_id, mxc, existing.name, {}, **kwargs,
) # type: ignore
media.local_path.parent.mkdir(parents=True, exist_ok=True)
if not media.local_path.exists() or overwrite:
func = functools.partial(shutil.copy, existing, media.local_path)
await asyncio.get_event_loop().run_in_executor(None, func)
return media
@classmethod
async def from_bytes(
cls,
cache: "MediaCache",
client_user_id: str,
mxc: str,
filename: str,
data: bytes,
overwrite: bool = False,
**kwargs,
) -> "Media":
2019-12-19 04:43:31 +11:00
"""Create a cached file from bytes data and return a `Media` for it."""
media = cls(
cache, client_user_id, mxc, filename, {}, **kwargs,
) # type: ignore
media.local_path.parent.mkdir(parents=True, exist_ok=True)
if not media.local_path.exists() or overwrite:
path = media.local_path
async with atomic_write(path, binary=True) as (file, done):
await file.write(data)
done()
return media
@dataclass
class Thumbnail(Media):
2019-12-19 04:43:31 +11:00
"""The thumbnail of a matrix media, which is a media itself."""
cache: "MediaCache" = field()
client_user_id: str = field()
mxc: str = field()
title: str = field()
crypt_dict: CryptDict = field(repr=False)
wanted_size: Size = field()
server_size: Optional[Size] = field(init=False, repr=False, default=None)
@staticmethod
def normalize_size(size: Size) -> Size:
2019-12-19 04:43:31 +11:00
"""Return standard `(width, height)` matrix thumbnail dimensions.
The Matrix specification defines a few standard thumbnail dimensions
for homeservers to store and return: 32x32, 96x96, 320x240, 640x480,
and 800x600.
This method returns the best matching size for a `size` without
upscaling, e.g. passing `(641, 480)` will return `(800, 600)`.
"""
if size[0] > 640 or size[1] > 480:
return (800, 600)
if size[0] > 320 or size[1] > 240:
return (640, 480)
if size[0] > 96 or size[1] > 96:
return (320, 240)
if size[0] > 32 or size[1] > 32:
return (96, 96)
return (32, 32)
@property
def local_path(self) -> Path:
2019-12-19 04:43:31 +11:00
"""The path where the thumbnail either exists or should be downloaded.
The returned paths are in this form:
2020-03-13 05:41:00 +11:00
```
<base thumbnail folder>/<homeserver domain>/<standard size>/
<file title>_<mxc id>.<file extension>`
```
e.g.
`~/.cache/mirage/thumbnails/matrix.org/32x32/foo_Hm24ar11i768b0el.png`.
2019-12-19 04:43:31 +11:00
"""
size = self.normalize_size(self.server_size or self.wanted_size)
size_dir = f"{size[0]}x{size[1]}"
parsed = urlparse(self.mxc)
mxc_id = parsed.path.lstrip("/")
title = Path(self.title)
filename = f"{title.stem}_{mxc_id}{title.suffix}"
return self.cache.thumbs_dir / parsed.netloc / size_dir / filename
async def get_local(self) -> Path:
2020-03-13 05:41:00 +11:00
"""Return an existing thumbnail path or raise `FileNotFoundError`.
If we have a bigger size thumbnail downloaded than the `wanted_size`
for the media, return it instead of asking the server for a
smaller thumbnail.
"""
if self.local_path.exists():
return self.local_path
try_sizes = ((32, 32), (96, 96), (320, 240), (640, 480), (800, 600))
parts = list(self.local_path.parts)
size = self.normalize_size(self.server_size or self.wanted_size)
for width, height in try_sizes:
if width < size[0] or height < size[1]:
continue
parts[-2] = f"{width}x{height}"
path = Path("/".join(parts))
if path.exists():
return path
raise FileNotFoundError()
async def _get_remote_data(self) -> bytes:
2020-03-13 05:41:00 +11:00
"""Return the (decrypted) media file's content from the server."""
parsed = urlparse(self.mxc)
client = self.cache.backend.clients[self.client_user_id]
if self.crypt_dict:
# Matrix makes encrypted thumbs only available through the download
# end-point, not the thumbnail one
resp = await client.download(
server_name = parsed.netloc,
media_id = parsed.path.lstrip("/"),
)
else:
resp = await client.thumbnail(
server_name = parsed.netloc,
media_id = parsed.path.lstrip("/"),
width = self.wanted_size[0],
height = self.wanted_size[1],
)
decrypted = await self._decrypt(resp.body)
with io.BytesIO(decrypted) as img:
# The server may return a thumbnail bigger than what we asked for
self.server_size = PILImage.open(img).size
return decrypted