Generate thumbnail when uploading images

This commit is contained in:
miruka 2019-10-29 16:42:56 -04:00
parent 83dc09a684
commit 412a86762e
3 changed files with 70 additions and 13 deletions

View File

@ -1,6 +1,7 @@
import asyncio import asyncio
import html import html
import inspect import inspect
import io
import json import json
import logging as log import logging as log
import platform import platform
@ -38,6 +39,14 @@ class UploadForbidden(UploadError):
class UploadTooLarge(UploadError): class UploadTooLarge(UploadError):
http_code: Optional[int] = 413 http_code: Optional[int] = 413
@dataclass
class UneededThumbnail(Exception):
pass
@dataclass
class UnthumbnailableError(Exception):
exception: Optional[Exception] = None
class MatrixClient(nio.AsyncClient): class MatrixClient(nio.AsyncClient):
def __init__(self, def __init__(self,
@ -223,6 +232,14 @@ class MatrixClient(nio.AsyncClient):
content["info"]["w"], content["info"]["h"] = \ content["info"]["w"], content["info"]["h"] = \
PILImage.open(path).size PILImage.open(path).size
try:
thumb_url, thumb_info = await self.upload_thumbnail(path)
except (UneededThumbnail, UnthumbnailableError):
pass
else:
content["info"]["thumbnail_url"] = thumb_url
content["info"]["thumbnail_info"] = thumb_info
elif kind == "audio": elif kind == "audio":
event_type = nio.RoomMessageAudio event_type = nio.RoomMessageAudio
content["msgtype"] = "m.audio" content["msgtype"] = "m.audio"
@ -373,26 +390,66 @@ class MatrixClient(nio.AsyncClient):
self.models.pop((Member, room_id), None) self.models.pop((Member, room_id), None)
async def upload_file(self, path: Union[Path, str], async def upload_thumbnail(self, path: Union[Path, str],
) -> Tuple[str, Optional[str]]: ) -> Tuple[str, Dict[str, Union[str, int]]]:
path = Path(path) try:
thumb = PILImage.open(path)
small = thumb.width <= 512 and thumb.height <= 512
is_jpg_png = thumb.format in ("JPEG", "PNG")
opaque_png = thumb.format == "PNG" and thumb.mode != "RGBA"
if small and is_jpg_png and not opaque_png:
raise UneededThumbnail()
thumb.thumbnail((512, 512))
with io.BytesIO() as out:
if thumb.mode == "RGBA":
thumb.save(out, "PNG")
mime = "image/png"
else:
thumb.convert("RGB").save(out, "JPEG")
mime = "image/jpeg"
content = out.getvalue()
return (
await self.upload(content, mime, Path(path).name),
{
"w": thumb.width,
"h": thumb.height,
"mimetype": mime,
"size": len(content),
},
)
except OSError as err:
raise UnthumbnailableError(err)
async def upload_file(self, path: Union[Path, str]) -> Tuple[str, str]:
with open(path, "rb") as file: with open(path, "rb") as file:
mime = utils.guess_mime(file) mime = utils.guess_mime(file)
file.seek(0, 0) file.seek(0, 0)
resp = await self.upload(file, mime, path.name) return (await self.upload(file, mime, Path(path).name), mime)
if not isinstance(resp, nio.ErrorResponse):
return (resp.content_uri, mime)
if resp.status_code == 403: async def upload(self, data, mime: str, filename: Optional[str] = None,
) -> str:
response = await super().upload(data, mime, filename)
if not isinstance(response, nio.ErrorResponse):
return response.content_uri
if response.status_code == 403:
raise UploadForbidden() raise UploadForbidden()
if resp.status_code == 413: if response.status_code == 413:
raise UploadTooLarge() raise UploadTooLarge()
raise UploadError(resp.status_code) raise UploadError(response.status_code)
async def set_avatar_from_file(self, path: Union[Path, str]) -> None: async def set_avatar_from_file(self, path: Union[Path, str]) -> None:

View File

@ -34,12 +34,12 @@ def is_svg(file: IO) -> bool:
return False return False
def guess_mime(file: IO) -> Optional[str]: def guess_mime(file: IO) -> str:
if is_svg(file): if is_svg(file):
return "image/svg+xml" return "image/svg+xml"
file.seek(0, 0) file.seek(0, 0)
return filetype.guess_mime(file) return filetype.guess_mime(file) or "application/octet-stream"
def plain2html(text: str) -> str: def plain2html(text: str) -> str: