Add support for non-message room events
This commit is contained in:
@@ -1,9 +1,12 @@
|
||||
from datetime import datetime
|
||||
from typing import Dict, Optional
|
||||
from enum import auto
|
||||
from typing import Dict, Optional, Sequence, Type, Union
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from .event import Event
|
||||
import nio
|
||||
|
||||
from .event import AutoStrEnum, Event
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -38,3 +41,52 @@ class RoomMemberUpdated(Event):
|
||||
class RoomMemberDeleted(Event):
|
||||
room_id: str = field()
|
||||
user_id: str = field()
|
||||
|
||||
|
||||
# Timeline
|
||||
|
||||
class ContentType(AutoStrEnum):
|
||||
html = auto()
|
||||
image = auto()
|
||||
audio = auto()
|
||||
video = auto()
|
||||
file = auto()
|
||||
location = auto()
|
||||
|
||||
|
||||
@dataclass
|
||||
class TimelineEventReceived(Event):
|
||||
event_type: Type[nio.Event] = field()
|
||||
room_id: str = field()
|
||||
event_id: str = field()
|
||||
sender_id: str = field()
|
||||
date: datetime = field()
|
||||
content: str = field()
|
||||
|
||||
content_type: ContentType = ContentType.html
|
||||
is_local_echo: bool = False
|
||||
|
||||
show_name_line: bool = False
|
||||
translatable: Union[bool, Sequence[str]] = True
|
||||
|
||||
target_user_id: Optional[str] = None
|
||||
|
||||
|
||||
@classmethod
|
||||
def from_nio(cls, room: nio.rooms.MatrixRoom, ev: nio.Event, **fields
|
||||
) -> "TimelineEventReceived":
|
||||
return cls(
|
||||
event_type = type(ev),
|
||||
room_id = room.room_id,
|
||||
event_id = ev.event_id,
|
||||
sender_id = ev.sender,
|
||||
date = datetime.fromtimestamp(ev.server_timestamp / 1000),
|
||||
target_user_id = getattr(ev, "state_key", None),
|
||||
**fields
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TimelineMessageReceived(TimelineEventReceived):
|
||||
show_name_line: bool = True
|
||||
translatable: Union[bool, Sequence[str]] = False
|
||||
|
@@ -1,32 +0,0 @@
|
||||
from datetime import datetime
|
||||
from enum import auto
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from .event import AutoStrEnum, Event
|
||||
|
||||
|
||||
class EventType(AutoStrEnum):
|
||||
text = auto()
|
||||
html = auto()
|
||||
file = auto()
|
||||
image = auto()
|
||||
audio = auto()
|
||||
video = auto()
|
||||
location = auto()
|
||||
notice = auto()
|
||||
|
||||
|
||||
@dataclass
|
||||
class TimelineEvent(Event):
|
||||
type: EventType = field()
|
||||
room_id: str = field()
|
||||
event_id: str = field()
|
||||
sender_id: str = field()
|
||||
date: datetime = field()
|
||||
is_local_echo: bool = field()
|
||||
|
||||
|
||||
@dataclass
|
||||
class HtmlMessageReceived(TimelineEvent):
|
||||
content: str = field()
|
@@ -1,9 +1,10 @@
|
||||
import asyncio
|
||||
import html
|
||||
import inspect
|
||||
import json
|
||||
import logging as log
|
||||
import platform
|
||||
from contextlib import suppress
|
||||
from datetime import datetime
|
||||
from types import ModuleType
|
||||
from typing import Dict, Optional, Type
|
||||
|
||||
@@ -12,7 +13,7 @@ from nio.rooms import MatrixRoom
|
||||
|
||||
from . import __about__
|
||||
from .events import rooms, users
|
||||
from .events.rooms_timeline import EventType, HtmlMessageReceived
|
||||
from .events.rooms import TimelineEventReceived, TimelineMessageReceived
|
||||
from .html_filter import HTML_FILTER
|
||||
|
||||
|
||||
@@ -162,17 +163,157 @@ class MatrixClient(nio.AsyncClient):
|
||||
|
||||
# Callbacks for nio events
|
||||
|
||||
async def onRoomMessageText(self, room: MatrixRoom, ev: nio.RoomMessageText
|
||||
) -> None:
|
||||
is_html = ev.format == "org.matrix.custom.html"
|
||||
filter_ = HTML_FILTER.filter
|
||||
# Special %tokens for event contents:
|
||||
# %S = sender's displayname
|
||||
# %T = target (ev.state_key)'s displayname
|
||||
|
||||
HtmlMessageReceived(
|
||||
type = EventType.html if is_html else EventType.text,
|
||||
room_id = room.room_id,
|
||||
event_id = ev.event_id,
|
||||
sender_id = ev.sender,
|
||||
date = datetime.fromtimestamp(ev.server_timestamp / 1000),
|
||||
is_local_echo = False,
|
||||
content = filter_(ev.formatted_body) if is_html else ev.body,
|
||||
async def onRoomMessageText(self, room, ev) -> None:
|
||||
co = HTML_FILTER.filter(
|
||||
ev.formatted_body
|
||||
if ev.format == "org.matrix.custom.html" else html.escape(ev.body)
|
||||
)
|
||||
TimelineMessageReceived.from_nio(room, ev, content=co)
|
||||
|
||||
|
||||
async def onRoomCreateEvent(self, room, ev) -> None:
|
||||
co = "%S allowed users on other matrix servers to join this room." \
|
||||
if ev.federate else \
|
||||
"%S blocked users on other matrix servers from joining this room."
|
||||
TimelineEventReceived.from_nio(room, ev, content=co)
|
||||
|
||||
|
||||
async def onRoomGuestAccessEvent(self, room, ev) -> None:
|
||||
allowed = "allowed" if ev.guest_access else "forbad"
|
||||
co = f"%S {allowed} guests to join the room."
|
||||
TimelineEventReceived.from_nio(room, ev, content=co)
|
||||
|
||||
|
||||
async def onRoomJoinRulesEvent(self, room, ev) -> None:
|
||||
access = "public" if ev.join_rule == "public" else "invite-only"
|
||||
co = f"%S made the room {access}."
|
||||
TimelineEventReceived.from_nio(room, ev, content=co)
|
||||
|
||||
|
||||
async def onRoomHistoryVisibilityEvent(self, room, ev) -> None:
|
||||
if ev.history_visibility == "shared":
|
||||
to = "all room members"
|
||||
elif ev.history_visibility == "world_readable":
|
||||
to = "any member or outsider"
|
||||
elif ev.history_visibility == "joined":
|
||||
to = "all room members, since the time they joined"
|
||||
elif ev.history_visibility == "invited":
|
||||
to = "all room members, since the time they were invited"
|
||||
else:
|
||||
to = "???"
|
||||
log.warning("Invalid visibility - %s",
|
||||
json.dumps(ev.__dict__, indent=4))
|
||||
|
||||
co = f"%S made future room history visible to {to}."
|
||||
TimelineEventReceived.from_nio(room, ev, content=co)
|
||||
|
||||
|
||||
async def onPowerLevelsEvent(self, room, ev) -> None:
|
||||
co = "%S changed the room's permissions." # TODO: improve
|
||||
TimelineEventReceived.from_nio(room, ev, content=co)
|
||||
|
||||
|
||||
async def _get_room_member_event_content(self, ev) -> str:
|
||||
prev = ev.prev_content
|
||||
prev_membership = prev["membership"] if prev else None
|
||||
now = ev.content
|
||||
membership = now["membership"]
|
||||
|
||||
if not prev or membership != prev_membership:
|
||||
reason = f" Reason: {now['reason']}" if now.get("reason") else ""
|
||||
|
||||
if membership == "join":
|
||||
did = "accepted" if prev and prev_membership == "invite" else \
|
||||
"declined"
|
||||
return f"%S {did} their invitation."
|
||||
|
||||
if membership == "invite":
|
||||
return f"%S invited %T to the room."
|
||||
|
||||
if membership == "leave":
|
||||
if ev.state_key == ev.sender:
|
||||
return (
|
||||
f"%S declined their invitation.{reason}"
|
||||
if prev and prev_membership == "invite" else
|
||||
f"%S left the room.{reason}"
|
||||
)
|
||||
|
||||
return (
|
||||
f"%S withdrew %T's invitation.{reason}"
|
||||
if prev and prev_membership == "invite" else
|
||||
|
||||
f"%S unbanned %T from the room.{reason}"
|
||||
if prev and prev_membership == "ban" else
|
||||
|
||||
f"%S kicked out %T from the room.{reason}"
|
||||
)
|
||||
|
||||
if membership == "ban":
|
||||
return f"%S banned %T from the room.{reason}"
|
||||
|
||||
changed = []
|
||||
|
||||
if prev and now["avatar_url"] != prev["avatar_url"]:
|
||||
changed.append("profile picture") # TODO: <img>s
|
||||
|
||||
|
||||
if prev and now["displayname"] != prev["displayname"]:
|
||||
changed.append('display name from "{}" to "{}"'.format(
|
||||
prev["displayname"] or ev.state_key,
|
||||
now["displayname"] or ev.state_key,
|
||||
))
|
||||
|
||||
if changed:
|
||||
return "%S changed their {}.".format(" and ".join(changed))
|
||||
|
||||
log.warning("Invalid member event - %s",
|
||||
json.dumps(ev.__dict__, indent=4))
|
||||
return "%S ???"
|
||||
|
||||
|
||||
async def onRoomMemberEvent(self, room, ev) -> None:
|
||||
co = await self._get_room_member_event_content(ev)
|
||||
TimelineEventReceived.from_nio(room, ev, content=co)
|
||||
|
||||
|
||||
async def onRoomAliasEvent(self, room, ev) -> None:
|
||||
co = f"%S set the room's main address to {ev.canonical_alias}."
|
||||
TimelineEventReceived.from_nio(room, ev, content=co)
|
||||
|
||||
|
||||
async def onRoomNameEvent(self, room, ev) -> None:
|
||||
co = f"%S changed the room's name to \"{ev.name}\"."
|
||||
TimelineEventReceived.from_nio(room, ev, content=co)
|
||||
|
||||
|
||||
async def onRoomTopicEvent(self, room, ev) -> None:
|
||||
co = f"%S changed the room's topic to \"{ev.topic}\"."
|
||||
TimelineEventReceived.from_nio(room, ev, content=co)
|
||||
|
||||
|
||||
async def onRoomEncryptionEvent(self, room, ev) -> None:
|
||||
co = f"%S turned on encryption for this room."
|
||||
TimelineEventReceived.from_nio(room, ev, content=co)
|
||||
|
||||
|
||||
async def onOlmEvent(self, room, ev) -> None:
|
||||
co = f"%S hasn't sent your device the keys to decrypt this message."
|
||||
TimelineEventReceived.from_nio(room, ev, content=co)
|
||||
|
||||
|
||||
async def onMegolmEvent(self, room, ev) -> None:
|
||||
await self.onOlmEvent(room, ev)
|
||||
|
||||
|
||||
async def onBadEvent(self, room, ev) -> None:
|
||||
co = f"%S sent a malformed event."
|
||||
TimelineEventReceived.from_nio(room, ev, content=co)
|
||||
|
||||
|
||||
async def onUnknownBadEvent(self, room, ev) -> None:
|
||||
co = f"%S sent an event this client doesn't understand."
|
||||
TimelineEventReceived.from_nio(room, ev, content=co)
|
||||
|
Reference in New Issue
Block a user