auth/backends/file.py

98 lines
2.9 KiB
Python

from os import path,rename
def init():
if not path.exists(config['path']): raise Exception(f"Backend directory doesn't exist: {config['path']}")
for n in ['users','invite_tokens','access_tokens']:
p=path.join(config['path'],n+'.csv')
globals()[n]=p
if not path.exists(p): open(p,'w').write('\n')
if path.isdir(p): raise Exception(f"Backend file is a directory: {p}")
try: open(p).close()
except PermissionError: raise Exception(f"Backend file can't be read: {p}")
try: open(p,'a').close()
except PermissionError: raise Exception(f"Backend file can't be written: {p}")
def _load(name,pos,file,obj):
for line in linegen(file):
split=line.split(',')
if split[pos]==name: return obj(*split)
raise Exception('No such',obj.__name__,'with identifier',name)
def _load_multi(name,pos,file,obj):
out=[]
for line in linegen(file):
split=line.split(',')
if split[pos]==name: out.append(obj(*split))
return out
def _save(thing,pos,file,comp):
gen=linegen(file)
for line in gen:
split=line.split(',')
if split[pos]==comp: update(file,{gen.send(True):thing.serialise}); break
else: update(file,{-1:thing.serialise})
def _delete(thing,pos,file,comp):
gen=linegen(file)
for line in gen:
split=line.split(',')
if split[pos]==comp: remove(file,[gen.send(True)]); break
def load_user(username):
return _load(username,0,users,models.User)
def save_user(user):
return _save(user,0,users,user.username)
def delete_user(user):
return _delete(user,0,users,user.username)
def load_token(tokeid):
return _load(tokeid,0,access_tokens,models.AccessToken)
def load_tokens(user):
return _load_multi(user.username,1,access_tokens,models.AccessToken)
def save_token(token):
return _save(token,0,access_tokens,token.value)
def delete_token(token):
return _delete(token,0,access_tokens,token.value)
def load_invite(tokeid):
return _load(tokeid,0,invite_tokens,models.InviteToken)
def load_invites(user):
return _load_multi(user.username,1,invite_tokens,models.InviteToken)
def save_invite(token):
return _save(token,0,invite_tokens,token.value)
def delete_invite(token):
return _delete(token,0,invite_tokens,token.value)
def linegen(path):
i=0
file=open(path)
line=file.readline()
while line:
i+=1
response=yield line
if response is not None: yield i
line=file.readline()
file.close()
def update(path,lines):
def _action(file,i,line):
if i in lines: file.write(lines[i]+'\n')
else: file.write(line)
def _post(file):
if -1 in lines: file.write(lines[-1]+'\n')
writefile(path,_action,_post)
def remove(path,lines):
def _action(file,i,line):
if i not in lines: file.write(line)
writefile(path,_action)
def writefile(path,action,post=None):
i=0
file=open(path)
write=open(f'{path}.tmp','w')
line=file.readline()
while line:
i+=1
action(write,i,line)
line=file.readline()
if post is not None: post(write)
file.close()
write.close()
rename(f'{path}.tmp',path)