Fix multiple accounts local echo issues

- Messages should now always be in the order they were sent in when
  sending from multiple accounts to a same slow room

- Fixed dead lock that occured when sending a message
  from account A, then from account B, then from account A, in a
  room slow enough so that all of them would be in local echo transition
  at the same time.
This commit is contained in:
miruka 2019-12-16 10:36:59 -04:00
parent 423c657fea
commit 5dbf06ba6c
3 changed files with 40 additions and 47 deletions

View File

@ -53,7 +53,6 @@
- two upload delegates height bug - two upload delegates height bug
- Messed up message delegates position - Messed up message delegates position
- Quickly posting with another account leads to sync stop
- Pausing uploads doesn't work well, servers end up dropping the connection - Pausing uploads doesn't work well, servers end up dropping the connection
- Pause upload, switch to other room, then come back, wrong state displayed - Pause upload, switch to other room, then come back, wrong state displayed

View File

@ -39,6 +39,9 @@ class Backend:
self.get_profile_locks: DefaultDict[str, asyncio.Lock] = \ self.get_profile_locks: DefaultDict[str, asyncio.Lock] = \
DefaultDict(asyncio.Lock) # {user_id: lock} DefaultDict(asyncio.Lock) # {user_id: lock}
self.send_locks: DefaultDict[str, asyncio.Lock] = \
DefaultDict(asyncio.Lock) # {room_id: lock}
from .media_cache import MediaCache from .media_cache import MediaCache
cache_dir = Path(self.app.appdirs.user_cache_dir) cache_dir = Path(self.app.appdirs.user_cache_dir)
self.media_cache = MediaCache(self, cache_dir) self.media_cache = MediaCache(self, cache_dir)

View File

