import aiohttp as ah import time import asyncio import datetime import traceback from .utils import Event,Listener func=type(lambda:1) # Gross, but I can't actually find it. # And yes, lamdas are too. tracing=False def trace(*msg): if tracing: print(datetime.datetime.now(),*msg) class MatrixClient(): def __init__(self,homeserver,token): self.event_queue=asyncio.Queue() #Contains Events. Everything in here must be an Event. self.listeners=[] self.timers=[] async def request(self,endpoint='sync',method='GET', ver=0,headers={} *args,**kwargs): async with self.session.request(method, f'{self.baseurl}/r{ver}/{endpoint}', headers=headers|{'Authorization':f'Bearer {self.token}'}, *args,**kwargs) as fetched: if fetched.status_code!=200: raise Exception('fix ur shit') try: return await fetched.json() except JSONDecodeError: pass # TODO: Figure out what this is called # This function just dumps /sync blobs in the event queue as a raw event. # ALL handling is deferred to handlers. # ... Except updating the since token. That's important to do here to guarantee that it never calls with the same token twice. async def sync(self): blob=await self.request(params={'timeout':30000,'since':self.since}) self.since=blob['next_batch'] self.event_queue.put(Event('m.sync',None,blob)) def addlistener(self,name=None,match=None): if isinstance(name,func) and match==None: match=name; name=None def __wrap__(funky): self.listeners.append(Listener(name,match,funky)) return funky def loop(timer=0): def _loop(funky): @functools.wraps(funky) async def _wrapper(*args,**kwargs): while True: try: await funky(*args,**kwargs) except Exception: open('error '+str(datetime.datetime.now()),'w').write(traceback.format_exc()) await asyncio.sleep(timer) self.timers.append(_wrapper) return _wrapper return _loop async def process_queue(self): item=await self.event_queue.get(): for listener in self.listeners: if listener==item: await listener(item)