Source code for datalad_next.url_operations.file

"""Handler for operations, such as "download", on file:// URLs"""

# allow for |-type UnionType declarations
from __future__ import annotations

import logging
from pathlib import Path
import sys
from typing import (
    BinaryIO,
    Dict,
)
from urllib import (
    request,
    parse,
)

from datalad_next.consts import COPY_BUFSIZE

from .base import UrlOperations
from .exceptions import (
    UrlOperationsRemoteError,
    UrlOperationsResourceUnknown,
)

# module-level logger for this handler
lgr = logging.getLogger('datalad.ext.next.file_url_operations')


# names exported via `from <module> import *`
__all__ = ['FileUrlOperations']


class FileUrlOperations(UrlOperations):
    """Handler for operations on `file://` URLs

    Access to local data via file-scheme URLs is supported with the
    same API and feature set as other URL-schemes (simultaneous
    content hashing and progress reporting).
    """
    def _file_url_to_path(self, url):
        """Convert a `file://` URL into a native filesystem ``Path``

        Only the path component of the URL is considered; it is converted
        to the platform's path syntax via ``urllib.request.url2pathname()``.
        """
        assert url.startswith('file://')
        parsed = parse.urlparse(url)
        path = request.url2pathname(parsed.path)
        return Path(path)

    def stat(self,
             url: str,
             *,
             credential: str | None = None,
             timeout: float | None = None) -> Dict:
        """Gather information on a URL target, without downloading it

        See :meth:`datalad_next.url_operations.UrlOperations.stat`
        for parameter documentation and exception behavior.

        Returns
        -------
        dict
          Properties of the target, currently ``content-length`` (size in
          bytes). Internal properties (keys with a leading underscore)
          are not included.

        Raises
        ------
        UrlOperationsResourceUnknown
          For access targets found absent.
        """
        # filter out internals
        return {
            k: v for k, v in self._stat(url, credential).items()
            if not k.startswith('_')
        }

    def _stat(self, url: str, credential: str | None = None) -> Dict:
        """Internal ``stat()`` variant that also reports the native path

        Returns a dict with ``content-length`` (file size in bytes) and
        ``_path`` (the ``Path`` the URL was resolved to).
        """
        # turn url into a native path
        from_path = self._file_url_to_path(url)
        # an absent target is reported with the dedicated exception.
        # any other problem (e.g. lack of permissions) is passed through
        # as-is, for the caller to handle
        try:
            size = from_path.stat().st_size
        except FileNotFoundError as e:
            raise UrlOperationsResourceUnknown(url) from e
        return {
            'content-length': size,
            '_path': from_path,
        }

    def download(self,
                 from_url: str,
                 to_path: Path | None,
                 *,
                 # unused, but theoretically could be used to
                 # obtain escalated/different privileges on a system
                 # to gain file access
                 credential: str | None = None,
                 hash: list[str] | None = None,
                 timeout: float | None = None) -> Dict:
        """Copy a file:// URL target to a local path

        If ``to_path`` is ``None``, the content is written to ``stdout``.

        See :meth:`datalad_next.url_operations.UrlOperations.download`
        for parameter documentation and exception behavior.

        Raises
        ------
        UrlOperationsResourceUnknown
          For download targets found absent.
        """
        dst_fp = None
        try:
            props = self._stat(from_url, credential=credential)
            from_path = props['_path']
            expected_size = props['content-length']
            dst_fp = sys.stdout.buffer if to_path is None \
                else open(to_path, 'wb')
            with from_path.open('rb') as src_fp:
                props.update(self._copyfp(
                    src_fp,
                    dst_fp,
                    expected_size,
                    hash,
                    start_log=('Download %s to %s', from_url, to_path),
                    update_log=('Downloaded chunk',),
                    finish_log=('Finished download',),
                    progress_label='downloading',
                ))
                return props
        except PermissionError:
            # would be a local issue, pass-through
            raise
        except UrlOperationsResourceUnknown:
            # would come from stat(), pass_through
            raise
        except Exception as e:
            # wrap this into the datalad-standard, but keep the
            # original exception linked
            raise UrlOperationsRemoteError(from_url, message=str(e)) from e
        finally:
            # only close what was opened here, never stdout
            if dst_fp and to_path is not None:
                dst_fp.close()

    def upload(self,
               from_path: Path | None,
               to_url: str,
               *,
               credential: str | None = None,
               hash: list[str] | None = None,
               timeout: float | None = None) -> Dict:
        """Copy a local file to a file:// URL target

        Any missing parent directories of the URL target are created as
        necessary. If ``from_path`` is ``None``, the content is read from
        ``stdin``.

        See :meth:`datalad_next.url_operations.UrlOperations.upload`
        for parameter documentation and exception behavior.

        Raises
        ------
        FileNotFoundError
          If the source file cannot be found.
        """
        # get the size, or die if inaccessible
        props = {}
        if from_path:
            expected_size = from_path.stat().st_size
            props['content-length'] = expected_size
        else:
            # reading from stdin, size is not known in advance
            expected_size = None
        to_path = self._file_url_to_path(to_url)
        # create parent dir(s) as necessary
        to_path.parent.mkdir(exist_ok=True, parents=True)
        src_fp = None
        try:
            src_fp = sys.stdin.buffer if from_path is None \
                else open(from_path, 'rb')
            with to_path.open('wb') as dst_fp:
                props.update(self._copyfp(
                    src_fp,
                    dst_fp,
                    expected_size,
                    hash,
                    start_log=('Upload %s to %s', from_path, to_url),
                    update_log=('Uploaded chunk',),
                    finish_log=('Finished upload',),
                    progress_label='uploading',
                ))
                return props
        except FileNotFoundError as e:
            raise UrlOperationsResourceUnknown(to_url) from e
        except Exception as e:
            # wrap this into the datalad-standard, but keep the
            # original exception linked
            raise UrlOperationsRemoteError(to_url, message=str(e)) from e
        finally:
            # only close what was opened here, never stdin
            if src_fp and from_path is not None:
                src_fp.close()

    def delete(self,
               url: str,
               *,
               credential: str | None = None,
               timeout: float | None = None) -> Dict:
        """Delete the target of a file:// URL

        The target can be a file or a directory. If it is a directory, it has
        to be empty.

        See :meth:`datalad_next.url_operations.UrlOperations.delete`
        for parameter documentation and exception behavior.

        Returns
        -------
        dict
          An empty mapping; no properties are collected during deletion.

        Raises
        ------
        UrlOperationsResourceUnknown
          For deletion targets found absent.
        """
        path = self._file_url_to_path(url)
        try:
            path.unlink()
        except FileNotFoundError as e:
            raise UrlOperationsResourceUnknown(url) from e
        except (IsADirectoryError, PermissionError) as e:
            # unlinking a directory is reported as IsADirectoryError on
            # Linux, but as PermissionError on other platforms (macOS,
            # Windows). only fall back on rmdir() for actual directories
            try:
                is_dir = path.is_dir()
            except OSError:
                is_dir = False
            if not is_dir:
                raise UrlOperationsRemoteError(url, message=str(e)) from e
            try:
                path.rmdir()
            except Exception as rmdir_error:
                raise UrlOperationsRemoteError(
                    url, message=str(rmdir_error)) from rmdir_error
        except Exception as e:
            # wrap this into the datalad-standard, but keep the
            # original exception linked
            raise UrlOperationsRemoteError(url, message=str(e)) from e
        return {}

    def _copyfp(self,
                src_fp: BinaryIO,
                dst_fp: BinaryIO,
                expected_size: int | None,
                hash: list[str] | None,
                start_log: tuple,
                update_log: tuple,
                finish_log: tuple,
                progress_label: str,
                ) -> dict:
        """Copy from one binary file object to another, in chunks

        Hashing and progress reporting are performed while copying.

        Parameters
        ----------
        src_fp, dst_fp
          Source and destination file objects, opened in binary mode.
        expected_size
          Number of bytes expected to be copied, used for progress
          reporting. ``None`` when unknown (e.g. reading from stdin).
        hash
          Passed to ``self._get_hasher()`` to set up hashing.
        start_log, update_log, finish_log
          Log message tuples passed on to the progress reporting helpers.
        progress_label
          Label passed to the progress reporting start helper.

        Returns
        -------
        dict
          Whatever the hasher reports via ``get_hexdigest()``, plus
          ``content-length`` with the number of bytes actually copied.
        """
        # this is pretty much shutil.copyfileobj() with the necessary
        # wrapping to perform hashing and progress reporting
        hasher = self._get_hasher(hash)
        progress_id = self._get_progress_id(id(src_fp), id(dst_fp))

        # Localize variable access to minimize overhead
        src_fp_read = src_fp.read
        dst_fp_write = dst_fp.write

        props = {}
        self._progress_report_start(
            progress_id, start_log, progress_label, expected_size)
        copy_size = 0
        try:
            while True:
                chunk = src_fp_read(COPY_BUFSIZE)
                if not chunk:
                    break
                dst_fp_write(chunk)
                chunk_size = len(chunk)
                self._progress_report_update(
                    progress_id, update_log, chunk_size)
                # compute hash simultaneously
                hasher.update(chunk)
                copy_size += chunk_size
            props.update(hasher.get_hexdigest())
            # return how much was copied. we could compare with
            # `expected_size` and error on mismatch, but not all
            # sources can provide that (e.g. stdin)
            props['content-length'] = copy_size
            return props
        finally:
            # always terminate progress reporting, also on error
            self._progress_report_stop(progress_id, finish_log)