# (paste residue, not part of the program) 184 lines, 5.4 KiB
import asyncio
import logging as log
import random
import re
from dataclasses import dataclass, field
from io import BytesIO
from pathlib import Path
from typing import Optional, Tuple
from urllib.parse import urlparse
import aiofiles
from PIL import Image as PILImage
import nio
import pyotherside
from nio.api import ResizingMethod
from . import utils
# Type aliases used in this module's signatures.
POSFormat = int
Size = Tuple[int, int]
ImageData = Tuple[bytearray, Size, int]  # last int: pyotherside format enum

# Semaphore sized for at most 8 concurrent holders.
CONCURRENT_DOWNLOADS_LIMIT = asyncio.BoundedSemaphore(8)

# Build the 1x1 transparent PNG placeholder once at import time.
# The tuple has to be created inside the ``with`` block: the buffer's
# contents are no longer readable once the BytesIO has been closed.
with BytesIO() as img_out:
    PILImage.new("RGBA", (1, 1), (0, 0, 0, 0)).save(img_out, "PNG")
    TRANSPARENT_1X1_PNG = (img_out.getvalue(), pyotherside.format_data)
@dataclass
class Thumbnail:
    """A thumbnail of an ``mxc://`` media item at a requested size.

    Instances are built positionally as
    ``Thumbnail(provider, mxc, width, height)``.
    """

    provider: "ImageProvider" = field()
    mxc: str = field()
    width: int = field()
    height: int = field()

    def __post_init__(self) -> None:
        """Strip a trailing ``#auto`` from the URI, then validate it.

        Raises:
            ValueError: if the URI does not look like ``mxc://<x>/<y>``.
        """
        self.mxc = re.sub(r"#auto$", "", self.mxc)

        if not re.match(r"^mxc://.+/.+", self.mxc):
            raise ValueError(f"Invalid mxc URI: {self.mxc}")

    @property
    def server_size(self) -> Tuple[int, int]:
        """Return the size bucket to ask the server for.

        The requested width/height are mapped onto one of five fixed sizes,
        see https://matrix.org/docs/spec/client_server/latest#thumbnails
        """
        if self.width > 640 or self.height > 480:
            return (800, 600)

        if self.width > 320 or self.height > 240:
            return (640, 480)

        if self.width > 96 or self.height > 96:
            return (320, 240)

        if self.width > 32 or self.height > 32:
            return (96, 96)

        return (32, 32)

    @property
    def resize_method(self) -> ResizingMethod:
        """Return ``scale`` above 96 pixels in either dimension, else ``crop``."""
        if self.width > 96 or self.height > 96:
            return ResizingMethod.scale

        return ResizingMethod.crop

    @property
    def http(self) -> str:
        """Return the HTTP URL nio derives from the mxc URI."""
        return nio.Api.mxc_to_http(self.mxc)

    @property
    def local_path(self) -> Path:
        """Return the cache file path for this thumbnail.

        The file lives under ``<provider cache>/<mxc server name>/``.
        """
        parsed = urlparse(self.mxc)
        server_width, server_height = self.server_size

        # NOTE(review): the values of this format tuple were missing from the
        # reviewed text. They are reconstructed from the four placeholders
        # as media id, bucket width, bucket height and resize method; this
        # assumes ResizingMethod members expose a string ``.value`` - confirm.
        name = "%s.%03d.%03d.%s" % (
            parsed.path.lstrip("/"),
            server_width,
            server_height,
            self.resize_method.value,
        )
        return self.provider.cache / parsed.netloc / name

    async def read_data(self, data: bytes, mime: Optional[str],
                       ) -> Tuple[bytes, POSFormat]:
        """Pair image bytes with the pyotherside format constant to use.

        SVG, JPEG and PNG data are passed through untouched. Anything else
        is opened with PIL and, unless its mode is RGB or RGBA, re-encoded
        as PNG.

        Returns:
            ``(bytes, pyotherside format)``. If PIL raises ``OSError`` the
            failure is logged and the transparent 1x1 placeholder is
            returned, so the declared return type always holds.
        """
        if mime == "image/svg+xml":
            return (data, pyotherside.format_svg_data)

        if mime in ("image/jpeg", "image/png"):
            return (data, pyotherside.format_data)

        try:
            with BytesIO(data) as img_in:
                image = PILImage.open(img_in)

                # NOTE(review): these two branches return the still-encoded
                # input bytes with a raw-pixel format constant - confirm
                # this is what the pyotherside side expects.
                if image.mode == "RGB":
                    return (data, pyotherside.format_rgb888)

                if image.mode == "RGBA":
                    return (data, pyotherside.format_argb32)

                with BytesIO() as img_out:
                    image.save(img_out, "PNG")
                    return (img_out.getvalue(), pyotherside.format_data)

        except OSError as err:
            log.warning("Unable to process image: %s - %r", self.http, err)
            # Without this the method would fall through and return None.
            return TRANSPARENT_1X1_PNG

    async def download(self) -> Tuple[bytes, POSFormat]:
        """Fetch the thumbnail from the server and cache the raw response.

        Returns:
            ``(bytes, pyotherside format)`` as produced by ``read_data``, or
            the transparent 1x1 placeholder if the server returned a
            ``nio.ThumbnailError``.
        """
        # NOTE(review): the argument of random.choice() was missing from the
        # reviewed text; the backend's client mapping is assumed here -
        # confirm against the application object.
        clients = tuple(self.provider.app.backend.clients.values())
        client = random.choice(clients)

        parsed = urlparse(self.mxc)
        server_width, server_height = self.server_size

        # Hold the module-level semaphore for the duration of the request.
        async with CONCURRENT_DOWNLOADS_LIMIT:
            resp = await client.thumbnail(
                server_name=parsed.netloc,
                media_id=parsed.path.lstrip("/"),
                width=server_width,
                height=server_height,
                method=self.resize_method,
            )

        if isinstance(resp, nio.ThumbnailError):
            log.warning("Downloading thumbnail failed - %s", resp)
            # Stop here: the code below reads resp.body and
            # resp.content_type, which must not be taken from an error.
            return TRANSPARENT_1X1_PNG

        body, pos_format = await self.read_data(resp.body, resp.content_type)

        target = self.local_path
        target.parent.mkdir(parents=True, exist_ok=True)

        async with aiofiles.open(target, "wb") as file:
            # body might have been converted, always save the original image.
            await file.write(resp.body)

        return (body, pos_format)

    async def local_read(self) -> Tuple[bytes, POSFormat]:
        """Read the cached file and classify it with ``read_data``.

        Raises:
            OSError: if the cache file cannot be read.
        """
        data = self.local_path.read_bytes()

        with BytesIO(data) as data_io:
            return await self.read_data(data, utils.guess_mime(data_io))

    async def get_data(self) -> ImageData:
        """Return ``(bytearray, real size, pyotherside format)``.

        The local cache is tried first; on any ``OSError`` (which includes
        a missing file) the thumbnail is downloaded. The size is read from
        the returned image data with PIL.
        """
        try:
            data, pos_format = await self.local_read()
        except OSError:
            # IOError is an alias of OSError and FileNotFoundError is a
            # subclass of it, so this single clause covers all three.
            data, pos_format = await self.download()

        with BytesIO(data) as img_in:
            real_size = PILImage.open(img_in).size

        return (bytearray(data), real_size, pos_format)