@ -24,7 +24,8 @@ import nio
from nio.crypto import AsyncDataT as UploadData from nio.crypto import AsyncDataT as UploadData
from nio.crypto import async_generator_from_data from nio.crypto import async_generator_from_data
from . import __about__, utils from .__about__ import __pkg_name__, __pretty_name__
from . import utils
from .errors import ( from .errors import (
BadMimeType, InvalidUserId, InvalidUserInContext, MatrixError, BadMimeType, InvalidUserId, InvalidUserInContext, MatrixError,
UneededThumbnail, UserNotFound, UneededThumbnail, UserNotFound,
@ -92,17 +93,11 @@ class MatrixClient(nio.AsyncClient):
self.first_sync_done: asyncio.Event = asyncio.Event() self.first_sync_done: asyncio.Event = asyncio.Event()
self.first_sync_date: Optional[datetime] = None self.first_sync_date: Optional[datetime] = None
self.send_locks: DefaultDict[str, asyncio.Lock] = \
DefaultDict(asyncio.Lock) # {room_id: lock}
self.past_tokens: Dict[str, str] = {} # {room_id: token} self.past_tokens: Dict[str, str] = {} # {room_id: token}
self.fully_loaded_rooms: Set[str] = set() # {room_id} self.fully_loaded_rooms: Set[str] = set() # {room_id}
self.loaded_once_rooms: Set[str] = set() # {room_id} self.loaded_once_rooms: Set[str] = set() # {room_id}
self.cleared_events_rooms: Set[str] = set() # {room_id} self.cleared_events_rooms: Set[str] = set() # {room_id}
self.local_echoes_uuid: Set[str] = set()
self.resolved_echoes: Dict[str, str] = {} # {event_id: echo_uuid}
self.skipped_events: DefaultDict[str, int] = DefaultDict(lambda: 0) self.skipped_events: DefaultDict[str, int] = DefaultDict(lambda: 0)
from .nio_callbacks import NioCallbacks from .nio_callbacks import NioCallbacks
@ -119,7 +114,7 @@ class MatrixClient(nio.AsyncClient):
def default_device_name(self) -> str: def default_device_name(self) -> str:
os_ = f" on {platform.system()}".rstrip() os_ = f" on {platform.system()}".rstrip()
os_ = f"{os_} {platform.release()}".rstrip() if os_ != " on" else "" os_ = f"{os_} {platform.release()}".rstrip() if os_ != " on" else ""
return f"{__about__.__pretty_name__}{os_}" return f"{__pretty_name__}{os_}"
async def login(self, password: str, device_name: str = "") -> None: async def login(self, password: str, device_name: str = "") -> None:
@ -222,10 +217,14 @@ class MatrixClient(nio.AsyncClient):
content["format"] = "org.matrix.custom.html" content["format"] = "org.matrix.custom.html"
content["formatted_body"] = to_html content["formatted_body"] = to_html
uuid = str(uuid4()) # Can't use the standard Matrix transaction IDs; they're only visible
# to the sender so our other accounts wouldn't be able to replace
# local echoes by real messages.
tx_id = uuid4()
content[f"{__pkg_name__}.transaction_id"] = str(tx_id)
await self._local_echo(room_id, uuid, event_type, content=echo_body) await self._local_echo(room_id, tx_id, event_type, content=echo_body)
await self._send_message(room_id, uuid, content) await self._send_message(room_id, content)
async def send_file(self, room_id: str, path: Union[Path, str]) -> None: async def send_file(self, room_id: str, path: Union[Path, str]) -> None:
@ -243,8 +242,9 @@ class MatrixClient(nio.AsyncClient):
) -> None: ) -> None:
from .media_cache import Media, Thumbnail from .media_cache import Media, Thumbnail
path = Path(path) transaction_id = uuid4()
encrypt = room_id in self.encrypted_rooms path = Path(path)
encrypt = room_id in self.encrypted_rooms
try: try:
size = path.resolve().stat().st_size size = path.resolve().stat().st_size
@ -291,6 +291,8 @@ class MatrixClient(nio.AsyncClient):
thumb_info: Optional[MatrixImageInfo] = None thumb_info: Optional[MatrixImageInfo] = None
content: dict = { content: dict = {
f"{__pkg_name__}.transaction_id": str(transaction_id),
"body": path.name, "body": path.name,
"info": { "info": {
"mimetype": mime, "mimetype": mime,
@ -397,10 +399,10 @@ class MatrixClient(nio.AsyncClient):
del self.models[Upload, room_id][str(upload_item.uuid)] del self.models[Upload, room_id][str(upload_item.uuid)]
uuid = str(uuid4())
await self._local_echo( await self._local_echo(
room_id, uuid, event_type, room_id,
transaction_id,
event_type,
inline_content = path.name, inline_content = path.name,
media_url = url, media_url = url,
media_title = path.name, media_title = path.name,
@ -416,11 +418,11 @@ class MatrixClient(nio.AsyncClient):
content["info"].get("thumbnail_info", {}).get("h", 0), content["info"].get("thumbnail_info", {}).get("h", 0),
) )
await self._send_message(room_id, uuid, content) await self._send_message(room_id, content)
async def _local_echo( async def _local_echo(
self, room_id: str, uuid: str, self, room_id: str, transaction_id: UUID,
event_type: Type[nio.Event], **event_fields, event_type: Type[nio.Event], **event_fields,
) -> None: ) -> None:
@ -428,7 +430,7 @@ class MatrixClient(nio.AsyncClient):
event = Event( event = Event(
source = None, source = None,
client_id = f"echo-{uuid}", client_id = f"echo-{transaction_id}",
event_id = "", event_id = "",
date = datetime.now(), date = datetime.now(),
sender_id = self.user_id, sender_id = self.user_id,
@ -439,30 +441,29 @@ class MatrixClient(nio.AsyncClient):
**event_fields, **event_fields,
) )
self.local_echoes_uuid.add(uuid)
for user_id in self.models[Account]: for user_id in self.models[Account]:
if user_id in self.models[Member, self.user_id, room_id]: if user_id in self.models[Member, self.user_id, room_id]:
self.models[Event, user_id, room_id][f"echo-{uuid}"] = event key = f"echo-{transaction_id}"
self.models[Event, user_id, room_id].sync_now() self.models[Event, user_id, room_id][key] = event
if user_id == self.user_id:
self.models[Event, user_id, room_id].sync_now()
await self.set_room_last_event(room_id, event) await self.set_room_last_event(room_id, event)
async def _send_message(self, room_id: str, uuid: str, content: dict, async def _send_message(self, room_id: str, content: dict) -> None:
) -> None:
async with self.send_locks[room_id]: async with self.backend.send_locks[room_id]:
response = await self.room_send( response = await self.room_send(
room_id = room_id, room_id = room_id,
message_type = "m.room.message", message_type = "m.room.message",
content = content, content = content,
tx_id = uuid,
ignore_unverified_devices = True, ignore_unverified_devices = True,
) )
if isinstance(response, nio.RoomSendError): if isinstance(response, nio.RoomSendError):
raise MatrixError.from_nio(response) raise MatrixError.from_nio(response)
async def load_past_events(self, room_id: str) -> bool: async def load_past_events(self, room_id: str) -> bool:
@ -924,25 +925,15 @@ class MatrixClient(nio.AsyncClient):
) )
# Add the Event to model # Add the Event to model
if ev.transaction_id in self.local_echoes_uuid: tx_id = ev.source.get("content", {}).get(
self.resolved_echoes[ev.event_id] = ev.transaction_id f"{__pkg_name__}.transaction_id",
self.local_echoes_uuid.discard(ev.transaction_id) )
item.client_id = f"echo-{ev.transaction_id}" local_sender = ev.sender in self.backend.clients
elif ev.sender in self.backend.clients: if local_sender and tx_id:
client = self.backend.clients[ev.sender] item.client_id = f"echo-{tx_id}"
# Wait until our other account has no more pending local echoes, if not local_sender and not await self.event_is_past(ev):
# so that we can know if this event should replace an echo
# from that client by finding its ID in the resolved_echoes dict.
# Server only gives back the transaction ID to the original sender.
while client.local_echoes_uuid: # while there are pending echoes
await asyncio.sleep(0.1)
with suppress(KeyError):
item.client_id = f"echo-{client.resolved_echoes[ev.event_id]}"
elif not await self.event_is_past(ev):
AlertRequested() AlertRequested()
self.models[Event, self.user_id, room.room_id][item.client_id] = item self.models[Event, self.user_id, room.room_id][item.client_id] = item