improve yandex_disk.py
This commit is contained in:
+108
-48
@@ -5,100 +5,160 @@ import logging
|
|||||||
|
|
||||||
from cachetools import TTLCache
|
from cachetools import TTLCache
|
||||||
from cachetools_async import cached
|
from cachetools_async import cached
|
||||||
from yndx_disk.classes import Directory
|
from yndx_disk.classes import Directory, File
|
||||||
from yndx_disk.clients import AsyncDiskClient
|
from yndx_disk.clients import AsyncDiskClient
|
||||||
|
|
||||||
import config
|
import config
|
||||||
from movies.classes import Movie, TVShow, Season, Episode
|
from movies.classes import Movie, TVShow, Season, Episode
|
||||||
|
|
||||||
|
NAME_DELIMITER = "#"
|
||||||
|
EXTENSION_DELIMITER = "."
|
||||||
|
|
||||||
|
_YANDEX_DISK_REQUEST_SEMAPHORE = asyncio.Semaphore(config.YANDEX_DISK_CONCURRENT_REQUESTS_LIMIT)
|
||||||
|
|
||||||
|
|
||||||
@cached(TTLCache(maxsize=config.CACHE_MAXSIZE, ttl=config.CACHE_TTL))
|
@cached(TTLCache(maxsize=config.CACHE_MAXSIZE, ttl=config.CACHE_TTL))
|
||||||
async def _parse_tv_show(disk_client: AsyncDiskClient, directory: Directory) -> TVShow:
|
async def _parse_tv_show(disk_client: AsyncDiskClient, directory: Directory) -> TVShow:
|
||||||
file_type, tmdb_id, name = map(str.strip, directory.name.split("#"))
|
logging.info("Parsing TV show directory: %s", directory.path)
|
||||||
|
|
||||||
logging.info(f"Parsing TV show %s", tmdb_id)
|
parts = [p.strip() for p in directory.name.split(NAME_DELIMITER)]
|
||||||
|
if len(parts) < 3:
|
||||||
|
logging.error("Invalid TV show directory name format: %s", directory.name)
|
||||||
|
raise ValueError(f"Invalid TV show directory name: {directory.name}")
|
||||||
|
|
||||||
seasons_dict = {}
|
file_type, tmdb_id_str, name = parts[0], parts[1], parts[2]
|
||||||
contents = await disk_client.listdir(path=directory.path, limit=10000)
|
if file_type != "tv":
|
||||||
|
logging.warning("Directory %s expected to be 'tv' type, but found '%s'. Skipping.", directory.name, file_type)
|
||||||
|
raise ValueError(f"Unexpected file type '{file_type}' for TV show directory: {directory.name}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
tmdb_id = int(tmdb_id_str)
|
||||||
|
except ValueError:
|
||||||
|
logging.error("Invalid TMDB ID format for TV show: %s", tmdb_id_str)
|
||||||
|
raise ValueError(f"Invalid TMDB ID format: {tmdb_id_str}")
|
||||||
|
|
||||||
|
seasons_map = {}
|
||||||
|
|
||||||
|
async with _YANDEX_DISK_REQUEST_SEMAPHORE:
|
||||||
|
contents = await disk_client.listdir(path=directory.path, limit=10000)
|
||||||
for obj in contents:
|
for obj in contents:
|
||||||
if type(obj) is Directory:
|
if not isinstance(obj, File):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
obj_name, obj_extension = map(str.strip, obj.name.split("."))
|
obj_name_parts = obj.name.split(EXTENSION_DELIMITER)
|
||||||
season_number, episode_number = map(str.strip, obj_name.split("#"))
|
if len(obj_name_parts) < 2:
|
||||||
|
logging.warning("Skipping file with invalid name (no extension): %s", obj.name)
|
||||||
|
continue
|
||||||
|
|
||||||
if not seasons_dict.get(season_number):
|
obj_name_without_ext = obj_name_parts[0].strip()
|
||||||
seasons_dict[season_number] = []
|
episode_parts = [p.strip() for p in obj_name_without_ext.split(NAME_DELIMITER)]
|
||||||
|
|
||||||
|
if len(episode_parts) < 2:
|
||||||
|
logging.warning("Skipping file with invalid episode name format: %s", obj.name)
|
||||||
|
continue
|
||||||
|
|
||||||
|
season_number_str, episode_number_str = episode_parts[0], episode_parts[1]
|
||||||
|
|
||||||
|
try:
|
||||||
|
season_number = int(season_number_str)
|
||||||
|
episode_number = int(episode_number_str)
|
||||||
|
except ValueError:
|
||||||
|
logging.warning("Skipping file with invalid season/episode number: %s", obj.name)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if season_number not in seasons_map:
|
||||||
|
seasons_map[season_number] = []
|
||||||
|
|
||||||
episode = Episode(
|
episode = Episode(
|
||||||
episode_number=int(episode_number),
|
episode_number=episode_number,
|
||||||
season_number=int(season_number),
|
season_number=season_number,
|
||||||
file_url=obj.file_url
|
file_url=obj.file_url
|
||||||
)
|
)
|
||||||
seasons_dict[season_number].append(episode)
|
seasons_map[season_number].append(episode)
|
||||||
|
|
||||||
seasons = []
|
seasons_list = []
|
||||||
for season_number, episodes in seasons_dict.items():
|
for season_number, episodes in seasons_map.items():
|
||||||
season = Season(
|
season = Season(
|
||||||
season_number=int(season_number),
|
season_number=season_number,
|
||||||
episodes_count=len(episodes),
|
episodes_count=len(episodes),
|
||||||
episodes=tuple(copy.deepcopy(episodes))
|
episodes=tuple(copy.deepcopy(episodes))
|
||||||
)
|
)
|
||||||
seasons.append(season)
|
seasons_list.append(season)
|
||||||
|
|
||||||
|
seasons_list.sort(key=lambda s: s.season_number)
|
||||||
|
|
||||||
|
all_episodes = list(itertools.chain.from_iterable([s.episodes for s in seasons_list]))
|
||||||
|
|
||||||
return TVShow(
|
return TVShow(
|
||||||
tmdb_id=int(tmdb_id),
|
tmdb_id=tmdb_id,
|
||||||
number_of_episodes=len(list(itertools.chain.from_iterable([s.episodes for s in seasons]))),
|
number_of_episodes=len(all_episodes),
|
||||||
number_of_seasons=len(seasons),
|
number_of_seasons=len(seasons_list),
|
||||||
title=name,
|
title=name,
|
||||||
seasons=tuple(seasons)
|
seasons=tuple(seasons_list)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@cached(TTLCache(maxsize=config.CACHE_MAXSIZE, ttl=config.CACHE_TTL))
|
async def _get_contents_on_disk(token: str, path: str) -> list[Movie | TVShow]:
|
||||||
async def _get_contents_on_disk(token: str, path: str) -> list[Movie]:
|
logging.info("Fetching contents from disk for token ending with ...%s, path: %s", token[-10:], path)
|
||||||
logging.info("Fetching contents from disk ...%s", token[-10:])
|
|
||||||
|
|
||||||
disk_client = AsyncDiskClient(token=token, auto_update_info=False)
|
disk_client = AsyncDiskClient(token=token, auto_update_info=False)
|
||||||
files = await disk_client.listdir(path, limit=10000)
|
async with _YANDEX_DISK_REQUEST_SEMAPHORE:
|
||||||
|
contents_on_disk = await disk_client.listdir(path, limit=10000)
|
||||||
|
|
||||||
logging.info("Found %s files on disk ...%s", len(files), token[-10:])
|
logging.info("Found %s items on disk for token ending with ...%s", len(contents_on_disk), token[-10:])
|
||||||
|
|
||||||
movies = []
|
movies = []
|
||||||
tv_shows = []
|
tv_show_tasks = []
|
||||||
|
|
||||||
for obj in files:
|
for obj in contents_on_disk:
|
||||||
file_type, tmdb_id, name = map(str.strip, obj.name.split("#"))
|
parts = [p.strip() for p in obj.name.split(NAME_DELIMITER)]
|
||||||
|
if len(parts) < 3:
|
||||||
|
logging.warning("Skipping item with invalid name format: %s", obj.name)
|
||||||
|
continue
|
||||||
|
|
||||||
|
file_type, tmdb_id_str, name_or_filename = parts[0], parts[1], parts[2]
|
||||||
|
|
||||||
|
try:
|
||||||
|
if file_type == "movie" and isinstance(obj, File):
|
||||||
|
name_parts = name_or_filename.split(EXTENSION_DELIMITER)
|
||||||
|
if len(name_parts) < 2:
|
||||||
|
logging.warning("Skipping movie file with no extension: %s", obj.name)
|
||||||
|
continue
|
||||||
|
title = name_parts[0].strip()
|
||||||
|
|
||||||
match file_type:
|
|
||||||
case "movie":
|
|
||||||
name, extension = map(str.strip, name.split("."))
|
|
||||||
movie = Movie(
|
movie = Movie(
|
||||||
tmdb_id=int(tmdb_id),
|
tmdb_id=int(tmdb_id_str),
|
||||||
file_size=int(obj.size),
|
file_size=int(obj.size),
|
||||||
file_url=obj.file_url,
|
file_url=obj.file_url,
|
||||||
title=name
|
title=title
|
||||||
)
|
)
|
||||||
movies.append(movie)
|
movies.append(movie)
|
||||||
case "tv":
|
elif file_type == "tv" and isinstance(obj, Directory):
|
||||||
pass
|
tv_show_tasks.append(_parse_tv_show(disk_client=disk_client, directory=obj))
|
||||||
tv_show = await _parse_tv_show(disk_client=disk_client, directory=obj)
|
else:
|
||||||
tv_shows.append(tv_show)
|
logging.warning("Skipping unknown/unsupported item type or mismatch: %s (Type: %s, Is Directory: %s)",
|
||||||
case _:
|
obj.name, file_type, isinstance(obj, Directory))
|
||||||
continue
|
except ValueError as e:
|
||||||
|
logging.error("Error parsing item %s: %s", obj.name, e)
|
||||||
|
except Exception as e:
|
||||||
|
logging.critical("Unexpected error processing item %s: %s", obj.name, e, exc_info=True)
|
||||||
|
|
||||||
logging.info("Found %s contents on disk ...%s", len(movies), token[-10:])
|
tv_shows = await asyncio.gather(*tv_show_tasks)
|
||||||
|
|
||||||
return movies + tv_shows
|
all_contents = movies + tv_shows
|
||||||
|
logging.info("Found %s total contents on disk for token ending with ...%s", len(all_contents), token[-10:])
|
||||||
|
|
||||||
|
return all_contents
|
||||||
|
|
||||||
|
|
||||||
async def get_all_contents() -> list[Movie]:
|
async def get_all_contents() -> list[Movie | TVShow]:
|
||||||
logging.info("Fetching all contents on all disks")
|
logging.info("Fetching all contents from all disks...")
|
||||||
|
|
||||||
tasks = [_get_contents_on_disk(token, path) for token, path in config.YANDEX_CONFIGS]
|
tasks = [_get_contents_on_disk(token, path) for token, path in config.YANDEX_CONFIGS]
|
||||||
movies = await asyncio.gather(*tasks)
|
all_contents_lists = await asyncio.gather(*tasks)
|
||||||
movies = list(itertools.chain.from_iterable(movies))
|
|
||||||
|
|
||||||
logging.info("Found %s contents on all disks", len(movies))
|
all_contents = list(itertools.chain.from_iterable(all_contents_lists))
|
||||||
|
|
||||||
return movies
|
logging.info("Found %s total contents on all disks.", len(all_contents))
|
||||||
|
|
||||||
|
return all_contents
|
||||||
|
|||||||
Reference in New Issue
Block a user