This repository has been archived on 2026-04-27. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
watch-together-yandex-disk/movies/yandex_disk.py
T
2025-07-20 22:22:34 +03:00

165 lines
6.0 KiB
Python

import asyncio
import copy
import itertools
import logging
from cachetools import TTLCache
from cachetools_async import cached
from yndx_disk.classes import Directory, File
from yndx_disk.clients import AsyncDiskClient
import config
from movies.classes import Movie, TVShow, Season, Episode
NAME_DELIMITER = "#"
EXTENSION_DELIMITER = "."
_YANDEX_DISK_REQUEST_SEMAPHORE = asyncio.Semaphore(config.YANDEX_DISK_CONCURRENT_REQUESTS_LIMIT)
@cached(TTLCache(maxsize=config.CACHE_MAXSIZE, ttl=config.CACHE_TTL))
async def _parse_tv_show(disk_client: AsyncDiskClient, directory: Directory) -> TVShow:
logging.info("Parsing TV show directory: %s", directory.path)
parts = [p.strip() for p in directory.name.split(NAME_DELIMITER)]
if len(parts) < 3:
logging.error("Invalid TV show directory name format: %s", directory.name)
raise ValueError(f"Invalid TV show directory name: {directory.name}")
file_type, tmdb_id_str, name = parts[0], parts[1], parts[2]
if file_type != "tv":
logging.warning("Directory %s expected to be 'tv' type, but found '%s'. Skipping.", directory.name, file_type)
raise ValueError(f"Unexpected file type '{file_type}' for TV show directory: {directory.name}")
try:
tmdb_id = int(tmdb_id_str)
except ValueError:
logging.error("Invalid TMDB ID format for TV show: %s", tmdb_id_str)
raise ValueError(f"Invalid TMDB ID format: {tmdb_id_str}")
seasons_map = {}
async with _YANDEX_DISK_REQUEST_SEMAPHORE:
contents = await disk_client.listdir(path=directory.path, limit=10000)
for obj in contents:
if not isinstance(obj, File):
continue
obj_name_parts = obj.name.split(EXTENSION_DELIMITER)
if len(obj_name_parts) < 2:
logging.warning("Skipping file with invalid name (no extension): %s", obj.name)
continue
obj_name_without_ext = obj_name_parts[0].strip()
episode_parts = [p.strip() for p in obj_name_without_ext.split(NAME_DELIMITER)]
if len(episode_parts) < 2:
logging.warning("Skipping file with invalid episode name format: %s", obj.name)
continue
season_number_str, episode_number_str = episode_parts[0], episode_parts[1]
try:
season_number = int(season_number_str)
episode_number = int(episode_number_str)
except ValueError:
logging.warning("Skipping file with invalid season/episode number: %s", obj.name)
continue
if season_number not in seasons_map:
seasons_map[season_number] = []
episode = Episode(
episode_number=episode_number,
season_number=season_number,
file_url=obj.file_url
)
seasons_map[season_number].append(episode)
seasons_list = []
for season_number, episodes in seasons_map.items():
season = Season(
season_number=season_number,
episodes_count=len(episodes),
episodes=tuple(copy.deepcopy(episodes))
)
seasons_list.append(season)
seasons_list.sort(key=lambda s: s.season_number)
all_episodes = list(itertools.chain.from_iterable([s.episodes for s in seasons_list]))
return TVShow(
tmdb_id=tmdb_id,
number_of_episodes=len(all_episodes),
number_of_seasons=len(seasons_list),
title=name,
seasons=tuple(seasons_list)
)
async def _get_contents_on_disk(token: str, path: str) -> list[Movie | TVShow]:
logging.info("Fetching contents from disk for token ending with ...%s, path: %s", token[-10:], path)
disk_client = AsyncDiskClient(token=token, auto_update_info=False)
async with _YANDEX_DISK_REQUEST_SEMAPHORE:
contents_on_disk = await disk_client.listdir(path, limit=10000)
logging.info("Found %s items on disk for token ending with ...%s", len(contents_on_disk), token[-10:])
movies = []
tv_show_tasks = []
for obj in contents_on_disk:
parts = [p.strip() for p in obj.name.split(NAME_DELIMITER)]
if len(parts) < 3:
logging.warning("Skipping item with invalid name format: %s", obj.name)
continue
file_type, tmdb_id_str, name_or_filename = parts[0], parts[1], parts[2]
try:
if file_type == "movie" and isinstance(obj, File):
name_parts = name_or_filename.split(EXTENSION_DELIMITER)
if len(name_parts) < 2:
logging.warning("Skipping movie file with no extension: %s", obj.name)
continue
title = name_parts[0].strip()
movie = Movie(
tmdb_id=int(tmdb_id_str),
file_size=int(obj.size),
file_url=obj.file_url,
title=title
)
movies.append(movie)
elif file_type == "tv" and isinstance(obj, Directory):
tv_show_tasks.append(_parse_tv_show(disk_client=disk_client, directory=obj))
else:
logging.warning("Skipping unknown/unsupported item type or mismatch: %s (Type: %s, Is Directory: %s)",
obj.name, file_type, isinstance(obj, Directory))
except ValueError as e:
logging.error("Error parsing item %s: %s", obj.name, e)
except Exception as e:
logging.critical("Unexpected error processing item %s: %s", obj.name, e, exc_info=True)
tv_shows = await asyncio.gather(*tv_show_tasks)
all_contents = movies + tv_shows
logging.info("Found %s total contents on disk for token ending with ...%s", len(all_contents), token[-10:])
return all_contents
async def get_all_contents() -> list[Movie | TVShow]:
logging.info("Fetching all contents from all disks...")
tasks = [_get_contents_on_disk(token, path) for token, path in config.YANDEX_CONFIGS]
all_contents_lists = await asyncio.gather(*tasks)
all_contents = list(itertools.chain.from_iterable(all_contents_lists))
logging.info("Found %s total contents on all disks.", len(all_contents))
return all_contents