import asyncio
import json
import logging as log
from contextlib import suppress
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Tuple

import nio

from . import utils
from .html_filter import HTML_FILTER
from .matrix_client import MatrixClient
from .models.items import Account, Room, TypeSpecifier


@dataclass
class NioCallbacks:
    """Register and implement the nio response/event callbacks of a client.

    On creation, every method of this class named `on<ClassName>` is
    registered on `client` as the callback for the `nio.responses` or
    `nio.events` class of the same name.

    In the content strings passed to `register_nio_event`, `%1` is the
    sender and `%2` the target (`ev.state_key`).
    """

    # Client on which the callbacks are registered and whose models are fed.
    client: MatrixClient = field()


    def __post_init__(self) -> None:
        """Register all `on<ClassName>` methods as callbacks on the client."""

        c = self.client

        # Classes without a matching `on<ClassName>` method are skipped:
        # the getattr raises AttributeError, which is suppressed.
        for name, class_ in utils.classes_defined_in(nio.responses).items():
            with suppress(AttributeError):
                c.add_response_callback(getattr(self, f"on{name}"), class_)

        for name, class_ in utils.classes_defined_in(nio.events).items():
            with suppress(AttributeError):
                c.add_event_callback(getattr(self, f"on{name}"), class_)

        c.add_ephemeral_callback(
            self.onTypingNoticeEvent, nio.events.TypingNoticeEvent,
        )


    async def onSyncResponse(self, resp: nio.SyncResponse) -> None:
        """Process a sync response: tokens, left rooms, first sync state."""

        c = self.client

        # Only remember the first `prev_batch` token seen for a joined room.
        for room_id, info in resp.rooms.join.items():
            if room_id not in c.past_tokens:
                c.past_tokens[room_id] = info.timeline.prev_batch

        # TODO: way of knowing if a nio.MatrixRoom is left
        for room_id, info in resp.rooms.leave.items():
            # TODO: handle in nio, these are rooms that were left before
            # starting the client.
            if room_id not in c.all_rooms:
                log.warning("Left room not in MatrixClient.rooms: %r", room_id)
                continue

            left_room = c.all_rooms[room_id]

            # TODO: handle left events in nio async client
            for ev in info.timeline.events:
                if isinstance(ev, nio.RoomMemberEvent):
                    await self.onRoomMemberEvent(left_room, ev)

            await c.register_nio_room(left_room, left=True)

        if not c.first_sync_done.is_set():
            asyncio.ensure_future(c.load_rooms_without_visible_events())

            c.first_sync_done.set()
            c.first_sync_date = datetime.now()
            c.models[Account][c.user_id].first_sync_done = True


    async def onErrorResponse(self, resp: nio.ErrorResponse) -> None:
        """Log an error response, falling back to its repr if dumping fails."""

        # TODO: show something in the client, must be seen on login screen too
        try:
            log.warning("%s - %s", resp, json.dumps(vars(resp), indent=4))
        except Exception:
            log.warning(repr(resp))


    # Callbacks for nio room events
    # Content: %1 is the sender, %2 the target (ev.state_key).

    async def onRoomMessageText(self, room, ev) -> None:
        """Register a text message, filtered through `HTML_FILTER.filter`."""

        co = HTML_FILTER.filter(
            ev.formatted_body
            if ev.format == "org.matrix.custom.html" else
            utils.plain2html(ev.body),
        )
        await self.client.register_nio_event(room, ev, content=co)


    async def onRoomMessageEmote(self, room, ev) -> None:
        """Register an emote, filtered through `HTML_FILTER.filter_inline`."""

        co = HTML_FILTER.filter_inline(
            ev.formatted_body
            if ev.format == "org.matrix.custom.html" else
            utils.plain2html(ev.body),
        )
        await self.client.register_nio_event(room, ev, content=co)


    async def onRoomMessageUnknown(self, room, ev) -> None:
        """Register a placeholder for a message of unknown type."""

        co = "%1 sent a message this client doesn't understand."
        await self.client.register_nio_event(room, ev, content=co)


    async def onRoomMessageMedia(self, room, ev) -> None:
        """Register a media message along with its media/thumbnail metadata.

        Metadata is read from the `info` and `file` entries of the event's
        source content. Missing or falsy values fall back to `0` for
        numeric fields and `""` for textual ones.
        """

        info             = ev.source["content"].get("info", {})
        media_crypt_dict = ev.source["content"].get("file", {})
        thumb_info       = info.get("thumbnail_info", {})
        thumb_crypt_dict = info.get("thumbnail_file", {})

        await self.client.register_nio_event(
            room,
            ev,
            content        = "",
            inline_content = ev.body,

            media_url        = ev.url,
            media_title      = ev.body,
            media_width      = info.get("w") or 0,
            media_height     = info.get("h") or 0,
            media_duration   = info.get("duration") or 0,
            media_size       = info.get("size") or 0,
            # The mimetype is text: fall back to an empty string, not 0.
            media_mime       = info.get("mimetype") or "",
            media_crypt_dict = media_crypt_dict,

            thumbnail_url =
                info.get("thumbnail_url") or thumb_crypt_dict.get("url") or "",

            thumbnail_width      = thumb_info.get("w") or 0,
            thumbnail_height     = thumb_info.get("h") or 0,
            thumbnail_crypt_dict = thumb_crypt_dict,
        )


    async def onRoomEncryptedMedia(self, room, ev) -> None:
        """Handle encrypted media exactly like `onRoomMessageMedia`."""

        await self.onRoomMessageMedia(room, ev)


    async def onRoomCreateEvent(self, room, ev) -> None:
        """Register the room creation event, describing its federation."""

        co = (
            "%1 allowed users on other matrix servers to join this room."
            if ev.federate else
            "%1 blocked users on other matrix servers from joining this room."
        )
        await self.client.register_nio_event(room, ev, content=co)


    async def onRoomGuestAccessEvent(self, room, ev) -> None:
        """Register a change of the room's guest access setting."""

        allowed = "allowed" if ev.guest_access else "forbad"
        co      = f"%1 {allowed} guests to join the room."
        await self.client.register_nio_event(room, ev, content=co)


    async def onRoomJoinRulesEvent(self, room, ev) -> None:
        """Register a change of the room's join rule."""

        # Any rule other than "public" is presented as invite-only.
        access = "public" if ev.join_rule == "public" else "invite-only"
        co     = f"%1 made the room {access}."
        await self.client.register_nio_event(room, ev, content=co)


    async def onRoomHistoryVisibilityEvent(self, room, ev) -> None:
        """Register a change of the room's history visibility."""

        if ev.history_visibility == "shared":
            to = "all room members"
        elif ev.history_visibility == "world_readable":
            to = "any member or outsider"
        elif ev.history_visibility == "joined":
            to = "all room members, since the time they joined"
        elif ev.history_visibility == "invited":
            to = "all room members, since the time they were invited"
        else:
            # Unrecognized value: still register the event, but log it.
            to = "???"
            log.warning("Invalid visibility - %s",
                        json.dumps(vars(ev), indent=4))

        co = f"%1 made future room history visible to {to}."
        await self.client.register_nio_event(room, ev, content=co)


    async def onPowerLevelsEvent(self, room, ev) -> None:
        """Register a generic notice that the room's permissions changed."""

        co = "%1 changed the room's permissions."  # TODO: improve
        await self.client.register_nio_event(room, ev, content=co)


    async def process_room_member_event(
        self, room, ev,
    ) -> Optional[Tuple[TypeSpecifier, str]]:
        """Return a `(type specifier, content)` pair describing a member event.

        Returns `None` when the event should not be shown as a timeline
        event: its content is identical to the previous content, it is a
        profile change (currently hidden, and counted in the client's
        `skipped_events` for the room), or it is not understood.

        If the event is a profile change concerning our own user and is newer
        than the account's last recorded profile update, the account's
        display name and avatar URL are updated from it.
        """

        if ev.prev_content == ev.content:
            return None

        prev            = ev.prev_content
        now             = ev.content
        membership      = ev.membership
        prev_membership = ev.prev_membership
        ev_date         = datetime.fromtimestamp(ev.server_timestamp / 1000)

        member_change = TypeSpecifier.membership_change

        # Membership changes
        if not prev or membership != prev_membership:
            reason = f" Reason: {now['reason']}" if now.get("reason") else ""

            if membership == "join":
                return (
                    member_change,
                    "%1 accepted their invitation."
                    if prev and prev_membership == "invite" else
                    "%1 joined the room.",
                )

            if membership == "invite":
                return (member_change, "%1 invited %2 to the room.")

            if membership == "leave":
                # Sender and target being the same user means the user
                # acted on their own membership.
                if ev.state_key == ev.sender:
                    return (
                        member_change,
                        f"%1 declined their invitation.{reason}"
                        if prev and prev_membership == "invite" else
                        f"%1 left the room.{reason}",
                    )

                return (
                    member_change,

                    f"%1 withdrew %2's invitation.{reason}"
                    if prev and prev_membership == "invite" else

                    f"%1 unbanned %2 from the room.{reason}"
                    if prev and prev_membership == "ban" else

                    f"%1 kicked out %2 from the room.{reason}",
                )

            if membership == "ban":
                return (member_change, f"%1 banned %2 from the room.{reason}")

        # Profile changes
        # "avatar_url" and "displayname" may be absent from the content, so
        # they are read with .get() to avoid a KeyError in the callback.
        changed = []

        if prev and now.get("avatar_url") != prev.get("avatar_url"):
            changed.append("profile picture")  # TODO: s

        if prev and now.get("displayname") != prev.get("displayname"):
            changed.append('display name from "{}" to "{}"'.format(
                prev.get("displayname") or ev.state_key,
                now.get("displayname") or ev.state_key,
            ))

        if changed:
            # Update our account profile if the event is newer than last update
            if ev.state_key == self.client.user_id:
                account = self.client.models[Account][self.client.user_id]
                updated = account.profile_updated

                if not updated or updated < ev_date:
                    account.profile_updated = ev_date
                    account.display_name    = now.get("displayname") or ""
                    account.avatar_url      = now.get("avatar_url") or ""

            # Hide profile events from the timeline - XXX
            self.client.skipped_events[room.room_id] += 1
            return None

            # NOTE(review): unreachable while the early return above is in
            # place; presumably kept for when profile events are shown again.
            return (
                TypeSpecifier.profile_change,
                "%1 changed their {}.".format(" and ".join(changed)),
            )

        log.warning("Unknown member event: %s", json.dumps(vars(ev), indent=4))
        return None


    async def onRoomMemberEvent(self, room, ev) -> None:
        """Register a member event, or only the room if the event is hidden."""

        type_and_content = await self.process_room_member_event(room, ev)

        if type_and_content is None:
            # This is run from register_nio_event otherwise
            await self.client.register_nio_room(room)
        else:
            type_specifier, content = type_and_content

            await self.client.register_nio_event(
                room, ev, content=content, type_specifier=type_specifier,
            )


    async def onRoomAliasEvent(self, room, ev) -> None:
        """Register a change of the room's canonical alias."""

        co = f"%1 set the room's main address to {ev.canonical_alias}."
        await self.client.register_nio_event(room, ev, content=co)


    async def onRoomNameEvent(self, room, ev) -> None:
        """Register a change of the room's name."""

        co = f"%1 changed the room's name to \"{ev.name}\"."
        await self.client.register_nio_event(room, ev, content=co)


    async def onRoomTopicEvent(self, room, ev) -> None:
        """Register a change of the room's topic, filtered as inline HTML."""

        topic = HTML_FILTER.filter_inline(ev.topic)
        co    = f"%1 changed the room's topic to \"{topic}\"."
        await self.client.register_nio_event(room, ev, content=co)


    async def onRoomEncryptionEvent(self, room, ev) -> None:
        """Register the event that turned on encryption for the room."""

        co = "%1 turned on encryption for this room."
        await self.client.register_nio_event(room, ev, content=co)


    async def onMegolmEvent(self, room, ev) -> None:
        """Register a placeholder for a message that was not decrypted."""

        co = "%1 sent an undecryptable message."
        await self.client.register_nio_event(room, ev, content=co)


    async def onBadEvent(self, room, ev) -> None:
        """Register a placeholder for a malformed event."""

        co = "%1 sent a malformed event."
        await self.client.register_nio_event(room, ev, content=co)


    async def onUnknownBadEvent(self, room, ev) -> None:
        """Register a placeholder for an event of unknown, malformed type."""

        co = "%1 sent an event this client doesn't understand."
        await self.client.register_nio_event(room, ev, content=co)


    # Callbacks for nio invite events

    async def onInviteEvent(self, room, ev) -> None:
        """Register the room an invite event was received for."""

        await self.client.register_nio_room(room)


    # Callbacks for nio ephemeral events

    async def onTypingNoticeEvent(self, room, ev) -> None:
        """Update the room model's sorted list of typing member names."""

        # Prevent recent past typing notices from being shown for a split
        # second on client startup:
        if not self.client.first_sync_done.is_set():
            return

        room_models = self.client.models[Room, self.client.user_id]

        if room.room_id not in room_models:
            return

        room_item = room_models[room.room_id]

        # Users that are one of the backend's own clients are left out.
        room_item.typing_members = sorted(
            room.user_name(user_id) for user_id in ev.users
            if user_id not in self.client.backend.clients
        )