# Source code for datalad_next.url_operations.file

"""Handler for operations, such as "download", on file:// URLs"""

# allow for |-type UnionType declarations
from __future__ import annotations

import logging
import random
import stat
import sys
import time
from io import IOBase
from math import floor
from pathlib import Path
from typing import (
    BinaryIO,
    Dict,
)
from urllib import (
    request,
    parse,
)

from datalad_next.consts import COPY_BUFSIZE

from .base import UrlOperations
from .exceptions import (
    UrlOperationsRemoteError,
    UrlOperationsResourceUnknown,
)

# module-level logger for the file:// URL handler
lgr = logging.getLogger('datalad.ext.next.file_url_operations')


# the handler class is the only name exported via `from ... import *`
__all__ = ['FileUrlOperations']


class FileUrlOperations(UrlOperations):
    """Handler for operations on `file://` URLs

    Access to local data via file-scheme URLs is supported with the
    same API and feature set as other URL-schemes (simultaneous
    content hashing and progress reporting).
    """
    def _file_url_to_path(self, url):
        """Convert a `file://` URL into a native `Path`

        Only the path component of the URL is used.
        """
        # kept as an assertion so that callers see the same exception
        # type as before; note that it is skipped under `python -O`
        assert url.startswith('file://')
        parsed = parse.urlparse(url)
        path = request.url2pathname(parsed.path)
        return Path(path)

    def stat(self,
             url: str,
             *,
             credential: str | None = None,
             timeout: float | None = None) -> Dict:
        """Gather information on a URL target, without downloading it

        See :meth:`datalad_next.url_operations.UrlOperations.stat`
        for parameter documentation and exception behavior.

        Raises
        ------
        UrlOperationsResourceUnknown
          For access targets found absent.
        """
        # filter out internals
        return {
            k: v for k, v in self._stat(url, credential).items()
            if not k.startswith('_')
        }

    def _stat(self, url: str, credential: str | None = None) -> Dict:
        """Return size and native path of a URL target

        The returned mapping has the key ``content-length`` (size in
        bytes) and the internal key ``_path`` (native `Path` of the
        target). Keys with a leading underscore are not meant to be
        reported to callers of the public API.

        Raises
        ------
        UrlOperationsResourceUnknown
          If the target does not exist.
        """
        # turn url into a native path
        from_path = self._file_url_to_path(url)
        # if anything went wrong with the conversion, or we lack
        # permissions: die here
        try:
            size = from_path.stat().st_size
        except FileNotFoundError as e:
            raise UrlOperationsResourceUnknown(url) from e
        return {
            'content-length': size,
            '_path': from_path,
        }

    def download(self,
                 from_url: str,
                 to_path: Path | None,
                 *,
                 # unused, but theoretically could be used to
                 # obtain escalated/different privileges on a system
                 # to gain file access
                 credential: str | None = None,
                 hash: list[str] | None = None,
                 timeout: float | None = None) -> Dict:
        """Copy a file:// URL target to a local path

        If ``to_path`` is ``None``, the content is written to
        ``sys.stdout.buffer`` instead of a file.

        See :meth:`datalad_next.url_operations.UrlOperations.download`
        for parameter documentation and exception behavior.

        Raises
        ------
        UrlOperationsResourceUnknown
          For download targets found absent.
        """
        dst_fp = None
        try:
            props = self._stat(from_url, credential=credential)
            # `_path` is an internal of this handler (stat() filters it
            # out too), hence take it out of the reported properties
            from_path = props.pop('_path')
            expected_size = props['content-length']
            dst_fp = sys.stdout.buffer if to_path is None \
                else open(to_path, 'wb')
            with from_path.open('rb') as src_fp:
                props.update(self._copyfp(
                    src_fp,
                    dst_fp,
                    expected_size,
                    hash,
                    start_log=('Download %s to %s', from_url, to_path),
                    update_log=('Downloaded chunk',),
                    finish_log=('Finished download',),
                    progress_label='downloading',
                ))
            return props
        except PermissionError:
            # would be a local issue, pass-through
            raise
        except UrlOperationsResourceUnknown:
            # would come from stat(), pass_through
            raise
        except Exception as e:
            # wrap this into the datalad-standard, but keep the
            # original exception linked
            raise UrlOperationsRemoteError(from_url, message=str(e)) from e
        finally:
            # never close stdout, only a file we opened ourselves
            if dst_fp and to_path is not None:
                dst_fp.close()

    def upload(self,
               from_path: Path | None,
               to_url: str,
               *,
               credential: str | None = None,
               hash: list[str] | None = None,
               timeout: float | None = None) -> Dict:
        """Copy a local file to a file:// URL target

        Any missing parent directories of the URL target are created as
        necessary. If ``from_path`` is ``None``, the content is read from
        ``sys.stdin.buffer``.

        The content is first written to a temporary file next to the
        target, which is moved to the target location once the copy
        completed. The temporary file is removed again when the upload
        fails.

        See :meth:`datalad_next.url_operations.UrlOperations.upload`
        for parameter documentation and exception behavior.

        Raises
        ------
        FileNotFoundError
          If the source file cannot be found.
        """
        # get the size, or die if inaccessible
        props = {}
        if from_path is not None:
            expected_size = from_path.stat().st_size
            props['content-length'] = expected_size
        else:
            expected_size = None

        final_path = self._file_url_to_path(to_url)
        # added a `nosec` below because we don't need a cryptographically
        # secure random number here.
        time_stamp = time.time()
        to_path = final_path.with_suffix(  # nosec
            final_path.suffix
            + f'.transfer-{random.randint(1000000000, 9999999999)}'
            + f'_{str(time_stamp - floor(time_stamp))[2:]}'
        )
        # create parent dir(s) as necessary
        to_path.parent.mkdir(exist_ok=True, parents=True)

        src_fp = None
        completed = False
        try:
            src_fp = sys.stdin.buffer if from_path is None \
                else open(from_path, 'rb')
            with to_path.open('wb') as dst_fp:
                props.update(self._copyfp(
                    src_fp,
                    dst_fp,
                    expected_size,
                    hash,
                    start_log=('Upload %s to %s', from_path, to_url),
                    update_log=('Uploaded chunk',),
                    finish_log=('Finished upload',),
                    progress_label='uploading',
                ))
            # only move into place after the temporary file was closed
            to_path.replace(final_path)
            completed = True
            return props
        except FileNotFoundError as e:
            raise UrlOperationsResourceUnknown(from_path) from e
        except Exception as e:
            # wrap this into the datalad-standard, but keep the
            # original exception linked
            raise UrlOperationsRemoteError(to_url, message=str(e)) from e
        finally:
            # never close stdin, only a file we opened ourselves
            if src_fp and from_path is not None:
                src_fp.close()
            if not completed:
                # do not leave a partial transfer file behind; this is
                # best-effort and must not mask the actual error
                try:
                    to_path.unlink()
                except FileNotFoundError:
                    # nothing was written yet
                    pass
                except OSError as e:
                    lgr.debug(
                        'Could not remove partial upload %s: %s',
                        to_path, e)

    def delete(self,
               url: str,
               *,
               credential: str | None = None,
               timeout: float | None = None) -> Dict:
        """Delete the target of a file:// URL

        The target can be a file or a directory. `delete` will attempt to
        delete write protected targets (by setting write permissions). If
        the target is a directory, the complete directory and all its
        content will be deleted. `delete` will not modify the permissions
        of the parent of the target. That means, it will not delete a
        target in a write protected directory, but it will empty target,
        if target is a directory.

        A symbolic link is removed as such: the link target is neither
        deleted, nor are its permissions changed.

        See :meth:`datalad_next.url_operations.UrlOperations.delete`
        for parameter documentation and exception behavior.

        Raises
        ------
        UrlOperationsResourceUnknown
          For deletion targets found absent.
        """
        path = self._file_url_to_path(url)
        try:
            if path.is_symlink():
                # is_dir() and chmod() would act on the link target,
                # hence a link must be handled before anything else
                path.unlink()
            elif path.is_dir():
                self._delete_dir(path)
            else:
                path.chmod(stat.S_IWUSR)
                path.unlink()
        except FileNotFoundError as e:
            raise UrlOperationsResourceUnknown(url) from e
        except Exception as e:
            # wrap this into the datalad-standard, but keep the
            # original exception linked
            raise UrlOperationsRemoteError(url, message=str(e)) from e
        return {}

    def _delete_dir(self, path: Path):
        """Recursively delete a directory and all of its content

        Permissions of the directory and of contained files are changed
        as needed to allow for the removal. Symbolic links inside the
        directory are removed without following them.
        """
        path.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        # take a snapshot of the listing, because entries are removed
        # while looping over it
        for sub_path in list(path.iterdir()):
            if sub_path.is_symlink():
                # never descend into, or chmod, a link target
                sub_path.unlink()
            elif sub_path.is_dir():
                self._delete_dir(sub_path)
            else:
                sub_path.chmod(stat.S_IWUSR)
                sub_path.unlink()
        path.rmdir()

    def _copyfp(self,
                src_fp: IOBase | BinaryIO,
                dst_fp: IOBase | BinaryIO,
                expected_size: int | None,
                hash: list[str] | None,
                start_log: tuple,
                update_log: tuple,
                finish_log: tuple,
                progress_label: str,
                ) -> dict:
        """Copy from one file object to another, with hashing and progress

        Content is read from ``src_fp`` in chunks of ``COPY_BUFSIZE`` and
        written to ``dst_fp``. Neither file object is closed.

        Returns
        -------
        dict
          Whatever the hasher reports via ``get_hexdigest()``, plus the
          number of copied bytes under the key ``content-length``.
        """
        # this is pretty much shutil.copyfileobj() with the necessary
        # wrapping to perform hashing and progress reporting
        hasher = self._get_hasher(hash)
        progress_id = self._get_progress_id(str(id(src_fp)), str(id(dst_fp)))

        # Localize variable access to minimize overhead
        src_fp_read = src_fp.read
        dst_fp_write = dst_fp.write

        props = {}
        self._progress_report_start(
            progress_id, start_log, progress_label, expected_size)
        copy_size = 0
        try:
            while True:
                chunk = src_fp_read(COPY_BUFSIZE)
                if not chunk:
                    break
                dst_fp_write(chunk)
                chunk_size = len(chunk)
                self._progress_report_update(
                    progress_id, update_log, chunk_size)
                # compute hash simultaneously
                hasher.update(chunk)
                copy_size += chunk_size
            props.update(hasher.get_hexdigest())
            # return how much was copied. we could compare with
            # `expected_size` and error on mismatch, but not all
            # sources can provide that (e.g. stdin)
            props['content-length'] = copy_size
            return props
        finally:
            # always terminate the progress report, also on error
            self._progress_report_stop(progress_id, finish_log)