initial commit

This commit is contained in:
2025-07-20 00:51:55 +03:00
commit 42684e0cb6
29 changed files with 2615 additions and 0 deletions
+26
View File
@@ -0,0 +1,26 @@
from dataclasses import dataclass
from datetime import datetime
@dataclass(frozen=False)
class User:
username: str
uid: str
_last_activity_str: str = None
def __init__(self, username: str, uid: str, last_activity: datetime):
self.username = username
self.uid = uid
self.last_activity = last_activity
@property
def last_activity(self) -> datetime:
return datetime.strptime(self._last_activity_str, "%Y-%m-%dT%H:%M:%S.%f")
@last_activity.setter
def last_activity(self, value: datetime | str):
if type(value) is str:
self._last_activity_str = value
else:
self._last_activity_str = value.strftime("%Y-%m-%dT%H:%M:%S.%f")
+148
View File
@@ -0,0 +1,148 @@
import asyncio
import logging
from dataclasses import asdict
from datetime import datetime
from pathlib import Path
import aiofiles
import orjson
import config
from singleton import Singleton
from users.classes import User
from users.utils import is_inactive_too_long, decode_token, generate_uid
class UsersDB(metaclass=Singleton):
def __init__(self, db_path: Path | str):
self.path = Path(db_path)
self.users = []
self.by_uid = {}
def _assign_users(self):
self.by_uid = {}
for user in self.users:
self.by_uid[user.uid] = user
async def auto_remove_inactive(self):
while True:
await asyncio.sleep(config.REMOVE_INACTIVE_USERS_INTERVAL_SECONDS)
await self.remove_inactive()
async def remove_inactive(self):
logging.info("Removing inactive users")
new_users = []
for user in self.users:
if not is_inactive_too_long(user):
new_users.append(user)
continue
logging.info(f"Removing inactive user {user.uid}")
self.users = new_users
await self.save_to_disk()
self._assign_users()
logging.info("Finished Removing inactive users")
async def get_user_by_token(self, token: str) -> User | None:
user = decode_token(token)
if user is None:
return None
if user.uid not in self.by_uid.keys():
return None
if is_inactive_too_long(user):
await self.delete_user(user.uid)
return None
return user
async def create_user(self, username: str) -> User:
user = User(
username=username,
uid=generate_uid(username),
last_activity=datetime.now(),
)
self.users.append(user)
await self.save_to_disk()
self._assign_users()
logging.info("Created new user - %s", user.uid)
return user
async def delete_user(self, uid: str):
user = self.by_uid.get(uid)
if not user:
return
self.users.remove(user)
await self.save_to_disk()
self._assign_users()
logging.info("Deleted user - %s", user.uid)
async def update_user(self, uid: str):
user = self.by_uid.get(uid)
if not user:
return
self.users.remove(user)
del self.by_uid[uid]
user.last_activity = datetime.now()
self.users.append(user)
self.by_uid[uid] = user
await self.save_to_disk()
self._assign_users()
logging.info("Updated user - %s", user.uid)
async def save_to_disk(self):
logging.info("Saving Users DB to disk")
users = [asdict(user) for user in self.users]
for i in range(len(users)):
users[i]["last_activity"] = self.users[i]._last_activity_str
del users[i]["_last_activity_str"]
to_save = await asyncio.to_thread(orjson.dumps, users)
async with aiofiles.open(self.path, "wb") as file:
await file.write(to_save)
logging.info("Finished Saving Users DB to disk")
async def load_from_disk(self):
logging.info("Loading Users DB from disk")
if not self.path.exists():
return
async with aiofiles.open(self.path, "rb") as file:
file_content = await file.read()
if len(file_content) == 0:
return
loaded_db = await asyncio.to_thread(orjson.loads, file_content)
new_users = []
for user in loaded_db:
new_user = User(**user)
new_users.append(new_user)
self.users = new_users
self._assign_users()
logging.info("Finished Loading Users DB from disk")
+46
View File
@@ -0,0 +1,46 @@
import logging
import uuid
from dataclasses import asdict
from datetime import datetime, timedelta
import bcrypt
import jwt
import config
from users.classes import User
def generate_uid(username: str) -> str:
uid_bytes = (username + uuid.uuid4().hex).encode("utf-8")
uid_hash = bcrypt.hashpw(uid_bytes, bcrypt.gensalt())
return uid_hash.decode("utf-8")
def generate_token(user: User) -> str:
user.last_activity = datetime.now()
user_dict = asdict(user)
encoded_jwt = jwt.encode(user_dict, config.SECRET, algorithm="HS256")
return encoded_jwt
def decode_token(token: str) -> User | None:
decoded_dict = jwt.decode(token, config.SECRET, algorithms="HS256")
decoded_dict["last_activity"] = decoded_dict["_last_activity_str"]
del decoded_dict["_last_activity_str"]
try:
return User(**decoded_dict)
except Exception as e:
logging.error(e)
return None
def is_inactive_too_long(user: User) -> bool:
dt1 = user.last_activity
dt2 = datetime.now()
diff = dt2 - dt1
max_diff = timedelta(hours=config.MAX_USER_INACTIVE_HOURS)
if diff > max_diff:
return True
return False