121 lines
3.5 KiB
Python
121 lines
3.5 KiB
Python
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
import uuid
|
|
|
|
class AbstractUser():
|
|
pass # It fixes circular dep on user.invited_by
|
|
# And on User.register (InviteToken)
|
|
|
|
@dataclass
|
|
class Token():
|
|
value: uuid.UUID
|
|
owner: AbstractUser
|
|
def __init__(self,value,owner=None):
|
|
if owner is None:
|
|
owner=value
|
|
self.value=uuid.uuid4()
|
|
else:
|
|
self.value=uuid.UUID(value)
|
|
self.owner=owner
|
|
def revoke(self):
|
|
backend.delete_token(self)
|
|
@property
|
|
def serialise(self):
|
|
return ','.join(map(str,[self,self.owner]))
|
|
def __str__(self): return str(self.value)
|
|
|
|
@dataclass
|
|
class InviteToken(Token):
|
|
_uses: int=0
|
|
_max_uses: int=-1
|
|
expires: datetime|None=None
|
|
def __init__(self,value,owner,uses:int,maxuses:int,expiry:datetime|None):
|
|
uses=int(uses); maxuses=int(maxuses);
|
|
if expiry!='None': expiry=datetime(expiry)
|
|
else: expiry=None
|
|
self.uses,self.max_uses,self.expires=uses,maxuses,expiry
|
|
return super().__init__(value,owner)
|
|
@property
|
|
def uses(self): return self._uses
|
|
@uses.setter
|
|
def uses(self,val):
|
|
if -1<self.max_uses<=val: self.revoke()
|
|
self._uses=val
|
|
@property
|
|
def max_uses(self): return self._max_uses
|
|
@max_uses.setter
|
|
def max_uses(self,val):
|
|
if -1<val<=self.uses: self.revoke()
|
|
self._max_uses=val
|
|
@property
|
|
def serialise(self):
|
|
return super().serialise+','+','.join(map(str,[self.uses,self.max_uses,self.expires]))
|
|
|
|
@dataclass
|
|
class AccessToken(Token):
|
|
def __init__(self,*args,**kwargs):
|
|
return super().__init__(*args,**kwargs)
|
|
|
|
@dataclass
|
|
class User(AbstractUser):
|
|
username: str
|
|
password_hash: str
|
|
salt: str
|
|
_invited_by: AbstractUser|str # Root node will just reference itself
|
|
email: str=''
|
|
state: str='pending'
|
|
def _load_invite(self):
|
|
if self._invited_by==self.username: return self # Sanity-check to prevent infinite recursion.
|
|
return backend.load_user(self._invited_by)
|
|
@property
|
|
def invited_by(self):
|
|
if isinstance(self._invited_by,str):
|
|
self._invited_by=self._load_invite()
|
|
return self._invited_by
|
|
@property
|
|
def serialise(self):
|
|
upstream=self.invited_by
|
|
upstream=upstream.username if upstream else 'None'
|
|
return ','.join([self.username,self.password_hash,str(self.salt),upstream,self.email or '',self.state])
|
|
def create_inv_token(self,*args,**kwargs):
|
|
""" All arguments are passed directly to InviteToken's constructor, with self added at the beginning """
|
|
tok=InviteToken(self,*args,**kwargs)
|
|
backend.save_invite(tok)
|
|
return tok
|
|
def auth(self,pw):
|
|
return utils.phash(pw,self.salt)[0]==self.password_hash
|
|
def change_password(self,old_pw:str|None,new_pw:str):
|
|
if self.auth(old_pw):
|
|
self.password_hash,self.salt=utils.phash(new_pw)
|
|
self.save()
|
|
def save(self):
|
|
backend.save_user(self)
|
|
def __str__(self): return self.username
|
|
|
|
@classmethod
|
|
def login(cls,username:str,password:str):
|
|
u=backend.load_user(username)
|
|
if u is None: raise Exception("User doesn't exist")
|
|
if u.auth(password):
|
|
a=AccessToken(u)
|
|
backend.save_token(a)
|
|
return a
|
|
raise Exception("Incorrect password")
|
|
@classmethod
|
|
def register(cls,username:str,password:str,invite:InviteToken=None,email:str=''):
|
|
if set([chr(n) for n in range(32)]+[','])&set(username): raise Exception('Invalid username')
|
|
u=backend.load_user(username)
|
|
if u is not None: raise Exception("User already exists")
|
|
if invite:
|
|
owner=invite.owner
|
|
else:
|
|
owner=None
|
|
u=User(username,*utils.phash(password),owner,email)
|
|
if invite:
|
|
user.state='approved' # Invited users don't need approval
|
|
invite.uses+=1
|
|
backend.save_invite(invite)
|
|
u.save()
|
|
return u
|