mirror of
https://github.com/arabianq/yndx-disk.git
synced 2026-04-27 22:21:23 +00:00
fix commit
This commit is contained in:
Generated
+3
@@ -1,4 +1,7 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="Black">
|
||||
<option name="sdkName" value="Python 3.12 (yandex_disk_rest_api)" />
|
||||
</component>
|
||||
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12 (yandex_disk_rest_api)" project-jdk-type="Python SDK" />
|
||||
</project>
|
||||
Generated
+1
-1
@@ -4,7 +4,7 @@
|
||||
<content url="file://$MODULE_DIR$">
|
||||
<excludeFolder url="file://$MODULE_DIR$/.venv" />
|
||||
</content>
|
||||
<orderEntry type="jdk" jdkName="Python 3.12 (.venv)" jdkType="Python SDK" />
|
||||
<orderEntry type="jdk" jdkName="Python 3.12 (yndx_disk)" jdkType="Python SDK" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
</component>
|
||||
</module>
|
||||
@@ -0,0 +1,2 @@
|
||||
httpx
|
||||
aiofiles
|
||||
@@ -0,0 +1,23 @@
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
|
||||
setup(
|
||||
name="yndx_disk",
|
||||
version="0.1",
|
||||
packages=find_packages(),
|
||||
install_requires=[
|
||||
"httpx", "aiofiles"
|
||||
],
|
||||
author="Tarasov Alexander",
|
||||
author_email="a.tevg@ya.ru",
|
||||
description="Wrapper for yandex disk rest api",
|
||||
long_description=open('README.md').read(),
|
||||
long_description_content_type="text/markdown",
|
||||
url="https://gitverse.ru/arabianq/yndx_disk",
|
||||
classifiers=[
|
||||
"Programming Language :: Python :: 3",
|
||||
"License :: OSI Approved :: MIT License",
|
||||
"Operating System :: OS Independent",
|
||||
],
|
||||
python_requires=">=3.10",
|
||||
)
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
from .test import *
|
||||
@@ -0,0 +1,32 @@
|
||||
import httpx
|
||||
import yndx_disk.api.utils as utils
|
||||
|
||||
|
||||
BASE_URL = "https://cloud-api.yandex.net/v1/disk"
|
||||
|
||||
|
||||
async def get_disk_info(token: str, fields: str = "", timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Get information about the disk.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the server.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server containing the disk information.
|
||||
"""
|
||||
url = BASE_URL
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"fields": fields,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
import httpx
|
||||
import yndx_disk.api.utils as utils
|
||||
|
||||
|
||||
BASE_URL = "https://cloud-api.yandex.net/v1/disk/operations"
|
||||
|
||||
|
||||
async def get_operation_status(token: str, operation_id: str, fields: str = "", timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Get the status of an operation on the server.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the server.
|
||||
- operation_id (str): The ID of the operation to get the status for.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server containing the status of the operation.
|
||||
"""
|
||||
url = BASE_URL + f"/{operation_id}"
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"operation_id": operation_id,
|
||||
"fields": fields,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
@@ -0,0 +1,120 @@
|
||||
import httpx
|
||||
import yndx_disk.api.utils as utils
|
||||
|
||||
|
||||
BASE_URL = "https://cloud-api.yandex.net/v1/disk/public/resources"
|
||||
|
||||
|
||||
async def get_info(token: str, public_key: str, fields: str = "", path: str = "", preview_size: str = "",
|
||||
sort: str = "", preview_crop: bool = False, limit: int = 100, offset: int = 0, timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Get information about a file or directory on the disk.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the disk.
|
||||
- public_key (str): The public key of the file or directory to get information about.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- path (str, optional): The path of the file or directory to get information about. Defaults to "".
|
||||
- preview_size (str, optional): The size of the preview to be included in the response. Defaults to "".
|
||||
- sort (str, optional): The sorting order of the response. Defaults to "".
|
||||
- preview_crop (bool, optional): Whether to crop the preview. Defaults to False.
|
||||
- limit (int, optional): The maximum number of items to return in the response. Defaults to 100.
|
||||
- offset (int, optional): The number of items to skip before returning the response. Defaults to 0.
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server containing the information about the file or directory.
|
||||
"""
|
||||
url = BASE_URL
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"public_key": public_key,
|
||||
"fields": fields,
|
||||
"path": utils.parse_path(path),
|
||||
"preview_size": preview_size,
|
||||
"sort": sort,
|
||||
"preview_crop": preview_crop,
|
||||
"limit": limit,
|
||||
"offset": offset,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
|
||||
|
||||
async def get_url(token: str, public_key: str, fields: str = "", path: str = "", timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Get the download URL for a file or directory on the disk.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the disk.
|
||||
- public_key (str): The public key of the file or directory to get the download URL for.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- path (str, optional): The path of the file or directory to get the download URL for. Defaults to "".
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server containing the download URL for the file or directory.
|
||||
"""
|
||||
url = BASE_URL + "/download"
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"public_key": public_key,
|
||||
"fields": fields,
|
||||
"path": utils.parse_path(path),
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
|
||||
|
||||
async def save_to_disk(token: str, public_key: str, fields: str = "", name: str = "", path: str = "", save_path: str = "", force_async: bool = False, timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Save a file or directory from the disk to your own disk.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the disk.
|
||||
- public_key (str): The public key of the file or directory to be saved.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- name (str, optional): The name of the file or directory to be saved. Defaults to "".
|
||||
- path (str, optional): The path of the file or directory to be saved. Defaults to "".
|
||||
- save_path (str, optional): The path where the file or directory should be saved to. Defaults to "".
|
||||
- force_async (bool, optional): Whether to force asynchronous saving. Defaults to False.
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server after the save operation.
|
||||
"""
|
||||
url = BASE_URL + "/save-to-disk"
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"public_key": public_key,
|
||||
"fields": fields,
|
||||
"name": name,
|
||||
"path": utils.parse_path(path),
|
||||
"save_path": save_path,
|
||||
"force_async": force_async,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
+426
-33
@@ -1,13 +1,27 @@
|
||||
import asyncio
|
||||
import httpx
|
||||
import yndx_disk.utils as utils
|
||||
import yndx_disk.exceptions as exceptions
|
||||
import yndx_disk.api.utils as utils
|
||||
|
||||
|
||||
BASE_URL = "https://cloud-api.yandex.net/v1/disk/resources"
|
||||
|
||||
|
||||
async def delete(token: str, path: str, fields: str = "", md5: str = "", force_async: bool = False,
|
||||
permanently: bool = False, timeout: int = 30):
|
||||
permanently: bool = False, timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Delete a file or directory from the disk.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the disk.
|
||||
- path (str): The path of the file or directory to be deleted.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- md5 (str, optional): The MD5 hash of the file to be deleted. Defaults to "".
|
||||
- force_async (bool, optional): Whether to force asynchronous deletion. Defaults to False.
|
||||
- permanently (bool, optional): Whether to delete the file permanently. Defaults to False.
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server after the deletion operation.
|
||||
"""
|
||||
url = BASE_URL
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
@@ -15,7 +29,7 @@ async def delete(token: str, path: str, fields: str = "", md5: str = "", force_a
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"path": path,
|
||||
"path": utils.parse_path(path),
|
||||
"fields": fields,
|
||||
"md5": md5,
|
||||
"force_async": force_async,
|
||||
@@ -24,17 +38,28 @@ async def delete(token: str, path: str, fields: str = "", md5: str = "", force_a
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
response_json = response.json()
|
||||
|
||||
match response.status_code:
|
||||
case 200:
|
||||
return response_json
|
||||
case _:
|
||||
raise exceptions.YandexDiskAPIException(response.status_code, response_json.get("description", ""))
|
||||
return response
|
||||
|
||||
|
||||
async def get_info(token: str, path: str, fields: str = "", preview_size: str = "", sort: str = "",
|
||||
preview_crop: bool = False, limit: int = 100, offset: int = 0, timeout: int = 30):
|
||||
preview_crop: bool = False, limit: int = 100, offset: int = 0, timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Get information about a file or directory on the disk.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the disk.
|
||||
- path (str): The path of the file or directory to get information about.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- preview_size (str, optional): The size of the preview to be included in the response. Defaults to "".
|
||||
- sort (str, optional): The sorting order of the response. Defaults to "".
|
||||
- preview_crop (bool, optional): Whether to crop the preview. Defaults to False.
|
||||
- limit (int, optional): The maximum number of items to return in the response. Defaults to 100.
|
||||
- offset (int, optional): The number of items to skip before returning the response. Defaults to 0.
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server containing the information about the file or directory.
|
||||
"""
|
||||
url = BASE_URL
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
@@ -42,7 +67,7 @@ async def get_info(token: str, path: str, fields: str = "", preview_size: str =
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"path": path,
|
||||
"path": utils.parse_path(path),
|
||||
"fields": fields,
|
||||
"preview_size": preview_size,
|
||||
"sort": sort,
|
||||
@@ -53,52 +78,420 @@ async def get_info(token: str, path: str, fields: str = "", preview_size: str =
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
response_json = response.json()
|
||||
return response
|
||||
|
||||
match response.status_code:
|
||||
case 200:
|
||||
return response_json
|
||||
case _:
|
||||
raise exceptions.YandexDiskAPIException(response.status_code, response_json.get("description", ""))
|
||||
|
||||
async def update_info(token: str, path: str, body: dict, fields: str = "", timeout: int = 30):
|
||||
async def update_info(token: str, path: str, body: dict, fields: str = "", timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Update information about a file or directory on the disk.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the disk.
|
||||
- path (str): The path of the file or directory to update.
|
||||
- body (dict): The new information to be updated.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server after the update operation.
|
||||
"""
|
||||
url = BASE_URL
|
||||
|
||||
async def mkdir(token: str, path: str, fields: str = "", timeout: int = 30):
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.patch(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"path": utils.parse_path(path),
|
||||
"body": body,
|
||||
"fields": fields,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
async def mkdir(token: str, path: str, fields: str = "", timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Create a new directory on the disk.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the disk.
|
||||
- path (str): The path of the new directory to be created.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server after the creation operation.
|
||||
"""
|
||||
url = BASE_URL
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.put(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"path": utils.parse_path(path),
|
||||
"fields": fields,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
async def copy(token: str, from_path: str, to_path: str, fields: str = "", force_async: bool = False,
|
||||
overwrite: bool = False, timeout: int = 30):
|
||||
overwrite: bool = False, timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Copy a file or directory from one location to another on the disk.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the disk.
|
||||
- from_path (str): The path of the file or directory to be copied.
|
||||
- to_path (str): The path where the file or directory should be copied to.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- force_async (bool, optional): Whether to force asynchronous copying. Defaults to False.
|
||||
- overwrite (bool, optional): Whether to overwrite the destination file or directory if it already exists. Defaults to False.
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server after the copy operation.
|
||||
"""
|
||||
url = BASE_URL + "/copy"
|
||||
|
||||
async def get_url(token: str, path: str, fields: str = "", timeout: int = 30):
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"from": utils.parse_path(from_path),
|
||||
"path": utils.parse_path(to_path),
|
||||
"fields": fields,
|
||||
"force_async": force_async,
|
||||
"overwrite": overwrite,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
async def get_url(token: str, path: str, fields: str = "", timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Get the download URL for a file or directory on the disk.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the disk.
|
||||
- path (str): The path of the file or directory to get the download URL for.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server containing the download URL for the file or directory.
|
||||
"""
|
||||
url = BASE_URL + "/download"
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"path": utils.parse_path(path),
|
||||
"fields": fields,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
async def get_all_files(token: str, fields: str = "", media_type: str = "", preview_size: str = "", sort: str = "",
|
||||
preview_crop: bool = False, limit: int = 100, offset: int = 0, timeout: int = 30):
|
||||
preview_crop: bool = False, limit: int = 100, offset: int = 0, timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Get a list of all files and directories on the disk.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the disk.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- media_type (str, optional): The media type of the files to be included in the response. Defaults to "".
|
||||
- preview_size (str, optional): The size of the preview to be included in the response. Defaults to "".
|
||||
- sort (str, optional): The sorting order of the response. Defaults to "".
|
||||
- preview_crop (bool, optional): Whether to crop the preview. Defaults to False.
|
||||
- limit (int, optional): The maximum number of items to return in the response. Defaults to 100.
|
||||
- offset (int, optional): The number of items to skip before returning the response. Defaults to 0.
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server containing a list of all files and directories.
|
||||
"""
|
||||
url = BASE_URL + "/files"
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"fields": fields,
|
||||
"media_type": media_type,
|
||||
"preview_size": preview_size,
|
||||
"sort": sort,
|
||||
"preview_crop": preview_crop,
|
||||
"limit": limit,
|
||||
"offset": offset,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
async def get_last_uploads(token: str, fields: str = "", media_type: str = "", preview_size: str = "",
|
||||
preview_crop: bool = False, limit: int = 100, timeout: int = 30):
|
||||
preview_crop: bool = False, limit: int = 100, timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Get a list of the last uploaded files and directories on the disk.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the disk.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- media_type (str, optional): The media type of the files to be included in the response. Defaults to "".
|
||||
- preview_size (str, optional): The size of the preview to be included in the response. Defaults to "".
|
||||
- preview_crop (bool, optional): Whether to crop the preview. Defaults to False.
|
||||
- limit (int, optional): The maximum number of items to return in the response. Defaults to 100.
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server containing a list of the last uploaded files and directories.
|
||||
"""
|
||||
url = BASE_URL + "/last-uploaded"
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"fields": fields,
|
||||
"media_type": media_type,
|
||||
"preview_size": preview_size,
|
||||
"preview_crop": preview_crop,
|
||||
"limit": limit,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
|
||||
|
||||
async def move(token: str, from_path: str, to_path: str, fields: str = "", force_async: bool = False,
|
||||
overwrite: bool = False, timeout: int = 30):
|
||||
overwrite: bool = False, timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Move a file or directory from one location to another on the disk.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the disk.
|
||||
- from_path (str): The path of the file or directory to be moved.
|
||||
- to_path (str): The path where the file or directory should be moved to.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- force_async (bool, optional): Whether to force asynchronous moving. Defaults to False.
|
||||
- overwrite (bool, optional): Whether to overwrite the destination file or directory if it already exists. Defaults to False.
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server after the move operation.
|
||||
"""
|
||||
url = BASE_URL + "/move"
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"from": utils.parse_path(from_path),
|
||||
"path": utils.parse_path(to_path),
|
||||
"fields": fields,
|
||||
"force_async": force_async,
|
||||
"overwrite": overwrite,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
|
||||
|
||||
async def get_all_public(token: str, fields: str = "", preview_size: str = "", type_filter: str = "",
|
||||
preview_crop: bool = False, limit: int = 100, offset: int = 0, timeout: int = 30):
|
||||
preview_crop: bool = False, limit: int = 100, offset: int = 0, timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Get a list of all public files and directories on the disk.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the disk.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- preview_size (str, optional): The size of the preview to be included in the response. Defaults to "".
|
||||
- type_filter (str, optional): The type of files to be included in the response. Defaults to "".
|
||||
- preview_crop (bool, optional): Whether to crop the preview. Defaults to False.
|
||||
- limit (int, optional): The maximum number of items to return in the response. Defaults to 100.
|
||||
- offset (int, optional): The number of items to skip before returning the response. Defaults to 0.
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server containing a list of all public files and directories.
|
||||
"""
|
||||
url = BASE_URL + "/public"
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"fields": fields,
|
||||
"preview_size": preview_size,
|
||||
"type_filter": type_filter,
|
||||
"preview_crop": preview_crop,
|
||||
"limit": limit,
|
||||
"offset": offset,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
|
||||
|
||||
async def publish(token: str, path: str, body: dict, fields: str = "", allow_address_access: bool = False,
|
||||
timeout: int = 30):
|
||||
timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Publish a file or directory on the disk.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the disk.
|
||||
- path (str): The path of the file or directory to be published.
|
||||
- body (dict): The information to be published.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- allow_address_access (bool, optional): Whether to allow address access. Defaults to False.
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server after the publish operation.
|
||||
"""
|
||||
url = BASE_URL + "/publish"
|
||||
|
||||
async def unpublish(token: str, path: str, fields: str = "", timeout: int = 30):
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.put(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"path": utils.parse_path(path),
|
||||
"body": body,
|
||||
"fields": fields,
|
||||
"allow_address_access": allow_address_access,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
|
||||
|
||||
async def unpublish(token: str, path: str, fields: str = "", timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Unpublish a file or directory on the disk.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the disk.
|
||||
- path (str): The path of the file or directory to be unpublished.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server after the unpublish operation.
|
||||
"""
|
||||
url = BASE_URL + "/unpublish"
|
||||
|
||||
async def get_upload_url(token: str, path: str, fields: str = "", overwrite: bool = False, timeout: int = 30):
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.put(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"path": utils.parse_path(path),
|
||||
"fields": fields,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
|
||||
|
||||
async def get_upload_url(token: str, path: str, fields: str = "", overwrite: bool = False, timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Get the upload URL for a file or directory on the disk.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the disk.
|
||||
- path (str): The path of the file or directory to get the upload URL for.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- overwrite (bool, optional): Whether to overwrite the file or directory if it already exists. Defaults to False.
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server containing the upload URL for the file or directory.
|
||||
"""
|
||||
url = BASE_URL + "/upload"
|
||||
|
||||
async def upload(token: str, upload_url: str, fields: str = "", disable_redirects: bool = False, timeout: int = 30):
|
||||
url = BASE_URL + "/upload"
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"path": utils.parse_path(path),
|
||||
"fields": fields,
|
||||
"overwrite": overwrite,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
|
||||
|
||||
async def upload(token: str, path: str, upload_url: str, fields: str = "", disable_redirects: bool = False, timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Upload a file or directory to the disk.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the disk.
|
||||
- path (str): The path where the file or directory should be uploaded.
|
||||
- upload_url (str): The URL to upload the file or directory to.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- disable_redirects (bool, optional): Whether to disable redirects. Defaults to False.
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server after the upload operation.
|
||||
"""
|
||||
url = BASE_URL + "/upload"
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"path": utils.parse_path(path),
|
||||
"url": upload_url,
|
||||
"fields": fields,
|
||||
"disable_redirects": disable_redirects,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,114 @@
|
||||
import httpx
|
||||
import yndx_disk.api.utils as utils
|
||||
|
||||
|
||||
BASE_URL = "https://cloud-api.yandex.net/v1/disk/trash/resources"
|
||||
|
||||
|
||||
async def delete(token: str, fields: str = "", path: str = "", force_async: bool = False, timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Empty the trash on the server.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the server.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- path (str, optional): The path of the trash to be emptied. Defaults to "".
|
||||
- force_async (bool, optional): Whether to force asynchronous emptying. Defaults to False.
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server after the emptying operation.
|
||||
"""
|
||||
url = BASE_URL
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.delete(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"fields": fields,
|
||||
"path": "" if not path else utils.parse_path(path, "trash:/"),
|
||||
"force_async": force_async,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
|
||||
async def get_info(token: str, path: str, fields: str = "", preview_size: str = "", sort: str = "",
|
||||
preview_crop: bool = False, limit: int = 100, offset: int = 0, timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Get the content of the trash on the server.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the server.
|
||||
- path (str): The path of the trash to get the content from.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- preview_size (str, optional): The size of the preview to be included in the response. Defaults to "".
|
||||
- sort (str, optional): The sorting order of the response. Defaults to "".
|
||||
- preview_crop (bool, optional): Whether to crop the preview. Defaults to False.
|
||||
- limit (int, optional): The maximum number of items to return in the response. Defaults to 100.
|
||||
- offset (int, optional): The number of items to skip before returning the response. Defaults to 0.
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server containing the content of the trash.
|
||||
"""
|
||||
url = BASE_URL
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"path": "" if not path else utils.parse_path(path, "trash:/"),
|
||||
"fields": fields,
|
||||
"preview_size": preview_size,
|
||||
"sort": sort,
|
||||
"preview_crop": preview_crop,
|
||||
"limit": limit,
|
||||
"offset": offset,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
async def restore(token: str, path: str, fields: str = "", name: str = "", force_async: bool = False,
|
||||
overwrite: bool = False, timeout: int = 30) -> httpx.Response:
|
||||
"""
|
||||
Restore a file or directory from the trash on the server.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the server.
|
||||
- path (str): The path of the file or directory to be restored.
|
||||
- fields (str, optional): The fields to be included in the response. Defaults to "".
|
||||
- name (str, optional): The name of the file or directory to be restored. Defaults to "".
|
||||
- force_async (bool, optional): Whether to force asynchronous restoring. Defaults to False.
|
||||
- overwrite (bool, optional): Whether to overwrite the destination file or directory if it already exists. Defaults to False.
|
||||
- timeout (int, optional): The timeout for the request in seconds. Defaults to 30.
|
||||
|
||||
Returns:
|
||||
- httpx.Response: The response from the server after the restore operation.
|
||||
"""
|
||||
url = BASE_URL + "/restore"
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.put(
|
||||
url=url,
|
||||
headers=utils.generate_headers(token=token),
|
||||
params={
|
||||
"path": "" if not path else utils.parse_path(path, "trash:/"),
|
||||
"fields": fields,
|
||||
"name": name,
|
||||
"force_async": force_async,
|
||||
"overwrite": overwrite,
|
||||
},
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
@@ -16,14 +16,14 @@ def generate_headers(token: str) -> dict:
|
||||
return headers
|
||||
|
||||
|
||||
def parse_path(path: str) -> str:
|
||||
def parse_path(path: str, prefix: str = "disk:/") -> str:
|
||||
path = str(Path(path)) # Some kind of check is path valid or not =P
|
||||
|
||||
if path.startswith("/"):
|
||||
path = "disk:/" + path[1:]
|
||||
elif not path.startswith("disk:/"):
|
||||
path = "disk:/" + path
|
||||
path = prefix + path[1:]
|
||||
elif not path.startswith(prefix):
|
||||
path = prefix + path
|
||||
|
||||
path = Path(path) # Some kind of check is path valid or not =P
|
||||
|
||||
return str(path)
|
||||
return path
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
|
||||
@dataclass
|
||||
class BaseObject:
|
||||
token: str
|
||||
|
||||
created_at: str
|
||||
modified_at: str
|
||||
|
||||
name: str
|
||||
path: str
|
||||
|
||||
resource_id: str
|
||||
|
||||
revision: int
|
||||
|
||||
public_key: str = ""
|
||||
public_url: str = ""
|
||||
|
||||
in_trash: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class File(BaseObject):
|
||||
antivirus_status: str = ""
|
||||
file_url: str = ""
|
||||
preview_url: str = ""
|
||||
md5: str = ""
|
||||
sha256: str = ""
|
||||
media_type: str = ""
|
||||
mime_type: str = ""
|
||||
|
||||
size: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class Directory(BaseObject):
|
||||
@property
|
||||
async def size(self) -> int:
|
||||
return 0
|
||||
|
||||
@property
|
||||
async def contents(self) -> list:
|
||||
return []
|
||||
@@ -0,0 +1,2 @@
|
||||
from yndx_disk.clients.async_client import AsyncDiskClient
|
||||
from yndx_disk.clients.sync_client import *
|
||||
@@ -0,0 +1,642 @@
|
||||
import httpx
|
||||
|
||||
import yndx_disk.api.disk as api_disk
|
||||
import yndx_disk.api.resources as api_resources
|
||||
import yndx_disk.api.operations as api_operations
|
||||
import yndx_disk.api.trash_resources as api_trash_resources
|
||||
import yndx_disk.api.public_resources as api_public_resources
|
||||
import yndx_disk.api.exceptions as api_exceptions
|
||||
from yndx_disk.api.utils import parse_path
|
||||
|
||||
from yndx_disk.classes import File, Directory
|
||||
|
||||
import asyncio
|
||||
import aiofiles
|
||||
import os
|
||||
|
||||
from pprint import pprint
|
||||
|
||||
|
||||
class AsyncDiskClient:
|
||||
"""
|
||||
A class representing an asynchronous client for interacting with a disk service.
|
||||
|
||||
Attributes:
|
||||
- user (dict): Information about the user.
|
||||
- system_folders (dict): Information about the system folders.
|
||||
- is_paid (bool): Whether the user has a paid account.
|
||||
- payment_flow (bool): Whether the user is in the payment flow.
|
||||
- unlimited_autoupload_enabled (bool): Whether unlimited autoupload is enabled.
|
||||
- reg_time (str): The registration time of the user.
|
||||
- total_space (int): The total disk space available.
|
||||
- used_space (int): The used disk space.
|
||||
- max_file_size (int): The maximum file size allowed.
|
||||
- paid_max_file_size (int): The maximum file size allowed for a paid account.
|
||||
- photounlim_size (int): The photo unlimited size.
|
||||
- trash_size (int): The size of the trash.
|
||||
- revision (int): The revision number.
|
||||
"""
|
||||
user: dict = None
|
||||
system_folders: dict = None
|
||||
|
||||
is_paid: bool = None
|
||||
payment_flow: bool = None
|
||||
unlimited_autoupload_enabled: bool = None
|
||||
|
||||
reg_time: str = None
|
||||
|
||||
total_space: int = None
|
||||
used_space: int = None
|
||||
|
||||
max_file_size: int = None
|
||||
paid_max_file_size: int = None
|
||||
photounlim_size: int = None
|
||||
trash_size: int = None
|
||||
|
||||
revision: int = None
|
||||
|
||||
def __init__(self, token: str, auto_update_info: bool = True):
|
||||
"""
|
||||
Initialize an instance of the AsyncDiskClient class.
|
||||
|
||||
Parameters:
|
||||
- token (str): The authentication token for the server.
|
||||
- auto_update_info (bool, optional): Whether to automatically update the client's information. Defaults to True.
|
||||
|
||||
Returns:
|
||||
- None
|
||||
"""
|
||||
self.token = token
|
||||
self.auto_update_info = auto_update_info
|
||||
|
||||
|
||||
async def _wait_for_operation_to_finish(self, operation_id: str) -> bool:
|
||||
"""
|
||||
Wait for an operation to finish.
|
||||
|
||||
This method continuously checks the status of an operation until it is no longer in progress. If the operation fails, the method returns False. Otherwise, it returns True.
|
||||
|
||||
Parameters:
|
||||
- operation_id (str): The ID of the operation to wait for.
|
||||
|
||||
Returns:
|
||||
- bool: True if the operation is successful, False otherwise.
|
||||
"""
|
||||
operation_status_response = await api_operations.get_operation_status(self.token, operation_id)
|
||||
operation_status_response_json = operation_status_response.json()
|
||||
operation_status = False if operation_status_response_json.get("status", "") == "in-progress" else True
|
||||
|
||||
while not operation_status:
|
||||
operation_status_response = await api_operations.get_operation_status(self.token, operation_id)
|
||||
operation_status_response_json = operation_status_response.json()
|
||||
operation_status = False if operation_status_response_json.get("status", "") == "in-progress" else True
|
||||
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
if operation_status_response_json.get("status", "") == "failed":
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
async def update_disk_info(self) -> None:
|
||||
"""
|
||||
Update the disk information for the client.
|
||||
|
||||
This method retrieves the disk information from the server using the provided token. If the request is successful (status code 200), it updates the client's information with the retrieved data. If the request fails, it raises a YandexDiskAPIException with the status code and description from the response.
|
||||
|
||||
Returns:
|
||||
- None
|
||||
"""
|
||||
response = await api_disk.get_disk_info(self.token)
|
||||
response_json = response.json()
|
||||
|
||||
if response.status_code != 200:
|
||||
raise api_exceptions.YandexDiskAPIException(response.status_code, response_json.get("description", ""))
|
||||
|
||||
self.user = response_json.get("user", {})
|
||||
self.system_folders = response_json.get("system_folders", {})
|
||||
|
||||
self.is_paid = response_json.get("is_paid", False)
|
||||
self.payment_flow = response_json.get("payment flow", False)
|
||||
self.unlimited_autoupload_enabled = response_json.get("unlimited_autoupload_enabled", False)
|
||||
|
||||
self.reg_time = response_json.get("reg_time", "")
|
||||
|
||||
self.total_space = response_json.get("total_space", 0)
|
||||
self.used_space = response_json.get("used_space", 0)
|
||||
|
||||
self.max_file_size = response_json.get("max_file_size", 0)
|
||||
self.paid_max_file_size = response_json.get("max_file_size", 0)
|
||||
self.photounlim_size = response_json.get("photounlim_size", 0)
|
||||
self.trash_size = response_json.get("trash_size", 0)
|
||||
|
||||
self.revision = response_json.get("revision", 0)
|
||||
|
||||
async def get_object(self, path: str) -> File | Directory:
|
||||
"""
|
||||
Get an object from the disk.
|
||||
|
||||
This method retrieves information about an object (file or directory) from the disk using the provided path. If the request is successful (status code 200), it returns a File or Directory object based on the type of the object. If the request fails, it raises a YandexDiskAPIException with the status code and description from the response.
|
||||
|
||||
Parameters:
|
||||
- path (str): The path of the object to retrieve.
|
||||
|
||||
Returns:
|
||||
- File | Directory: The retrieved File or Directory object.
|
||||
|
||||
Raises:
|
||||
- YandexDiskAPIException: If the request fails or if the object type cannot be determined.
|
||||
"""
|
||||
response = await api_resources.get_info(self.token, path, limit=0)
|
||||
|
||||
response_json = response.json()
|
||||
|
||||
if response.status_code != 200:
|
||||
raise api_exceptions.YandexDiskAPIException(response.status_code, response_json.get("description", ""))
|
||||
|
||||
object_type = response_json.get("type", "")
|
||||
|
||||
match object_type:
|
||||
case "file":
|
||||
return File(
|
||||
token=self.token,
|
||||
created_at=response_json.get("created", ""),
|
||||
modified_at=response_json.get("modified", ""),
|
||||
name=response_json.get("name", ""),
|
||||
path=response_json.get("path", ""),
|
||||
resource_id=response_json.get("resource_id", ""),
|
||||
revision=response_json.get("revision", 0),
|
||||
public_key=response_json.get("public_key", ""),
|
||||
public_url=response_json.get("public_url", ""),
|
||||
antivirus_status=response_json.get("antivirus_status", ""),
|
||||
file_url=response_json.get("file", ""),
|
||||
preview_url=response_json.get("preview", ""),
|
||||
md5=response_json.get("md5", ""),
|
||||
sha256=response_json.get("sha256", ""),
|
||||
media_type=response_json.get("media_type", ""),
|
||||
mime_type=response_json.get("mime_type", ""),
|
||||
size=response_json.get("size", 0),
|
||||
)
|
||||
case "dir":
|
||||
return Directory(
|
||||
token=self.token,
|
||||
created_at=response_json.get("created", ""),
|
||||
modified_at=response_json.get("modified", ""),
|
||||
name=response_json.get("name", ""),
|
||||
path=response_json.get("path", ""),
|
||||
resource_id=response_json.get("resource_id", ""),
|
||||
revision=response_json.get("revision", 0),
|
||||
public_key=response_json.get("public_key", ""),
|
||||
public_url=response_json.get("public_url", ""),
|
||||
)
|
||||
case _:
|
||||
raise api_exceptions.YandexDiskAPIException(f"Could not determine object type {path}")
|
||||
|
||||
async def listdir(self, path: str = "/", limit: int = 100, offset: int = 0) -> list[File | Directory]:
|
||||
"""
|
||||
List the contents of a directory on the disk.
|
||||
|
||||
This method retrieves the contents of a directory from the disk using the provided path. If the request is successful (status code 200), it returns a list of File or Directory objects representing the contents of the directory. If the request fails, it raises a YandexDiskAPIException with the status code and description from the response.
|
||||
|
||||
Parameters:
|
||||
- path (str, optional): The path of the directory to list. Defaults to "/".
|
||||
- limit (int, optional): The maximum number of items to return in the response. Defaults to 100.
|
||||
- offset (int, optional): The number of items to skip before returning the response. Defaults to 0.
|
||||
|
||||
Returns:
|
||||
- list[File | Directory]: A list of File or Directory objects representing the contents of the directory.
|
||||
|
||||
Raises:
|
||||
- YandexDiskAPIException: If the request fails or if the object type cannot be determined.
|
||||
"""
|
||||
response = await api_resources.get_info(self.token, path=path, limit=limit, offset=offset)
|
||||
|
||||
response_json = response.json()
|
||||
|
||||
if response.status_code != 200:
|
||||
raise api_exceptions.YandexDiskAPIException(response.status_code, response_json.get("description", ""))
|
||||
|
||||
embedded_items = response_json.get("_embedded", {}).get("items", [])
|
||||
directory_contents = []
|
||||
|
||||
for item in embedded_items:
|
||||
item_type = item.get("type", "")
|
||||
if not item_type:
|
||||
continue
|
||||
|
||||
match item_type:
|
||||
case "file":
|
||||
directory_contents.append(
|
||||
File(
|
||||
token=self.token,
|
||||
created_at=item.get("created", ""),
|
||||
modified_at=item.get("modified", ""),
|
||||
name=item.get("name", ""),
|
||||
path=item.get("path", ""),
|
||||
resource_id=item.get("resource_id", ""),
|
||||
revision=item.get("revision", 0),
|
||||
public_key=item.get("public_key", ""),
|
||||
public_url=item.get("public_url", ""),
|
||||
antivirus_status=item.get("antivirus_status", ""),
|
||||
file_url=item.get("file", ""),
|
||||
preview_url=item.get("preview", ""),
|
||||
md5=item.get("md5", ""),
|
||||
sha256=item.get("sha256", ""),
|
||||
media_type=item.get("media_type", ""),
|
||||
mime_type=item.get("mime_type", ""),
|
||||
size=item.get("size", 0),
|
||||
)
|
||||
)
|
||||
case "dir":
|
||||
directory_contents.append(
|
||||
Directory(
|
||||
token=self.token,
|
||||
created_at=item.get("created", ""),
|
||||
modified_at=item.get("modified", ""),
|
||||
name=item.get("name", ""),
|
||||
path=item.get("path", ""),
|
||||
resource_id=item.get("resource_id", ""),
|
||||
revision=item.get("revision", 0),
|
||||
public_key=item.get("public_key", ""),
|
||||
public_url=item.get("public_url", ""),
|
||||
)
|
||||
)
|
||||
case _:
|
||||
continue
|
||||
|
||||
return directory_contents
|
||||
|
||||
async def delete(self, path: str, permanently: bool = False) -> None:
|
||||
"""
|
||||
Delete a file or directory from the disk.
|
||||
|
||||
This method deletes a file or directory from the disk using the provided path. If the request is successful (status code 202), it waits for the operation to finish and raises a YandexDiskAPIException if the operation fails. If the request fails, it raises a YandexDiskAPIException with the status code and description from the response. If the auto_update_info attribute is True, it updates the disk information after the operation is successful.
|
||||
|
||||
Parameters:
|
||||
- path (str): The path of the file or directory to be deleted.
|
||||
- permanent (bool, optional): Whether to delete the file or directory permanently. Defaults to False.
|
||||
|
||||
Returns:
|
||||
- None
|
||||
|
||||
Raises:
|
||||
- YandexDiskAPIException: If the request fails or if the operation fails.
|
||||
"""
|
||||
response = await api_resources.delete(self.token, path=path, force_async=True, permanently=permanently)
|
||||
|
||||
response_json = response.json()
|
||||
|
||||
if response.status_code != 202:
|
||||
raise api_exceptions.YandexDiskAPIException(response.status_code, response_json.get("description", ""))
|
||||
|
||||
href = response_json.get("href", "")
|
||||
operation_id = href.split("/")[-1]
|
||||
operation_status = await self._wait_for_operation_to_finish(operation_id)
|
||||
|
||||
if not operation_status:
|
||||
raise api_exceptions.YandexDiskAPIException(f"Failed to delete {path}.")
|
||||
|
||||
if self.auto_update_info:
|
||||
await self.update_disk_info()
|
||||
|
||||
async def move(self, source_path: str, destination_path: str, overwrite: bool = False) -> None:
|
||||
"""
|
||||
Move a file or directory from one location to another on the disk.
|
||||
|
||||
This method moves a file or directory from the source path to the destination path. If the request is successful (status code 202), it waits for the operation to finish and raises a YandexDiskAPIException if the operation fails. If the request fails, it raises a YandexDiskAPIException with the status code and description from the response. If the auto_update_info attribute is True, it updates the disk information after the operation is successful.
|
||||
|
||||
Parameters:
|
||||
- source_path (str): The path of the file or directory to be moved.
|
||||
- destination_path (str): The path where the file or directory should be moved to.
|
||||
- overwrite (bool, optional): Whether to overwrite the destination file or directory if it already exists. Defaults to False.
|
||||
|
||||
Returns:
|
||||
- None
|
||||
|
||||
Raises:
|
||||
- YandexDiskAPIException: If the request fails or if the operation fails.
|
||||
"""
|
||||
response = await api_resources.move(self.token, source_path, destination_path, force_async=True, overwrite=overwrite)
|
||||
|
||||
response_json = response.json()
|
||||
|
||||
if response.status_code != 202:
|
||||
raise api_exceptions.YandexDiskAPIException(response.status_code, response_json.get("description", ""))
|
||||
|
||||
href = response_json.get("href", "")
|
||||
operation_id = href.split("/")[-1]
|
||||
operation_status = await self._wait_for_operation_to_finish(operation_id)
|
||||
|
||||
if not operation_status:
|
||||
raise api_exceptions.YandexDiskAPIException(f"Failed to move {source_path} to {destination_path}.")
|
||||
|
||||
async def copy(self, source_path: str, destination_path: str, overwrite: bool = False) -> None:
|
||||
"""
|
||||
Copy a file or directory from one location to another on the disk.
|
||||
|
||||
This method copies a file or directory from the source path to the destination path. If the request is successful (status code 202), it waits for the operation to finish and raises a YandexDiskAPIException if the operation fails. If the request fails, it raises a YandexDiskAPIException with the status code and description from the response. If the auto_update_info attribute is True, it updates the disk information after the operation is successful.
|
||||
|
||||
Parameters:
|
||||
- source_path (str): The path of the file or directory to be copied.
|
||||
- destination_path (str): The path where the file or directory should be copied to.
|
||||
- overwrite (bool, optional): Whether to overwrite the destination file or directory if it already exists. Defaults to False.
|
||||
|
||||
Returns:
|
||||
- None
|
||||
|
||||
Raises:
|
||||
- YandexDiskAPIException: If the request fails or if the operation fails.
|
||||
"""
|
||||
response = await api_resources.copy(self.token, source_path, destination_path, force_async=True,
|
||||
overwrite=overwrite)
|
||||
|
||||
response_json = response.json()
|
||||
|
||||
if response.status_code != 202:
|
||||
raise api_exceptions.YandexDiskAPIException(response.status_code, response_json.get("description", ""))
|
||||
|
||||
href = response_json.get("href", "")
|
||||
operation_id = href.split("/")[-1]
|
||||
operation_status = await self._wait_for_operation_to_finish(operation_id)
|
||||
|
||||
if not operation_status:
|
||||
raise api_exceptions.YandexDiskAPIException(f"Failed to copy {source_path} to {destination_path}.")
|
||||
|
||||
if self.auto_update_info:
|
||||
await self.update_disk_info()
|
||||
|
||||
async def publish(self, path: str, return_public_url: bool = False) -> str | None: # TODO: implement body
|
||||
"""
|
||||
Publish a file or directory on the server.
|
||||
|
||||
This method publishes a file or directory on the server using the provided path. If the request is successful (status code 200), it returns the public URL of the published file or directory. If the request fails, it raises a YandexDiskAPIException with the status code and description from the response.
|
||||
|
||||
Parameters:
|
||||
- path (str): The path of the file or directory to be published.
|
||||
- return_public_url (bool, optional): Whether to return the public URL of the published file or directory. Defaults to False.
|
||||
|
||||
Returns:
|
||||
- str | None: The public URL of the published file or directory if return_public_url is True, otherwise None.
|
||||
|
||||
Raises:
|
||||
- YandexDiskAPIException: If the request fails.
|
||||
"""
|
||||
|
||||
body = {
|
||||
"public_settings": {
|
||||
"read_only": False,
|
||||
"external_organization_id_verbose": {
|
||||
"enabled": False,
|
||||
"value": ""
|
||||
},
|
||||
"password_verbose": {
|
||||
"enabled": False,
|
||||
"value": ""
|
||||
},
|
||||
"available_until": False,
|
||||
"accesses": [
|
||||
{}
|
||||
],
|
||||
"available_until_verbose": {
|
||||
"enabled": False,
|
||||
"value": 0
|
||||
},
|
||||
"password": "",
|
||||
"external_organization_id": ""
|
||||
}
|
||||
}
|
||||
|
||||
response = await api_resources.publish(self.token, path, body)
|
||||
|
||||
response_json = response.json()
|
||||
|
||||
if response.status_code != 200:
|
||||
raise api_exceptions.YandexDiskAPIException(response.status_code, response_json.get("description", ""))
|
||||
|
||||
if return_public_url:
|
||||
obj: File | Directory = await self.get_object(path)
|
||||
return obj.public_url
|
||||
|
||||
async def unpublish(self, path: str):
|
||||
"""
|
||||
Unpublish a file or directory on the server.
|
||||
|
||||
This method unpublishes a file or directory on the server using the provided path. If the request is successful (status code 200), it returns the public URL of the unpublished file or directory. If the request fails, it raises a YandexDiskAPIException with the status code and description from the response.
|
||||
|
||||
Parameters:
|
||||
- path (str): The path of the file or directory to be unpublished.
|
||||
|
||||
Returns:
|
||||
- None
|
||||
|
||||
Raises:
|
||||
- YandexDiskAPIException: If the request fails.
|
||||
"""
|
||||
response = await api_resources.unpublish(self.token, path)
|
||||
|
||||
response_json = response.json()
|
||||
|
||||
if response.status_code != 200:
|
||||
raise api_exceptions.YandexDiskAPIException(response.status_code, response_json.get("description", ""))
|
||||
|
||||
async def upload_file(self, file_path: str, path: str, overwrite: bool = False, chunk_size: int = 1024) -> None:
|
||||
"""
|
||||
Upload a file to the server.
|
||||
|
||||
This method uploads a file to the server using the provided file path and destination path. If the file size is larger than the available space or the maximum file size allowed, it raises a YandexDiskAPIException. If the request is successful (status code 201), it updates the disk information if auto_update_info is True. If the request is successful (status code 202), it waits for the operation to finish and updates the disk information if auto_update_info is True. If the request fails, it raises a YandexDiskAPIException with the status code and description from the response.
|
||||
|
||||
Parameters:
|
||||
- file_path (str): The path of the file to be uploaded.
|
||||
- path (str): The destination path on the server.
|
||||
- overwrite (bool, optional): Whether to overwrite the destination file if it already exists. Defaults to False.
|
||||
- chunk_size (int, optional): The size of each chunk to be uploaded. Defaults to 1024.
|
||||
|
||||
Returns:
|
||||
- None
|
||||
|
||||
Raises:
|
||||
- YandexDiskAPIException: If the file size is too large or the request fails.
|
||||
"""
|
||||
file_path = os.path.abspath(file_path)
|
||||
|
||||
if not os.path.exists(file_path):
|
||||
raise ValueError(f"File {file_path} does not exist.")
|
||||
|
||||
file_size = os.path.getsize(file_path)
|
||||
if file_size > (self.total_space - self.used_space):
|
||||
raise api_exceptions.YandexDiskAPIException(f"You don't have enough space to upload {file_size}.")
|
||||
elif file_size > self.max_file_size:
|
||||
raise api_exceptions.YandexDiskAPIException(f"File {file_path} is too large.")
|
||||
|
||||
response = await api_resources.get_upload_url(self.token, path, overwrite=overwrite)
|
||||
|
||||
response_json = response.json()
|
||||
|
||||
if response.status_code != 200:
|
||||
raise api_exceptions.YandexDiskAPIException(response.status_code, response_json.get("description", ""))
|
||||
|
||||
operation_id = response_json.get("operation_id", "")
|
||||
upload_url = response_json.get("href", "")
|
||||
|
||||
async def chunked_file_reader(file_path_, chunk_size_):
|
||||
async with aiofiles.open(file_path_, "rb") as file_:
|
||||
while chunk := await file_.read(chunk_size_):
|
||||
yield chunk
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
async with client.stream("PUT", url=upload_url, data=chunked_file_reader(file_path, chunk_size)) as upload_response:
|
||||
await upload_response.aread()
|
||||
|
||||
match upload_response.status_code:
|
||||
case 201:
|
||||
if self.auto_update_info:
|
||||
await self.update_disk_info()
|
||||
case 202:
|
||||
await self._wait_for_operation_to_finish(operation_id)
|
||||
if self.auto_update_info:
|
||||
await self.update_disk_info()
|
||||
case _:
|
||||
raise api_exceptions.YandexDiskAPIException(upload_response.status_code, upload_response.text)
|
||||
|
||||
async def listdir_trash(self, path: str = "/", limit: int = 100, offset: int = 0) -> list[File | Directory]:
|
||||
"""
|
||||
List the contents of a directory in the trash on the disk.
|
||||
|
||||
This method retrieves the contents of a directory in the trash from the disk using the provided path. If the request is successful (status code 200), it returns a list of File or Directory objects representing the contents of the directory. If the request fails, it raises a YandexDiskAPIException with the status code and description from the response.
|
||||
|
||||
Parameters:
|
||||
- path (str, optional): The path of the directory in the trash to list. Defaults to "/".
|
||||
- limit (int, optional): The maximum number of items to return in the response. Defaults to 100.
|
||||
- offset (int, optional): The number of items to skip before returning the response. Defaults to 0.
|
||||
|
||||
Returns:
|
||||
- list[File | Directory]: A list of File or Directory objects representing the contents of the directory in the trash.
|
||||
|
||||
Raises:
|
||||
- YandexDiskAPIException: If the request fails or if the object type cannot be determined.
|
||||
"""
|
||||
response = await api_trash_resources.get_info(self.token, path=path, limit=limit, offset=offset)
|
||||
|
||||
response_json = response.json()
|
||||
|
||||
if response.status_code != 200:
|
||||
raise api_exceptions.YandexDiskAPIException(response.status_code, response_json.get("description", ""))
|
||||
|
||||
embedded_items = response_json.get("_embedded", {}).get("items", [])
|
||||
directory_contents = []
|
||||
|
||||
for item in embedded_items:
|
||||
item_type = item.get("type", "")
|
||||
if not item_type:
|
||||
continue
|
||||
|
||||
match item_type:
|
||||
case "file":
|
||||
directory_contents.append(
|
||||
File(
|
||||
token=self.token,
|
||||
created_at=item.get("created", ""),
|
||||
modified_at=item.get("modified", ""),
|
||||
name=item.get("name", ""),
|
||||
path=item.get("path", ""),
|
||||
resource_id=item.get("resource_id", ""),
|
||||
revision=item.get("revision", 0),
|
||||
public_key=item.get("public_key", ""),
|
||||
public_url=item.get("public_url", ""),
|
||||
antivirus_status=item.get("antivirus_status", ""),
|
||||
file_url=item.get("file", ""),
|
||||
preview_url=item.get("preview", ""),
|
||||
md5=item.get("md5", ""),
|
||||
sha256=item.get("sha256", ""),
|
||||
media_type=item.get("media_type", ""),
|
||||
mime_type=item.get("mime_type", ""),
|
||||
size=item.get("size", 0),
|
||||
in_trash=True
|
||||
)
|
||||
)
|
||||
case "dir":
|
||||
directory_contents.append(
|
||||
Directory(
|
||||
token=self.token,
|
||||
created_at=item.get("created", ""),
|
||||
modified_at=item.get("modified", ""),
|
||||
name=item.get("name", ""),
|
||||
path=item.get("path", ""),
|
||||
resource_id=item.get("resource_id", ""),
|
||||
revision=item.get("revision", 0),
|
||||
public_key=item.get("public_key", ""),
|
||||
public_url=item.get("public_url", ""),
|
||||
in_trash=True
|
||||
)
|
||||
)
|
||||
case _:
|
||||
continue
|
||||
|
||||
return directory_contents
|
||||
|
||||
async def delete_trash(self, path: str = ""):
|
||||
"""
|
||||
Delete a file or directory from the trash on the server.
|
||||
|
||||
This method deletes a file or directory from the trash using the provided path. If the request is successful (status code 202), it waits for the operation to finish and raises a YandexDiskAPIException if the operation fails. If the request fails, it raises a YandexDiskAPIException with the status code and description from the response. If the auto_update_info attribute is True, it updates the disk information after the operation is successful.
|
||||
|
||||
Parameters:
|
||||
- path (str, optional): The path of the file or directory to be deleted from the trash. Defaults to "".
|
||||
|
||||
Returns:
|
||||
- None
|
||||
|
||||
Raises:
|
||||
- YandexDiskAPIException: If the request fails or if the operation fails.
|
||||
"""
|
||||
response = await api_trash_resources.delete(self.token, path=path, force_async=True)
|
||||
|
||||
response_json = response.json()
|
||||
|
||||
if response.status_code != 202:
|
||||
raise api_exceptions.YandexDiskAPIException(response.status_code, response_json.get("description", ""))
|
||||
|
||||
href = response_json.get("href", "")
|
||||
operation_id = href.split("/")[-1]
|
||||
operation_status = await self._wait_for_operation_to_finish(operation_id)
|
||||
|
||||
if not operation_status:
|
||||
raise api_exceptions.YandexDiskAPIException(f"Failed to delete {path}.")
|
||||
|
||||
if self.auto_update_info:
|
||||
await self.update_disk_info()
|
||||
|
||||
async def restore_trash(self, path: str, new_name: str = "", overwrite: bool = False):
|
||||
"""
|
||||
Restore a file or directory from the trash on the server.
|
||||
|
||||
This method restores a file or directory from the trash using the provided path. If the request is successful (status code 202), it waits for the operation to finish and raises a YandexDiskAPIException if the operation fails. If the request fails, it raises a YandexDiskAPIException with the status code and description from the response. If the auto_update_info attribute is True, it updates the disk information after the operation is successful.
|
||||
|
||||
Parameters:
|
||||
- path (str): The path of the file or directory to be restored.
|
||||
- new_name (str, optional): The new name for the restored file or directory. Defaults to "".
|
||||
- overwrite (bool, optional): Whether to overwrite the destination file or directory if it already exists. Defaults to False.
|
||||
|
||||
Returns:
|
||||
- None
|
||||
|
||||
Raises:
|
||||
- YandexDiskAPIException: If the request fails or if the operation fails.
|
||||
"""
|
||||
response = await api_trash_resources.restore(self.token, path, name=new_name, overwrite=overwrite, force_async=True)
|
||||
|
||||
response_json = response.json()
|
||||
|
||||
if response.status_code != 202:
|
||||
raise api_exceptions.YandexDiskAPIException(response.status_code, response_json.get("description", ""))
|
||||
|
||||
href = response_json.get("href", "")
|
||||
operation_id = href.split("/")[-1]
|
||||
operation_status = await self._wait_for_operation_to_finish(operation_id)
|
||||
|
||||
if not operation_status:
|
||||
raise api_exceptions.YandexDiskAPIException(f"Failed to restore {path}.")
|
||||
|
||||
if self.auto_update_info:
|
||||
await self.update_disk_info()
|
||||
|
||||
@@ -0,0 +1,48 @@
|
||||
from yndx_disk.classes import File, Directory
|
||||
from yndx_disk.clients.async_client import AsyncDiskClient
|
||||
|
||||
import asyncio
|
||||
|
||||
|
||||
class DiskClient(AsyncDiskClient):
|
||||
def __init__(self, token: str, auto_update_info: bool = True):
|
||||
super().__init__(token, auto_update_info)
|
||||
|
||||
def update_disk_info(self) -> None:
|
||||
return asyncio.run(super().update_disk_info())
|
||||
|
||||
def get_object(self, path: str) -> File | Directory:
|
||||
return asyncio.run(super().get_object(path))
|
||||
|
||||
def listdir(self, path: str = "/", limit: int = 100, offset: int = 0) -> list[File | Directory]:
|
||||
return asyncio.run(super().listdir(path, limit, offset))
|
||||
|
||||
def delete(self, path: str = "", permanently: bool = False) -> None:
|
||||
return asyncio.run(super().delete(path, permanently))
|
||||
|
||||
def move(self, source_path: str, destination_path: str, overwrite: bool = False) -> None:
|
||||
return asyncio.run(super().move(source_path, destination_path, overwrite))
|
||||
|
||||
def copy(self, source_path: str, destination_path: str, overwrite: bool = False) -> None:
|
||||
return asyncio.run(super().copy(source_path, destination_path, overwrite))
|
||||
|
||||
def publish(self, path: str, return_public_url: bool = False) -> str | None:
|
||||
return asyncio.run(super().publish(path, return_public_url))
|
||||
|
||||
def unpublish(self, path: str):
|
||||
return asyncio.run(super().unpublish(path))
|
||||
|
||||
def upload_file(self, file_path: str, path: str, overwrite: bool = False, chunk_size: int = 1024) -> None:
|
||||
return asyncio.run(super().upload_file(file_path, path, overwrite, chunk_size))
|
||||
|
||||
def listdir_trash(self, path: str = "/", limit: int = 100, offset: int = 0) -> list[File | Directory]:
|
||||
return asyncio.run(super().listdir_trash(path, limit, offset))
|
||||
|
||||
def delete_trash(self, path: str = ""):
|
||||
return asyncio.run(super().delete_trash(path))
|
||||
|
||||
def restore_trash(self, path: str, new_name: str = "", overwrite: bool = False):
|
||||
return asyncio.run(super().restore_trash(path, new_name, overwrite))
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user