diff --git a/movies/yandex_disk.py b/movies/yandex_disk.py index ffd7636..66c3eb4 100644 --- a/movies/yandex_disk.py +++ b/movies/yandex_disk.py @@ -5,100 +5,160 @@ import logging from cachetools import TTLCache from cachetools_async import cached -from yndx_disk.classes import Directory +from yndx_disk.classes import Directory, File from yndx_disk.clients import AsyncDiskClient import config from movies.classes import Movie, TVShow, Season, Episode +NAME_DELIMITER = "#" +EXTENSION_DELIMITER = "." + +_YANDEX_DISK_REQUEST_SEMAPHORE = asyncio.Semaphore(config.YANDEX_DISK_CONCURRENT_REQUESTS_LIMIT) + @cached(TTLCache(maxsize=config.CACHE_MAXSIZE, ttl=config.CACHE_TTL)) async def _parse_tv_show(disk_client: AsyncDiskClient, directory: Directory) -> TVShow: - file_type, tmdb_id, name = map(str.strip, directory.name.split("#")) + logging.info("Parsing TV show directory: %s", directory.path) - logging.info(f"Parsing TV show %s", tmdb_id) + parts = [p.strip() for p in directory.name.split(NAME_DELIMITER)] + if len(parts) < 3: + logging.error("Invalid TV show directory name format: %s", directory.name) + raise ValueError(f"Invalid TV show directory name: {directory.name}") - seasons_dict = {} - contents = await disk_client.listdir(path=directory.path, limit=10000) + file_type, tmdb_id_str, name = parts[0], parts[1], parts[2] + if file_type != "tv": + logging.warning("Directory %s expected to be 'tv' type, but found '%s'. Skipping.", directory.name, file_type) + raise ValueError(f"Unexpected file type '{file_type}' for TV show directory: {directory.name}") + + try: + tmdb_id = int(tmdb_id_str) + except ValueError: + logging.error("Invalid TMDB ID format for TV show: %s", tmdb_id_str) + raise ValueError(f"Invalid TMDB ID format: {tmdb_id_str}") + + seasons_map = {} + + async with _YANDEX_DISK_REQUEST_SEMAPHORE: + contents = await disk_client.listdir(path=directory.path, limit=10000) for obj in contents: - if type(obj) is Directory: + if not isinstance(obj, File): continue - obj_name, obj_extension = map(str.strip, obj.name.split(".")) - season_number, episode_number = map(str.strip, obj_name.split("#")) + obj_name_parts = obj.name.split(EXTENSION_DELIMITER) + if len(obj_name_parts) < 2: + logging.warning("Skipping file with invalid name (no extension): %s", obj.name) + continue - if not seasons_dict.get(season_number): - seasons_dict[season_number] = [] + obj_name_without_ext = obj_name_parts[0].strip() + episode_parts = [p.strip() for p in obj_name_without_ext.split(NAME_DELIMITER)] + + if len(episode_parts) < 2: + logging.warning("Skipping file with invalid episode name format: %s", obj.name) + continue + + season_number_str, episode_number_str = episode_parts[0], episode_parts[1] + + try: + season_number = int(season_number_str) + episode_number = int(episode_number_str) + except ValueError: + logging.warning("Skipping file with invalid season/episode number: %s", obj.name) + continue + + if season_number not in seasons_map: + seasons_map[season_number] = [] episode = Episode( - episode_number=int(episode_number), - season_number=int(season_number), + episode_number=episode_number, + season_number=season_number, file_url=obj.file_url ) - seasons_dict[season_number].append(episode) + seasons_map[season_number].append(episode) - seasons = [] - for season_number, episodes in seasons_dict.items(): + seasons_list = [] + for season_number, episodes in seasons_map.items(): season = Season( - season_number=int(season_number), + season_number=season_number, episodes_count=len(episodes), episodes=tuple(copy.deepcopy(episodes)) ) - seasons.append(season) + seasons_list.append(season) + + seasons_list.sort(key=lambda s: s.season_number) + + all_episodes = list(itertools.chain.from_iterable([s.episodes for s in seasons_list])) return TVShow( - tmdb_id=int(tmdb_id), - number_of_episodes=len(list(itertools.chain.from_iterable([s.episodes for s in seasons]))), - number_of_seasons=len(seasons), + tmdb_id=tmdb_id, + number_of_episodes=len(all_episodes), + number_of_seasons=len(seasons_list), title=name, - seasons=tuple(seasons) + seasons=tuple(seasons_list) ) -@cached(TTLCache(maxsize=config.CACHE_MAXSIZE, ttl=config.CACHE_TTL)) -async def _get_contents_on_disk(token: str, path: str) -> list[Movie]: - logging.info("Fetching contents from disk ...%s", token[-10:]) +async def _get_contents_on_disk(token: str, path: str) -> list[Movie | TVShow]: + logging.info("Fetching contents from disk for token ending with ...%s, path: %s", token[-10:], path) disk_client = AsyncDiskClient(token=token, auto_update_info=False) - files = await disk_client.listdir(path, limit=10000) + async with _YANDEX_DISK_REQUEST_SEMAPHORE: + contents_on_disk = await disk_client.listdir(path, limit=10000) - logging.info("Found %s files on disk ...%s", len(files), token[-10:]) + logging.info("Found %s items on disk for token ending with ...%s", len(contents_on_disk), token[-10:]) movies = [] - tv_shows = [] + tv_show_tasks = [] - for obj in files: - file_type, tmdb_id, name = map(str.strip, obj.name.split("#")) + for obj in contents_on_disk: + parts = [p.strip() for p in obj.name.split(NAME_DELIMITER)] + if len(parts) < 3: + logging.warning("Skipping item with invalid name format: %s", obj.name) + continue + + file_type, tmdb_id_str, name_or_filename = parts[0], parts[1], parts[2] + + try: + if file_type == "movie" and isinstance(obj, File): + name_parts = name_or_filename.split(EXTENSION_DELIMITER) + if len(name_parts) < 2: + logging.warning("Skipping movie file with no extension: %s", obj.name) + continue + title = name_parts[0].strip() - match file_type: - case "movie": - name, extension = map(str.strip, name.split(".")) movie = Movie( - tmdb_id=int(tmdb_id), + tmdb_id=int(tmdb_id_str), file_size=int(obj.size), file_url=obj.file_url, - title=name + title=title ) movies.append(movie) - case "tv": - pass - tv_show = await _parse_tv_show(disk_client=disk_client, directory=obj) - tv_shows.append(tv_show) - case _: - continue + elif file_type == "tv" and isinstance(obj, Directory): + tv_show_tasks.append(_parse_tv_show(disk_client=disk_client, directory=obj)) + else: + logging.warning("Skipping unknown/unsupported item type or mismatch: %s (Type: %s, Is Directory: %s)", + obj.name, file_type, isinstance(obj, Directory)) + except ValueError as e: + logging.error("Error parsing item %s: %s", obj.name, e) + except Exception as e: + logging.critical("Unexpected error processing item %s: %s", obj.name, e, exc_info=True) - logging.info("Found %s contents on disk ...%s", len(movies), token[-10:]) + tv_shows = await asyncio.gather(*tv_show_tasks) - return movies + tv_shows + all_contents = movies + tv_shows + logging.info("Found %s total contents on disk for token ending with ...%s", len(all_contents), token[-10:]) + + return all_contents -async def get_all_contents() -> list[Movie]: - logging.info("Fetching all contents on all disks") +async def get_all_contents() -> list[Movie | TVShow]: + logging.info("Fetching all contents from all disks...") tasks = [_get_contents_on_disk(token, path) for token, path in config.YANDEX_CONFIGS] - movies = await asyncio.gather(*tasks) - movies = list(itertools.chain.from_iterable(movies)) + all_contents_lists = await asyncio.gather(*tasks) - logging.info("Found %s contents on all disks", len(movies)) + all_contents = list(itertools.chain.from_iterable(all_contents_lists)) - return movies + logging.info("Found %s total contents on all disks.", len(all_contents)) + + return all_contents