"""Small asyncio Matrix client: polls /sync and hands raw blobs to listeners."""

import asyncio
import datetime
import json
import time
import traceback
import types

import aiohttp as ah

from .utils import Event, Listener

tracing = False


def trace(*msg):
    """Print ``msg`` prefixed with the current time, but only if ``tracing`` is set."""
    if tracing:
        print(datetime.datetime.now(), *msg)


class MatrixClient():
    """Client for a Matrix homeserver's client-server HTTP API.

    Attributes:
        event_queue: ``asyncio.Queue`` of pending events. Everything put in
            here must be an ``Event``.
        homeserver: Host name used to build ``baseurl``.
        token: Access token sent as the ``access_token`` query parameter.
        baseurl: ``https://<homeserver>/_matrix/client``.
        session: The ``aiohttp.ClientSession`` used for every request.
        since: ``next_batch`` token from the latest sync, or ``None``.
        listeners: ``Listener`` objects registered through ``addlistener``.
    """

    def __init__(self, homeserver, token):
        """Store connection details and open the HTTP session.

        Args:
            homeserver: Host name of the homeserver (no scheme).
            token: Access token. Deal with login stuff later.
        """
        self.event_queue = asyncio.Queue()
        self.homeserver = homeserver
        self.token = token
        self.baseurl = f'https://{self.homeserver}/_matrix/client'
        self.session = ah.ClientSession()
        self.since = None
        self.listeners = []

    async def close(self):
        """Close the underlying HTTP session; safe to call more than once."""
        if not self.session.closed:
            await self.session.close()

    def __del__(self):
        """Best-effort close of the HTTP session on garbage collection.

        Prefer ``await client.close()``. This finalizer never blocks: the
        previous busy-wait with ``time.sleep`` stalled the very event loop
        that had to run the close task, so it could never finish.
        """
        # __init__ may have failed before the session was created.
        session = getattr(self, 'session', None)
        if session is None or session.closed:
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None
        if loop is not None:
            # A loop is running in this thread: let it close the session.
            loop.create_task(session.close())
            return
        try:
            asyncio.run(session.close())
        except RuntimeError:
            # No usable loop (e.g. interpreter shutdown); nothing more to do.
            pass

    async def request(self, endpoint='sync', method='GET', ver=0, params=None,
                      *args, **kwargs):
        """Send one request to ``{baseurl}/r{ver}/{endpoint}`` and decode JSON.

        The access token is always added to the query string, and so is the
        current ``since`` token when one is set.

        Args:
            endpoint: Path below ``/r{ver}/``.
            method: HTTP method.
            ver: Number used in the ``r{ver}`` path segment.
            params: Extra query parameters; ``None`` means none. Not mutated.
            *args: Forwarded to ``ClientSession.request``.
            **kwargs: Forwarded to ``ClientSession.request``.

        Returns:
            The decoded JSON body, or ``None`` if the body is not valid JSON.

        Raises:
            Exception: If the response status is not 200.
        """
        query = {**(params or {}), 'access_token': self.token}
        if self.since:
            query['since'] = self.since
        url = f'{self.baseurl}/r{ver}/{endpoint}'
        async with self.session.request(method, url, params=query,
                                        *args, **kwargs) as fetched:
            if fetched.status != 200:
                raise Exception(
                    f'{method} {url} failed with HTTP {fetched.status}')
            try:
                return await fetched.json()
            except (json.JSONDecodeError, ah.ContentTypeError) as err:
                # Deliberately best-effort: report it and hand back None.
                trace('undecodable response from', url, repr(err))
                return None

    async def sync(self):
        """Poll /sync once and queue the raw blob as an ``m.sync`` event.

        This function just dumps /sync blobs in the event queue as a raw
        event. ALL handling is deferred to handlers, except updating the
        since token: that happens here to guarantee that it never calls
        with the same token twice.
        """
        blob = await self.request(params={'timeout': 30000})
        if blob is None:
            # Undecodable response: keep the old token and queue nothing.
            return
        self.since = blob['next_batch']
        await self.event_queue.put(Event('m.sync', None, blob))

    def addlistener(self, name=None, match=None):
        """Return a decorator that registers a function as a listener.

        If ``name`` is a plain function (lambdas included) and ``match`` is
        not given, the function is taken to be ``match`` and ``name``
        becomes ``None``.

        Args:
            name: First argument for ``Listener``.
            match: Second argument for ``Listener``.

        Returns:
            A decorator that appends ``Listener(name, match, func)`` to
            ``self.listeners`` and returns ``func`` unchanged.
        """
        if isinstance(name, types.FunctionType) and match is None:
            match, name = name, None

        def __wrap__(funky):
            self.listeners.append(Listener(name, match, funky))
            return funky

        return __wrap__

    async def process_queue(self):
        """Forever take events off the queue and call the matching listeners.

        A listener is called when it compares equal to the event's
        ``event_type``. A listener that raises has its traceback printed and
        does not stop the loop.
        """
        while True:
            item = await self.event_queue.get()
            try:
                for listener in self.listeners:
                    if listener == item.event_type:
                        try:
                            # NOTE(review): the result is not awaited --
                            # confirm how Listener runs async handlers.
                            listener(item)
                        except Exception:
                            traceback.print_exc()
            finally:
                self.event_queue.task_done()