From 412a86762e23ba3a572458e947ea39e2a38e0a88 Mon Sep 17 00:00:00 2001 From: miruka Date: Tue, 29 Oct 2019 16:42:56 -0400 Subject: [PATCH] Generate thumbnail when uploading images --- src/python/matrix_client.py | 75 ++++++++++++++++++++++++++++---- src/python/pyotherside_events.py | 4 +- src/python/utils.py | 4 +- 3 files changed, 70 insertions(+), 13 deletions(-) diff --git a/src/python/matrix_client.py b/src/python/matrix_client.py index d555670f..343de3d3 100644 --- a/src/python/matrix_client.py +++ b/src/python/matrix_client.py @@ -1,6 +1,7 @@ import asyncio import html import inspect +import io import json import logging as log import platform @@ -38,6 +39,14 @@ class UploadForbidden(UploadError): class UploadTooLarge(UploadError): http_code: Optional[int] = 413 +@dataclass +class UneededThumbnail(Exception): + pass + +@dataclass +class UnthumbnailableError(Exception): + exception: Optional[Exception] = None + class MatrixClient(nio.AsyncClient): def __init__(self, @@ -223,6 +232,14 @@ class MatrixClient(nio.AsyncClient): content["info"]["w"], content["info"]["h"] = \ PILImage.open(path).size + try: + thumb_url, thumb_info = await self.upload_thumbnail(path) + except (UneededThumbnail, UnthumbnailableError): + pass + else: + content["info"]["thumbnail_url"] = thumb_url + content["info"]["thumbnail_info"] = thumb_info + elif kind == "audio": event_type = nio.RoomMessageAudio content["msgtype"] = "m.audio" @@ -373,26 +390,66 @@ class MatrixClient(nio.AsyncClient): self.models.pop((Member, room_id), None) - async def upload_file(self, path: Union[Path, str], - ) -> Tuple[str, Optional[str]]: - path = Path(path) + async def upload_thumbnail(self, path: Union[Path, str], + ) -> Tuple[str, Dict[str, Union[str, int]]]: + try: + thumb = PILImage.open(path) + small = thumb.width <= 512 and thumb.height <= 512 + is_jpg_png = thumb.format in ("JPEG", "PNG") + opaque_png = thumb.format == "PNG" and thumb.mode != "RGBA" + + if small and is_jpg_png and not opaque_png: + raise UneededThumbnail() + + thumb.thumbnail((512, 512)) + + with io.BytesIO() as out: + if thumb.mode == "RGBA": + thumb.save(out, "PNG") + mime = "image/png" + else: + thumb.convert("RGB").save(out, "JPEG") + mime = "image/jpeg" + + content = out.getvalue() + + return ( + await self.upload(content, mime, Path(path).name), + { + "w": thumb.width, + "h": thumb.height, + "mimetype": mime, + "size": len(content), + }, + ) + + except OSError as err: + raise UnthumbnailableError(err) + + + async def upload_file(self, path: Union[Path, str]) -> Tuple[str, str]: with open(path, "rb") as file: mime = utils.guess_mime(file) file.seek(0, 0) - resp = await self.upload(file, mime, path.name) + return (await self.upload(file, mime, Path(path).name), mime) - if not isinstance(resp, nio.ErrorResponse): - return (resp.content_uri, mime) - if resp.status_code == 403: + async def upload(self, data, mime: str, filename: Optional[str] = None, + ) -> str: + response = await super().upload(data, mime, filename) + + if not isinstance(response, nio.ErrorResponse): + return response.content_uri + + if response.status_code == 403: raise UploadForbidden() - if resp.status_code == 413: + if response.status_code == 413: raise UploadTooLarge() - raise UploadError(resp.status_code) + raise UploadError(response.status_code) async def set_avatar_from_file(self, path: Union[Path, str]) -> None: diff --git a/src/python/pyotherside_events.py b/src/python/pyotherside_events.py index 86c2d649..de36aec4 100644 --- a/src/python/pyotherside_events.py +++ b/src/python/pyotherside_events.py @@ -47,8 +47,8 @@ class AlertRequested(PyOtherSideEvent): class CoroutineDone(PyOtherSideEvent): """Indicate that an asyncio coroutine finished.""" - uuid: str = field() - result: Any = None + uuid: str = field() + result: Any = None exception: Optional[Exception] = None traceback: Optional[str] = None diff --git a/src/python/utils.py b/src/python/utils.py index 43df4549..4cbfb3c5 100644 --- a/src/python/utils.py +++ b/src/python/utils.py @@ -34,12 +34,12 @@ def is_svg(file: IO) -> bool: return False -def guess_mime(file: IO) -> Optional[str]: +def guess_mime(file: IO) -> str: if is_svg(file): return "image/svg+xml" file.seek(0, 0) - return filetype.guess_mime(file) + return filetype.guess_mime(file) or "application/octet-stream" def plain2html(text: str) -> str: