moment/src/python/image_provider.py

148 lines
4.2 KiB
Python

# Copyright 2019 miruka
# This file is part of harmonyqml, licensed under LGPLv3.
import asyncio
import random
import re
from io import BytesIO
from pathlib import Path
from typing import Tuple
from urllib.parse import urlparse
import aiofiles
from dataclasses import dataclass, field
from PIL import Image as PILImage
import nio
import pyotherside
from nio.api import ResizingMethod
Size = Tuple[int, int]
ImageData = Tuple[bytearray, Size, int] # last int: pyotherside format enum
CONCURRENT_DOWNLOADS_LIMIT = asyncio.BoundedSemaphore(8)
@dataclass
class Thumbnail:
# pylint: disable=no-member
provider: "ImageProvider" = field()
mxc: str = field()
width: int = field()
height: int = field()
def __post_init__(self) -> None:
self.mxc = re.sub(r"#auto$", "", self.mxc)
if not re.match(r"^mxc://.+/.+", self.mxc):
raise ValueError(f"Invalid mxc URI: {self.mxc}")
@property
def server_size(self) -> Tuple[int, int]:
# https://matrix.org/docs/spec/client_server/latest#thumbnails
if self.width > 640 or self.height > 480:
return (800, 600)
if self.width > 320 or self.height > 240:
return (640, 480)
if self.width > 96 or self.height > 96:
return (320, 240)
if self.width > 32 or self.height > 32:
return (96, 96)
return (32, 32)
@property
def resize_method(self) -> ResizingMethod:
return ResizingMethod.scale \
if self.width > 96 or self.height > 96 else ResizingMethod.crop
@property
def http(self) -> str:
return nio.Api.mxc_to_http(self.mxc)
@property
def local_path(self) -> Path:
# pylint: disable=bad-string-format-type
parsed = urlparse(self.mxc)
name = "%s.%03d.%03d.%s" % (
parsed.path.lstrip("/"),
self.server_size[0],
self.server_size[1],
self.resize_method.value,
)
return self.provider.cache / parsed.netloc / name
async def download(self) -> bytes:
client = random.choice(
tuple(self.provider.app.backend.clients.values())
)
parsed = urlparse(self.mxc)
async with CONCURRENT_DOWNLOADS_LIMIT:
response = await client.thumbnail(
server_name = parsed.netloc,
media_id = parsed.path.lstrip("/"),
width = self.server_size[0],
height = self.server_size[1],
method = self.resize_method,
)
if isinstance(response, nio.ThumbnailError):
# Return a transparent 1x1 PNG
with BytesIO() as img_out:
PILImage.new("RGBA", (1, 1), (0, 0, 0, 0)).save(img_out, "PNG")
return img_out.getvalue()
body = response.body
if response.content_type not in ("image/jpeg", "image/png"):
with BytesIO(body) as img_in, BytesIO() as img_out:
PILImage.open(img_in).save(img_out, "PNG")
body = img_out.getvalue()
self.local_path.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.open(self.local_path, "wb") as file:
await file.write(body)
return body
async def get_data(self) -> ImageData:
try:
body = self.local_path.read_bytes()
except FileNotFoundError:
body = await self.download()
with BytesIO(body) as img_in:
real_size = PILImage.open(img_in).size
return (bytearray(body), real_size, pyotherside.format_data)
class ImageProvider:
def __init__(self, app) -> None:
self.app = app
self.cache = Path(self.app.appdirs.user_cache_dir) / "thumbnails"
self.cache.mkdir(parents=True, exist_ok=True)
def get(self, image_id: str, requested_size: Size) -> ImageData:
if requested_size[0] < 1 or requested_size[1] < 1:
raise ValueError(f"width or height < 1: {requested_size!r}")
return asyncio.run_coroutine_threadsafe(
Thumbnail(self, image_id, *requested_size).get_data(),
self.app.loop
).result()