"""Handler for operations, such as "download", on file:// URLs"""
# allow for |-type UnionType declarations
from __future__ import annotations
import logging
import random
import stat
import sys
import time
from io import IOBase
from math import floor
from pathlib import Path
from typing import (
BinaryIO,
Dict,
)
from urllib import (
request,
parse,
)
from datalad_next.consts import COPY_BUFSIZE
from .base import UrlOperations
from .exceptions import (
UrlOperationsRemoteError,
UrlOperationsResourceUnknown,
)
lgr = logging.getLogger('datalad.ext.next.file_url_operations')
__all__ = ['FileUrlOperations']
class FileUrlOperations(UrlOperations):
"""Handler for operations on `file://` URLs

    Access to local data via file-scheme URLs is supported with the
    same API and feature set as other URL schemes (simultaneous
    content hashing and progress reporting).
"""
def _file_url_to_path(self, url):
assert url.startswith('file://')
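        # split the URL and convert its path component to a native path;
        # url2pathname() takes care of percent-decoding and platform
        # specifics (e.g. drive letters on Windows)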
parsed = parse.urlparse(url)
path = request.url2pathname(parsed.path)
return Path(path)
def stat(self,
url: str,
*,
credential: str | None = None,
timeout: float | None = None) -> Dict:
"""Gather information on a URL target, without downloading it

        See :meth:`datalad_next.url_operations.UrlOperations.stat`
        for parameter documentation and exception behavior.

Raises
------
UrlOperationsResourceUnknown
For access targets found absent.
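
        Examples
        --------
        A minimal usage sketch; the path is illustrative and must exist
        for the call to succeed::

            ops = FileUrlOperations()
            props = ops.stat('file:///tmp/data.bin')
            # reported size of the target in bytes
            size = props['content-length']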
"""
# filter out internals
return {
k: v for k, v in self._stat(url, credential).items()
if not k.startswith('_')
}
def _stat(self, url: str, credential: str | None = None) -> Dict:
# turn url into a native path
from_path = self._file_url_to_path(url)
# if anything went wrong with the conversion, or we lack
# permissions: die here
try:
size = from_path.stat().st_size
except FileNotFoundError as e:
raise UrlOperationsResourceUnknown(url) from e
return {
'content-length': size,
'_path': from_path,
}
def download(self,
from_url: str,
to_path: Path | None,
*,
# unused, but theoretically could be used to
# obtain escalated/different privileges on a system
# to gain file access
credential: str | None = None,
hash: list[str] | None = None,
timeout: float | None = None) -> Dict:
"""Copy a file:// URL target to a local path

        See :meth:`datalad_next.url_operations.UrlOperations.download`
        for parameter documentation and exception behavior.

Raises
------
UrlOperationsResourceUnknown
For download targets found absent.
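
        Examples
        --------
        A minimal usage sketch; paths are illustrative, ``Path`` is
        ``pathlib.Path``, and ``'md5'`` assumes a hashlib-style
        algorithm name::

            ops = FileUrlOperations()
            props = ops.download(
                'file:///tmp/data.bin',
                Path('/tmp/copy.bin'),
                hash=['md5'],
            )
            # `props` reports the number of copied bytes under
            # 'content-length', plus the requested digest(s)

        Passing ``None`` instead of a destination path writes the content
        to ``stdout``.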
"""
dst_fp = None
try:
props = self._stat(from_url, credential=credential)
from_path = props['_path']
expected_size = props['content-length']
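            # no destination path means: write to stdout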
dst_fp = sys.stdout.buffer if to_path is None \
else open(to_path, 'wb')
with from_path.open('rb') as src_fp:
props.update(self._copyfp(
src_fp,
dst_fp,
expected_size,
hash,
start_log=('Download %s to %s', from_url, to_path),
update_log=('Downloaded chunk',),
finish_log=('Finished download',),
progress_label='downloading',
))
return props
except PermissionError:
# would be a local issue, pass-through
raise
except UrlOperationsResourceUnknown:
            # would come from stat(), pass-through
raise
except Exception as e:
# wrap this into the datalad-standard, but keep the
# original exception linked
raise UrlOperationsRemoteError(from_url, message=str(e)) from e
finally:
if dst_fp and to_path is not None:
dst_fp.close()
def upload(self,
from_path: Path | None,
to_url: str,
*,
credential: str | None = None,
hash: list[str] | None = None,
timeout: float | None = None) -> Dict:
"""Copy a local file to a file:// URL target

        Any missing parent directories of the URL target are created as
        necessary.

        See :meth:`datalad_next.url_operations.UrlOperations.upload`
        for parameter documentation and exception behavior.

Raises
------
FileNotFoundError
If the source file cannot be found.
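
        Examples
        --------
        A minimal usage sketch; paths are illustrative and ``Path`` is
        ``pathlib.Path``::

            ops = FileUrlOperations()
            ops.upload(
                Path('/tmp/data.bin'),
                'file:///tmp/some/dir/data.bin',
            )

        Passing ``None`` instead of a source path reads the content from
        ``stdin``.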
"""
# get the size, or die if inaccessible
props = {}
if from_path:
expected_size = from_path.stat().st_size
props['content-length'] = expected_size
else:
expected_size = None
final_path = self._file_url_to_path(to_url)
        # upload to a temporary, uniquely named file first; it is only moved
        # to the final path once the transfer is complete.
        # `nosec`: the random number below does not need to be
        # cryptographically secure, it only keeps the temporary name unique.
time_stamp = time.time()
to_path = final_path.with_suffix( # nosec
final_path.suffix
+ f'.transfer-{random.randint(1000000000, 9999999999)}'
+ f'_{str(time_stamp - floor(time_stamp))[2:]}'
)
# create parent dir(s) as necessary
to_path.parent.mkdir(exist_ok=True, parents=True)
src_fp = None
try:
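            # no source path means: read from stdin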
src_fp = sys.stdin.buffer if from_path is None \
else open(from_path, 'rb')
with to_path.open('wb') as dst_fp:
props.update(self._copyfp(
src_fp,
dst_fp,
expected_size,
hash,
start_log=('Upload %s to %s', from_path, to_url),
update_log=('Uploaded chunk',),
finish_log=('Finished upload',),
progress_label='uploading',
))
to_path.replace(final_path)
return props
except FileNotFoundError as e:
raise UrlOperationsResourceUnknown(from_path) from e
except Exception as e:
# wrap this into the datalad-standard, but keep the
# original exception linked
raise UrlOperationsRemoteError(to_url, message=str(e)) from e
finally:
if src_fp and from_path is not None:
src_fp.close()
def delete(self,
url: str,
*,
credential: str | None = None,
timeout: float | None = None) -> Dict:
"""Delete the target of a file:// URL

        The target can be a file or a directory. `delete` will attempt to
        remove write-protected targets (by setting write permissions). If
        the target is a directory, the directory and all of its content are
        deleted. `delete` does not modify the permissions of the target's
        parent directory. This means it cannot delete a target that resides
        in a write-protected directory, but it will still empty the target
        if the target is a directory.

        See :meth:`datalad_next.url_operations.UrlOperations.delete`
        for parameter documentation and exception behavior.

Raises
------
UrlOperationsResourceUnknown
For deletion targets found absent.
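
        Examples
        --------
        A minimal usage sketch; the URL is illustrative::

            ops = FileUrlOperations()
            # remove a file, or a complete directory tree
            ops.delete('file:///tmp/obsolete')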
"""
path = self._file_url_to_path(url)
try:
if path.is_dir():
self._delete_dir(path)
else:
path.chmod(stat.S_IWUSR)
path.unlink()
except FileNotFoundError as e:
raise UrlOperationsResourceUnknown(url) from e
except Exception as e:
# wrap this into the datalad-standard, but keep the
# original exception linked
raise UrlOperationsRemoteError(url, message=str(e)) from e
return {}
def _delete_dir(self, path: Path):
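        # make the directory itself readable, writable, and traversable
        # for the owner, so that its content can be listed and removed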
path.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
for sub_path in path.iterdir():
if sub_path.is_dir():
self._delete_dir(sub_path)
else:
sub_path.chmod(stat.S_IWUSR)
sub_path.unlink()
path.rmdir()
def _copyfp(self,
src_fp: IOBase | BinaryIO,
dst_fp: IOBase | BinaryIO,
expected_size: int | None,
hash: list[str] | None,
start_log: tuple,
update_log: tuple,
finish_log: tuple,
progress_label: str,
) -> dict:
# this is pretty much shutil.copyfileobj() with the necessary
# wrapping to perform hashing and progress reporting
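        # the hasher covers all requested digest algorithms at once; its
        # get_hexdigest() result is merged into the returned properties.
        # the progress ID is derived from the identities of the two file
        # objects, so that start/update/stop reports can be correlated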
hasher = self._get_hasher(hash)
progress_id = self._get_progress_id(str(id(src_fp)), str(id(dst_fp)))
# Localize variable access to minimize overhead
src_fp_read = src_fp.read
dst_fp_write = dst_fp.write
props = {}
self._progress_report_start(
progress_id, start_log, progress_label, expected_size)
copy_size = 0
try:
while True:
chunk = src_fp_read(COPY_BUFSIZE)
if not chunk:
break
dst_fp_write(chunk)
chunk_size = len(chunk)
self._progress_report_update(
progress_id, update_log, chunk_size)
# compute hash simultaneously
hasher.update(chunk)
copy_size += chunk_size
props.update(hasher.get_hexdigest())
# return how much was copied. we could compare with
# `expected_size` and error on mismatch, but not all
# sources can provide that (e.g. stdin)
props['content-length'] = copy_size
return props
finally:
self._progress_report_stop(progress_id, finish_log)