class ImageProvider:
    """Synchronous entry point that serves thumbnails backed by a disk cache."""

    def __init__(self, app) -> None:
        """Remember the app and make sure the thumbnail cache folder exists.

        The cache is the ``thumbnails`` folder under
        ``app.appdirs.user_cache_dir``.
        """
        self.app = app
        self.cache = Path(self.app.appdirs.user_cache_dir) / "thumbnails"
        self.cache.mkdir(parents=True, exist_ok=True)

    def get(self, image_id: str, requested_size: Size) -> ImageData:
        """Return ``(bytearray, size, pyotherside format)`` for an image.

        Args:
            image_id: passed to ``Thumbnail`` as its mxc URI.
            requested_size: ``(width, height)``, both at least 1.

        Raises:
            ValueError: if the requested width or height is below 1.

        If ``Thumbnail`` rejects its arguments with ``ValueError``, the error
        is logged and a transparent 1x1 PNG is returned instead.
        """
        if requested_size[0] < 1 or requested_size[1] < 1:
            raise ValueError(f"width or height < 1: {requested_size!r}")

        try:
            thumb = Thumbnail(self, image_id, *requested_size)
        except ValueError as err:
            # Log the rejection instead of dropping it silently.
            log.warning("%s", err)
            data, pos_format = TRANSPARENT_1X1_PNG
            return (bytearray(data), (1, 1), pos_format)

        # Schedule the coroutine on the app's loop and block this thread
        # until it finishes; .result() is what turns the future into the
        # ImageData this method is declared to return.
        future = asyncio.run_coroutine_threadsafe(
            thumb.get_data(), self.app.loop,
        )
        return future.result()