Safer linkifying for user display names

This commit is contained in:
miruka 2020-03-22 20:58:05 -04:00
parent 8fd7ce4e14
commit 8d5bc45ceb
3 changed files with 87 additions and 61 deletions

View File

@@ -3,8 +3,7 @@
"""HTML and Markdown processing tools.""" """HTML and Markdown processing tools."""
import re import re
from typing import Dict, Optional from typing import DefaultDict, Dict
from urllib.parse import quote
import html_sanitizer.sanitizer as sanitizer import html_sanitizer.sanitizer as sanitizer
import mistune import mistune
@@ -111,7 +110,18 @@ class HTMLProcessor:
"table", "thead", "tbody", "tr", "th", "td", "pre", "table", "thead", "tbody", "tr", "th", "td", "pre",
} }
link_regexes = [re.compile(r, re.IGNORECASE) for r in [ user_id_regex = re.compile(
r"(?=^|\W)(?P<body>@.+?:(?P<host>[a-zA-Z\d.-:]*[a-zA-Z\d]))",
)
room_id_regex = re.compile(
r"(?=^|\W)(?P<body>!.+?:(?P<host>[a-zA-Z\d.-:]*[a-zA-Z\d]))",
)
room_alias_regex = re.compile(
r"(?=^|\W)(?P<body>#.+?:(?P<host>[a-zA-Z\d.-:]*[a-zA-Z\d]))",
)
link_regexes = [re.compile(r, re.IGNORECASE)
if isinstance(r, str) else r for r in [
# Normal :// URLs # Normal :// URLs
(r"(?P<body>[a-zA-Z\d]+://(?P<host>[a-z\d._-]+(?:\:\d+)?)" (r"(?P<body>[a-zA-Z\d]+://(?P<host>[a-z\d._-]+(?:\:\d+)?)"
r"(?:/[/\-_.,a-z\d#%&?;=~]*)?(?:\([/\-_.,a-z\d#%&?;=~]*\))?)"), r"(?:/[/\-_.,a-z\d#%&?;=~]*)?(?:\([/\-_.,a-z\d#%&?;=~]*\))?)"),
@@ -123,10 +133,7 @@ class HTMLProcessor:
# magnet: # magnet:
r"(?P<body>magnet:\?xt=urn:[a-z0-9]+:.+)(?P<host>)", r"(?P<body>magnet:\?xt=urn:[a-z0-9]+:.+)(?P<host>)",
# User ID, room ID, room alias user_id_regex, room_id_regex, room_alias_regex,
r"(?=^|\W)(?P<body>@.+?:(?P<host>[a-zA-Z\d.-:]*[a-zA-Z\d]))",
r"(?=^|\W)(?P<body>#.+?:(?P<host>[a-zA-Z\d.-:]*[a-zA-Z\d]))",
r"(?=^|\W)(?P<body>!.+?:(?P<host>[a-zA-Z\d.-:]*[a-zA-Z\d]))",
]] ]]
inline_quote_regex = re.compile(r"(^|⏎)(\s*&gt;[^⏎\n]*)", re.MULTILINE) inline_quote_regex = re.compile(r"(^|⏎)(\s*&gt;[^⏎\n]*)", re.MULTILINE)
@@ -140,19 +147,11 @@ class HTMLProcessor:
extra_newlines_regex = re.compile(r"\n(\n*)") extra_newlines_regex = re.compile(r"\n(\n*)")
# {room_id: {user_id: username}}
rooms_user_id_names: DefaultDict[str, Dict[str, str]] = DefaultDict(dict)
def __init__(self) -> None: def __init__(self) -> None:
self._sanitizers = {
(False, False): Sanitizer(self.sanitize_settings(False, False)),
(True, False): Sanitizer(self.sanitize_settings(True, False)),
(False, True): Sanitizer(self.sanitize_settings(False, True)),
(True, True): Sanitizer(self.sanitize_settings(True, True)),
}
self._inline_sanitizer = Sanitizer(self.sanitize_settings(inline=True))
self._inline_outgoing_sanitizer = \
Sanitizer(self.sanitize_settings(inline=True))
# The whitespace remover doesn't take <pre> into account # The whitespace remover doesn't take <pre> into account
sanitizer.normalize_overall_whitespace = lambda html, *args, **kw: html sanitizer.normalize_overall_whitespace = lambda html, *args, **kw: html
sanitizer.normalize_whitespace_in_text_or_tail = \ sanitizer.normalize_whitespace_in_text_or_tail = \
@@ -178,36 +177,29 @@ class HTMLProcessor:
text: str, text: str,
inline: bool = False, inline: bool = False,
outgoing: bool = False, outgoing: bool = False,
mentionable_users: Optional[Dict[str, str]] = None, # {id: name} room_id: str = "",
) -> str: ) -> str:
"""Return filtered HTML from Markdown text.""" """Return filtered HTML from Markdown text."""
text = self.markdown_linkify_users_rooms(text, mentionable_users) return self.filter(
return self.filter(self._markdown_to_html(text), inline, outgoing) self._markdown_to_html(text),
inline,
outgoing,
def markdown_linkify_users_rooms( room_id,
self, text: str, usernames: Optional[Dict[str, str]] = None,
) -> str:
"""Turn usernames, user ID, room alias, room ID into matrix.to URL."""
for user_id, username in (usernames or {}).items():
text = re.sub(
rf"(?<!\w)({re.escape(username)})(?!\w)",
rf"[\1](https://matrix.to/#/{quote(user_id)})",
text,
flags=re.IGNORECASE,
) )
return text
def filter( def filter(
self, html: str, inline: bool = False, outgoing: bool = False, self,
html: str,
inline: bool = False,
outgoing: bool = False,
room_id: str = "",
) -> str: ) -> str:
"""Filter and return HTML.""" """Filter and return HTML."""
html = self._sanitizers[inline, outgoing].sanitize(html).rstrip("\n") settings = self.sanitize_settings(inline, outgoing, room_id)
html = Sanitizer(settings).sanitize(html).rstrip("\n")
if outgoing: if outgoing:
return html return html
@@ -226,7 +218,7 @@ class HTMLProcessor:
def sanitize_settings( def sanitize_settings(
self, inline: bool = False, outgoing: bool = False, self, inline: bool = False, outgoing: bool = False, room_id: str = "",
) -> dict: ) -> dict:
"""Return an html_sanitizer configuration.""" """Return an html_sanitizer configuration."""
@@ -247,6 +239,11 @@ class HTMLProcessor:
"span": {"data-mx-color"}, "span": {"data-mx-color"},
}} }}
username_link_regexes = [re.compile(r, re.IGNORECASE) for r in [
rf"(?<!\w)(?P<body>{re.escape(username)})(?!\w)(?P<host>)"
for username in self.rooms_user_id_names[room_id].values()
]]
return { return {
"tags": inline_tags if inline else all_tags, "tags": inline_tags if inline else all_tags,
"attributes": inlines_attributes if inline else attributes, "attributes": inlines_attributes if inline else attributes,
@@ -258,7 +255,8 @@ class HTMLProcessor:
"keep_typographic_whitespace": True, "keep_typographic_whitespace": True,
"add_nofollow": False, "add_nofollow": False,
"autolink": { "autolink": {
"link_regexes": self.link_regexes, "link_regexes":
self.link_regexes + username_link_regexes, # type: ignore
"avoid_hosts": [], "avoid_hosts": [],
}, },
"sanitize_href": lambda href: href, "sanitize_href": lambda href: href,
@@ -280,7 +278,7 @@ class HTMLProcessor:
self._remove_extra_newlines, self._remove_extra_newlines,
self._newlines_to_return_symbol if inline else lambda el: el, self._newlines_to_return_symbol if inline else lambda el: el,
self._matrix_toify_user_room_links, lambda el: self._matrix_toify(el, room_id),
], ],
"element_postprocessors": [ "element_postprocessors": [
self._font_color_to_span if outgoing else lambda el: el, self._font_color_to_span if outgoing else lambda el: el,
@@ -374,12 +372,31 @@ class HTMLProcessor:
return el return el
@staticmethod def _matrix_toify(self, el: HtmlElement, room_id: str = "") -> HtmlElement:
def _matrix_toify_user_room_links(el: HtmlElement) -> HtmlElement: """Turn userID, usernames, roomID, room aliases into matrix.to URL."""
if el.tag != "a" or not el.attrib.get("href"): if el.tag != "a" or not el.attrib.get("href"):
# print("ret 1", el.tag, el.attrib, el.text, el.tail, sep="||")
return el return el
el.attrib["href"] = "https://matrix.to/#/%s" % el.attrib["href"] id_regexes = (
self.user_id_regex, self.room_id_regex, self.room_alias_regex,
)
for regex in id_regexes:
if regex.match(el.attrib["href"]):
el.attrib["href"] = f"https://matrix.to/#/{el.attrib['href']}"
if room_id not in self.rooms_user_id_names:
# print("ret 2", el.tag, el.attrib, el.text, el.tail, sep="||")
return el
for user_id, username in self.rooms_user_id_names[room_id].items():
# print(el.attrib["href"], username, user_id)
if el.attrib["href"] == username:
el.attrib["href"] = f"https://matrix.to/#/{user_id}"
# print("ret 3", el.tag, el.attrib, el.text, el.tail, sep="||")
return el return el

View File

@@ -308,14 +308,7 @@ class MatrixClient(nio.AsyncClient):
async def send_text(self, room_id: str, text: str) -> None: async def send_text(self, room_id: str, text: str) -> None:
"""Send a markdown `m.text` or `m.notice` (with `/me`) message .""" """Send a markdown `m.text` or `m.notice` (with `/me`) message ."""
from_md = partial( from_md = partial(HTML.from_markdown, room_id=room_id)
HTML.from_markdown,
mentionable_users={
user_id: member.display_name or user_id
for user_id, member in
self.models[self.user_id, room_id, "members"].items()
},
)
escape = False escape = False
if text.startswith("//") or text.startswith(r"\/"): if text.startswith("//") or text.startswith(r"\/"):
@@ -626,7 +619,9 @@ class MatrixClient(nio.AsyncClient):
content = event_fields.get("content", "").strip() content = event_fields.get("content", "").strip()
if content and "inline_content" not in event_fields: if content and "inline_content" not in event_fields:
event_fields["inline_content"] = HTML.filter(content, inline=True) event_fields["inline_content"] = HTML.filter(
content, inline=True, room_id=room_id,
)
event = Event( event = Event(
id = f"echo-{transaction_id}", id = f"echo-{transaction_id}",
@@ -1088,7 +1083,9 @@ class MatrixClient(nio.AsyncClient):
display_name = room.display_name or "", display_name = room.display_name or "",
avatar_url = room.gen_avatar_url or "", avatar_url = room.gen_avatar_url or "",
plain_topic = room.topic or "", plain_topic = room.topic or "",
topic = HTML.filter(room.topic or "", inline=True), topic = HTML.filter(
room.topic or "", inline=True, room_id=room.room_id,
),
inviter_id = inviter, inviter_id = inviter,
inviter_name = room.user_name(inviter) if inviter else "", inviter_name = room.user_name(inviter) if inviter else "",
inviter_avatar = inviter_avatar =
@@ -1123,6 +1120,7 @@ class MatrixClient(nio.AsyncClient):
for user_id in left_the_room: for user_id in left_the_room:
del self.models[self.user_id, room.room_id, "members"][user_id] del self.models[self.user_id, room.room_id, "members"][user_id]
HTML.rooms_user_id_names[room.room_id].pop(user_id, None)
# Add the room members to the added room # Add the room members to the added room
new_dict = { new_dict = {
@@ -1138,6 +1136,11 @@ class MatrixClient(nio.AsyncClient):
} }
self.models[self.user_id, room.room_id, "members"].update(new_dict) self.models[self.user_id, room.room_id, "members"].update(new_dict)
for user_id, member in room.users.items():
if member.display_name:
HTML.rooms_user_id_names[room.room_id][user_id] = \
member.display_name
async def get_member_name_avatar( async def get_member_name_avatar(
self, room_id: str, user_id: str, self, room_id: str, user_id: str,
@@ -1182,7 +1185,9 @@ class MatrixClient(nio.AsyncClient):
content = fields.get("content", "").strip() content = fields.get("content", "").strip()
if content and "inline_content" not in fields: if content and "inline_content" not in fields:
fields["inline_content"] = HTML.filter(content, inline=True) fields["inline_content"] = HTML.filter(
content, inline=True, room_id=room.room_id,
)
# Create Event ModelItem # Create Event ModelItem

View File

@@ -97,6 +97,8 @@ class NioCallbacks:
ev.formatted_body ev.formatted_body
if ev.format == "org.matrix.custom.html" else if ev.format == "org.matrix.custom.html" else
utils.plain2html(ev.body), utils.plain2html(ev.body),
room_id = room.room_id,
) )
await self.client.register_nio_event(room, ev, content=co) await self.client.register_nio_event(room, ev, content=co)
@@ -337,7 +339,9 @@ class NioCallbacks:
async def onRoomTopicEvent(self, room, ev) -> None: async def onRoomTopicEvent(self, room, ev) -> None:
if ev.topic: if ev.topic:
topic = HTML_PROCESSOR.filter(ev.topic, inline=True) topic = HTML_PROCESSOR.filter(
ev.topic, inline=True, room_id=room.room_id,
)
co = f"%1 changed the room's topic to \"{topic}\"" co = f"%1 changed the room's topic to \"{topic}\""
else: else:
co = "%1 removed the room's topic" co = "%1 removed the room's topic"