114 lines
3.5 KiB
Python
114 lines
3.5 KiB
Python
"""Install `uvloop` if possible and provide a `QmlBridge`."""
|
|
|
|
import asyncio
|
|
import logging as log
|
|
import signal
|
|
import traceback
|
|
from concurrent.futures import Future
|
|
from operator import attrgetter
|
|
from threading import Thread
|
|
from typing import Coroutine, Sequence
|
|
|
|
from .backend import Backend
|
|
from .pyotherside_events import CoroutineDone
|
|
|
|
# uvloop is an optional accelerator: when the import succeeds it is installed
# before any event loop gets created further down in this module; when the
# module is missing we only warn and keep asyncio's default loop.
try:
    import uvloop
except ModuleNotFoundError:
    log.warning("uvloop module not found, using slower default asyncio loop")
else:
    # Only reached when the import above succeeded.
    uvloop.install()
|
|
|
|
|
|
class QmlBridge:
    """Setup asyncio and provide synchronous methods to call coroutines.

    A thread is created to run the asyncio loop in, to ensure all calls from
    QML return instantly.
    Methods are provided for QML to call coroutines using PyOtherSide, which
    doesn't have this ability out of the box.

    Attributes:
        backend: The `Backend` containing the coroutines of interest and
            `MatrixClient` objects.
    """

    def __init__(self) -> None:
        """Create the backend, obtain an event loop and start its thread."""

        self.backend: Backend = Backend()

        self._loop: asyncio.AbstractEventLoop = self._get_event_loop()
        Thread(target=self._start_asyncio_loop).start()


    @staticmethod
    def _get_event_loop() -> asyncio.AbstractEventLoop:
        """Return the calling thread's event loop, creating one if needed.

        `asyncio.get_event_loop()` raises `RuntimeError` when the calling
        thread has no current event loop and asyncio will not create one
        implicitly (e.g. when called from a thread other than the main one).
        In that case a fresh loop is created and registered as the calling
        thread's current loop, so constructing the bridge does not depend on
        which thread imports this module.
        """

        try:
            return asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            return loop


    def _start_asyncio_loop(self) -> None:
        """Thread target: make our loop current in this thread and run it."""

        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()


    def _call_coro(self, coro: Coroutine, uuid: str) -> Future:
        """Schedule a coroutine to run in our thread and return a `Future`.

        Arguments:
            coro: The coroutine object to schedule on the bridge's loop.
            uuid: Identifier passed through unchanged to `CoroutineDone`,
                presumably so the QML side can match the outcome to its call.

        Returns:
            The `concurrent.futures.Future` given by
            `asyncio.run_coroutine_threadsafe()`.
        """

        def on_done(future: Future) -> None:
            """Send a PyOtherSide event with the coro's result/exception."""

            result = exception = trace = None

            try:
                result = future.result()
            except Exception as err:
                exception = err
                # We are still inside the `except` block here, so
                # format_exc() describes the exception raised by result().
                trace = traceback.format_exc().rstrip()

            # Exactly one of `result` or `exception`/`trace` is meaningful;
            # the others stay None.
            CoroutineDone(uuid, result, exception, trace)

        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        future.add_done_callback(on_done)
        return future


    def call_backend_coro(
        self, name: str, uuid: str, args: Sequence[str] = (),
    ) -> Future:
        """Schedule a `Backend` coroutine and return a `Future`.

        `name` is resolved on `self.backend` with `operator.attrgetter`,
        which also accepts dotted attribute paths. `args` are passed to the
        resolved callable positionally, and `uuid` is forwarded to
        `_call_coro()`.
        """

        return self._call_coro(attrgetter(name)(self.backend)(*args), uuid)


    def call_client_coro(
        self, user_id: str, name: str, uuid: str, args: Sequence[str] = (),
    ) -> Future:
        """Schedule a `MatrixClient` coroutine and return a `Future`.

        The client is looked up in `self.backend.clients` by `user_id`; the
        lookup error for an unknown `user_id` is not handled here and
        propagates to the caller. `name`, `args` and `uuid` are used like in
        `call_backend_coro()`.
        """

        client = self.backend.clients[user_id]
        return self._call_coro(attrgetter(name)(client)(*args), uuid)


    def pdb(self, additional_data: Sequence = ()) -> None:
        """Call the RemotePdb debugger; define some convenience variables.

        The short local names below exist only to be typed at the debugger
        prompt, hence the `noqa` markers for otherwise unused variables.
        """

        ad = additional_data  # noqa
        ba = self.backend     # noqa
        mo = self.backend.models  # noqa
        cl = self.backend.clients
        # Get a client from the local part of a matrix.org user ID
        gcl = lambda user: cl[f"@{user}:matrix.org"]  # noqa

        # Run a coroutine on the bridge's loop from the debugger prompt
        rc = lambda c: asyncio.run_coroutine_threadsafe(c, self._loop)  # noqa

        from .models.items import Account, Room, Member, Event, Device  # noqa

        p = print  # pdb's `p` doesn't print a class's __str__  # noqa
        try:
            from pprintpp import pprint as pp  # noqa
        except ModuleNotFoundError:
            pass

        # Import before logging the hint: if remote_pdb is missing we fail
        # here, without first telling the user to connect to a debugger
        # that will never be listening.
        import remote_pdb

        log.info("\n=> Run `socat readline tcp:127.0.0.1:4444` in a terminal "
                 "to connect to pdb.")
        remote_pdb.RemotePdb("127.0.0.1", 4444).set_trace()
|
|
|
|
|
|
# Make CTRL-C work again: put the default SIGINT handler back in place.
signal.signal(signal.SIGINT, signal.SIG_DFL)

# Bridge instance created when this module is imported; constructing it
# starts the thread running the asyncio loop.
BRIDGE = QmlBridge()
|