This repository has been archived on 2026-04-27. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
watch-together-yandex-disk/users/db.py
T
2025-07-20 00:51:55 +03:00

149 lines
3.7 KiB
Python

import asyncio
import logging
from dataclasses import asdict
from datetime import datetime
from pathlib import Path
import aiofiles
import orjson
import config
from singleton import Singleton
from users.classes import User
from users.utils import is_inactive_too_long, decode_token, generate_uid
class UsersDB(metaclass=Singleton):
    """In-memory user store persisted as a single JSON file.

    ``users`` is the authoritative list; ``by_uid`` is a lookup index derived
    from it. Every mutating method updates the list, rebuilds the index
    synchronously and only then awaits the disk write, so the two structures
    never disagree while another coroutine is running.
    """

    def __init__(self, db_path: Path | str):
        """Create an empty store backed by the file at *db_path*.

        Nothing is read here; call :meth:`load_from_disk` to populate it.
        """
        self.path = Path(db_path)
        self.users: list[User] = []
        self.by_uid: dict[str, User] = {}

    def _assign_users(self):
        """Rebuild the ``by_uid`` index from ``users`` (last entry wins)."""
        self.by_uid = {user.uid: user for user in self.users}

    async def auto_remove_inactive(self):
        """Run :meth:`remove_inactive` forever, sleeping between runs.

        The pause is ``config.REMOVE_INACTIVE_USERS_INTERVAL_SECONDS`` seconds.
        """
        while True:
            await asyncio.sleep(config.REMOVE_INACTIVE_USERS_INTERVAL_SECONDS)
            await self.remove_inactive()

    async def remove_inactive(self):
        """Drop every user for whom ``is_inactive_too_long`` is true and save."""
        logging.info("Removing inactive users")
        kept = []
        for user in self.users:
            if is_inactive_too_long(user):
                logging.info("Removing inactive user %s", user.uid)
            else:
                kept.append(user)
        self.users = kept
        # Re-index before awaiting so removed users stop resolving immediately
        # and the index stays correct even if the write below fails.
        self._assign_users()
        await self.save_to_disk()
        logging.info("Finished Removing inactive users")

    async def get_user_by_token(self, token: str) -> User | None:
        """Return the stored user identified by *token*, or ``None``.

        ``None`` is returned when the token cannot be decoded, when its uid is
        not in the store, or when the stored user has been inactive too long
        (in which case the user is also deleted).

        Inactivity is evaluated on the stored record, not on the object
        decoded from the token, because :meth:`update_user` refreshes
        ``last_activity`` on the stored record only.
        """
        decoded = decode_token(token)
        if decoded is None:
            return None
        user = self.by_uid.get(decoded.uid)
        if user is None:
            return None
        if is_inactive_too_long(user):
            await self.delete_user(user.uid)
            return None
        return user

    async def create_user(self, username: str) -> User:
        """Create a user named *username*, persist the store and return it."""
        user = User(
            username=username,
            uid=generate_uid(username),
            last_activity=datetime.now(),
        )
        self.users.append(user)
        # Index first: the new user must be resolvable while the save runs.
        self._assign_users()
        await self.save_to_disk()
        logging.info("Created new user - %s", user.uid)
        return user

    async def delete_user(self, uid: str):
        """Remove the user with *uid* and persist; no-op if the uid is unknown."""
        user = self.by_uid.get(uid)
        if not user:
            return
        self.users.remove(user)
        # Re-index before awaiting: otherwise a concurrent delete of the same
        # uid would still find the user in the index and fail in list.remove().
        self._assign_users()
        await self.save_to_disk()
        logging.info("Deleted user - %s", user.uid)

    async def update_user(self, uid: str):
        """Set the user's ``last_activity`` to now and persist.

        The user is also moved to the end of ``users``. Does nothing when
        *uid* is unknown.
        """
        user = self.by_uid.get(uid)
        if not user:
            return
        self.users.remove(user)
        user.last_activity = datetime.now()
        self.users.append(user)
        self._assign_users()
        await self.save_to_disk()
        logging.info("Updated user - %s", user.uid)

    async def save_to_disk(self):
        """Serialize all users with orjson and overwrite the file at ``path``."""
        logging.info("Saving Users DB to disk")
        records = []
        for user in self.users:
            record = asdict(user)
            # Persist the string form of the timestamp under the public key
            # and drop the private field (presumably so ``User(**record)``
            # accepts the record on load - confirm against users.classes).
            record["last_activity"] = user._last_activity_str
            del record["_last_activity_str"]
            records.append(record)
        to_save = await asyncio.to_thread(orjson.dumps, records)
        async with aiofiles.open(self.path, "wb") as file:
            await file.write(to_save)
        logging.info("Finished Saving Users DB to disk")

    async def load_from_disk(self):
        """Replace the in-memory users with the contents of the file at ``path``.

        A missing or empty file leaves the current state untouched.
        """
        logging.info("Loading Users DB from disk")
        if not self.path.exists():
            return
        async with aiofiles.open(self.path, "rb") as file:
            file_content = await file.read()
        if not file_content:
            return
        loaded_db = await asyncio.to_thread(orjson.loads, file_content)
        self.users = [User(**record) for record in loaded_db]
        self._assign_users()
        logging.info("Finished Loading Users DB from disk")