Source code for datalad_next.utils.requests_auth

"""python-requests-compatible authentication handler using DataLad credentials
"""
# allow for |-type UnionType declarations
from __future__ import annotations

import logging
from typing import Dict
from urllib.parse import (
    urljoin,
    urlparse,
)

import requests

from datalad_next.config import ConfigManager
from datalad_next.credman import CredentialManager
from .http_helpers import get_auth_realm

lgr = logging.getLogger('datalad.ext.next.utils.requests_auth')


__all__ = ['DataladAuth', 'HTTPBearerTokenAuth', 'parse_www_authenticate']


def parse_www_authenticate(hdr: str) -> dict:
    """Parse HTTP www-authenticate header

    This helper uses ``requests`` utilities to parse the ``www-authenticate``
    header as represented in a ``requests.Response`` instance. The header may
    contain any number of challenge specifications.

    The implementation follows RFC7235, where a challenge parameters set is
    specified as: either a comma-separated list of parameters, or a single
    sequence of characters capable of holding base64-encoded information,
    and parameters are name=value pairs, where the name token is matched
    case-insensitively, and each parameter name MUST only occur once
    per challenge.

    Malformed header content is tolerated rather than treated as an error:
    empty list elements (e.g., caused by stray or trailing commas) are
    skipped, and a parameter that is not preceded by any challenge label
    is ignored (and reported in a debug log message).

    Parameters
    ----------
    hdr: str
      Value of the ``www-authenticate`` header.

    Returns
    -------
    dict
      Keys are casefolded challenge labels (e.g., 'basic', 'digest').
      Values are: ``None`` (no parameter), ``str`` (a token68), or
      ``dict`` (name/value mapping of challenge parameters)
    """
    plh = requests.utils.parse_list_header
    pdh = requests.utils.parse_dict_header
    challenges = {}
    # parameter list of the challenge that is presently being collected.
    # `None` means: no challenge seen yet, or the current challenge did
    # not come with any parameter
    challenge = None
    # challenges as well as their properties are in a single
    # comma-separated list
    for item in plh(hdr):
        # parse the item into a key/value set
        # the value will be `None` if this item was no mapping
        parsed = pdh(item)
        if not parsed:
            # an item that yields no key at all (e.g., an empty list
            # element) carries no information, and must not reach
            # popitem(), which would raise KeyError
            continue
        k, v = parsed.popitem()
        # split the key to check for a challenge spec start
        key_split = k.split(' ', maxsplit=1)
        if len(key_split) > 1 or v is None:
            # everything after the challenge label (and the separating
            # space) is the first parameter, or a token68
            item_suffix = item[len(key_split[0]) + 1:]
            challenge = [item_suffix] if item_suffix else None
            challenges[key_split[0].casefold()] = challenge
        elif challenge is None:
            # a parameter without a challenge it could belong to.
            # The header content is server-controlled, hence we do not
            # assert() on it, but ignore the item
            lgr.debug(
                'Ignoring www-authenticate parameter %r without a '
                'preceding challenge specification', item)
        else:
            challenge.append(item)

    return {
        challenge: _convert_www_authenticate_items(items)
        for challenge, items in challenges.items()
    }
def _convert_www_authenticate_items(items: list) -> None | str | dict:
    """Convert the collected items of a single challenge to their final form

    According to RFC7235, the items of a challenge can be: either a
    comma-separated list of parameters, or a single sequence of characters
    capable of holding base64-encoded information (token68).
    Parameters are name=value pairs, where the name token is matched
    case-insensitively, and each parameter name MUST only occur once
    per challenge.

    Parameters
    ----------
    items: list
      Item strings of one challenge, or ``None`` when the challenge came
      without any.

    Returns
    -------
    None or str or dict
      ``None`` is returned for ``None`` input. A lone item that does not
      parse as a name/value pair (once any trailing ``=`` padding is
      stripped) is returned as-is (token68). In any other case, a
      mapping of casefolded parameter names to their values is returned.
    """
    if items is None:
        return None

    parse_params = requests.utils.parse_dict_header

    if len(items) == 1:
        # strip potential base64 padding before testing whether this
        # single item is a name/value pair, or a token68
        _, lone_value = parse_params(items[0].rstrip('=')).popitem()
        if lone_value is None:
            # token68 is reported unmodified (including any padding)
            return items[0]

    params = {}
    for entry in items:
        for name, value in parse_params(entry).items():
            params[name.casefold()] = value
    return params
class DataladAuth(requests.auth.AuthBase):
    """Requests-style authentication handler using DataLad credentials

    Similar to request_toolbelt's `AuthHandler`, this is a meta
    implementation that can be used with different actual authentication
    schemes. In contrast to `AuthHandler`, a credential can not only be
    specified directly, but credentials can be looked up based on the
    target URL and the server-supported authentication schemes.

    In addition to programmatic specification and automated lookup, manual
    credential entry using interactive prompts is also supported.

    At present, this implementation is not thread-safe.
    """
    # mapping of supported HTTP authentication scheme labels (casefolded)
    # to the type of credential that is needed to use them
    _supported_auth_schemes = {
        'basic': 'user_password',
        'digest': 'user_password',
        'bearer': 'token',
    }

    def __init__(self, cfg: ConfigManager, credential: str | None = None):
        """
        Parameters
        ----------
        cfg: ConfigManager
          Is passed to CredentialManager() as `cfg`-parameter.
        credential: str, optional
          Name of a particular credential to be used for any operations.
        """
        self._credman = CredentialManager(cfg)
        self._credential = credential
        # credential that was obtained via manual entry, kept to be able
        # to save it later via save_entered_credential()
        self._entered_credential = None

    def save_entered_credential(self,
                                suggested_name: str | None = None,
                                context: str | None = None) -> Dict | None:
        """Utility method to save a pending credential in the store

        Pending credentials have been entered manually, and were
        subsequently used successfully for authentication.

        Saving a credential will prompt for entering a name to identify
        the credentials.

        Parameters
        ----------
        suggested_name: str, optional
          Passed to ``CredentialManager.set()`` as ``_suggested_name``.
        context: str, optional
          Passed to ``CredentialManager.set()`` as ``_context``.

        Returns
        -------
        dict or None
          ``None`` when there is no pending credential, otherwise
          whatever ``CredentialManager.set()`` returns.
        """
        if self._entered_credential is None:
            # nothing to do
            return None
        return self._credman.set(
            name=None,
            _lastused=True,
            _suggested_name=suggested_name,
            _context=context,
            **self._entered_credential
        )

    def __call__(self, r):
        """Register the response hooks of this handler with request `r`"""
        # TODO support being called from multiple threads
        #self.init_per_thread_state()

        # register hooks to be executed once a response to this
        # request is available

        # Redirect: reset credentials to avoid leakage to other server
        r.register_hook("response", self.handle_redirect)
        # 401 Unauthorized: look for a credential and try again
        r.register_hook("response", self.handle_401)
        return r

    def _get_credential(self, url, auth_schemes
                        ) -> tuple[str | None, str | None, Dict | None]:
        """Get a credential for access to `url` given server-supported schemes

        If a particular credential to use was given to the `DataladAuth`
        constructor it is reported here.

        In all other situations a credential will be looked up, based on
        the access URL and the authentication schemes supported by the
        host. The authentication schemes will be tested in the order in
        which they are reported by the remote host.

        If no matching credential can be identified, a prompt to enter
        a credential is presented. The credential type will match, and be
        used with the first authentication scheme that is both reported
        by the host, and by this implementation.

        If the host offers no authentication scheme that is supported by
        this implementation, no prompt is presented and no credential is
        reported.

        The method returns a 3-tuple. The first element is an identifier
        for the authentication scheme ('basic', 'digest', etc.) to use
        with the credential. The second item is the name for the reported
        credential, and the third element is a dictionary with the
        credential properties and its secret. Any of the three items can
        be `None` if the respective information is not available.
        """
        if self._credential:
            cred = self._credman.get(name=self._credential)
            # this credential is scheme independent
            return None, self._credential, cred

        # no credential identified, find one
        for ascheme in auth_schemes:
            if ascheme not in DataladAuth._supported_auth_schemes:
                # nothing we can handle
                continue
            ctype = DataladAuth._supported_auth_schemes[ascheme]
            # get a realm ID for this authentication scheme
            realm = get_auth_realm(url, auth_schemes, scheme=ascheme)
            # ask for matching credentials
            creds = [
                (name, cred) for name, cred in self._credman.query(
                    _sortby='last-used',
                    type=ctype,
                    realm=realm,
                )
                # we can only work with complete credentials, although
                # query() might return others. We exclude them here
                # to be able to fall back on manual entry further down
                if cred.get('secret')
            ]
            if creds:
                # we have matches, go with the last used one
                name, cred = creds[0]
                return ascheme, name, cred

        # no success finding an existing credential, now ask, if possible
        # pick a scheme that is supported by the server and by us
        supported = [s for s in auth_schemes
                     if s in DataladAuth._supported_auth_schemes]
        if not supported:
            lgr.debug(
                'Only unsupported HTTP auth schemes offered %r',
                list(auth_schemes))
            # there is no scheme to pick, and consequently no credential
            # type to ask for. Report "nothing" rather than attempting
            # to index an empty list
            return None, None, None
        # go with the first supported scheme
        ascheme = supported[0]
        ctype = DataladAuth._supported_auth_schemes[ascheme]

        try:
            realm = get_auth_realm(url, auth_schemes)
            cred = self._credman.get(
                name=None,
                _prompt=f'Credential needed for accessing {url} '
                        f'(authentication realm {realm!r})',
                _type_hint=ctype,
                type=ctype,
                # include the realm in the credential to avoid asking for
                # it interactively (it is a server-specified property
                # users would generally not know, if they do, they can
                # use the `credentials` command upfront.
                realm=realm
            )
            self._entered_credential = cred
            return ascheme, None, cred
        except Exception as e:
            # deliberately best-effort: failure to obtain a credential
            # must not break the request, it merely stays unauthenticated
            lgr.debug('Credential retrieval failed: %s', e)
            return ascheme, None, None

    def handle_401(self, r, **kwargs):
        """Callback that receives any response to a request

        Any non-4xx response or a response lacking a 'www-authenticate'
        header is ignored.

        Server-provided 'www-authenticate' challenges are inspected, and
        corresponding credentials are looked-up (if needed) and
        subsequently tried in a re-request to the original URL after
        performing any necessary actions to meet a given challenge. Such
        a re-request is then using the same connection as the original
        request.

        If no usable credential is available, or a credential does not
        fit any authentication scheme offered by the server, the response
        is returned unchanged.

        Particular challenges are implemented in dedicated classes, e.g.
        :class:`requests.auth.HTTPBasicAuth`.

        Credential look-up or entry is performed by
        :meth:`datalad_next.requests_auth.DataladAuth._get_credential`.

        Raises
        ------
        NotImplementedError
          When the authentication scheme to use is not supported by this
          implementation.
        """
        if not 400 <= r.status_code < 500:
            # fast return if this is no error, see
            # https://github.com/psf/requests/issues/3772 for background
            return r
        if 'www-authenticate' not in r.headers:
            # no info on how to authenticate to react to, leave it as-is.
            # this also catches any non-401-like error code (e.g. 429).
            # doing this more loose check (rather then going for 401
            # specifically) enables to support services that send
            # www-authenticate with e.g. 403s
            return r

        # which auth schemes does the server support?
        auth_schemes = parse_www_authenticate(r.headers['www-authenticate'])
        ascheme, _, cred = self._get_credential(r.url, auth_schemes)

        if cred is None or 'secret' not in cred:
            # we got nothing, leave things as they are
            return r

        # TODO add safety check. if a credential somehow contains
        # information on its scope (i.e. only for github.com)
        # prevent its use for other hosts -- maybe unless given explicitly.

        if ascheme is None:
            # if there is no authentication scheme identified, look at the
            # credential, if it knows
            ascheme = cred.get('http_auth_scheme')
        if not ascheme:
            # if it does not, go with the first supported scheme that
            # matches the credential type
            supported = DataladAuth._supported_auth_schemes
            ascheme = next(
                (c for c in auth_schemes
                 if c in supported and cred.get('type') == supported[c]),
                None,
            )
        if ascheme is None:
            # an explicitly given credential need not match anything the
            # server has on offer
            lgr.debug(
                'Credential type %r does not match any supported HTTP '
                'auth scheme offered %r',
                cred.get('type'), list(auth_schemes))
            return r

        if ascheme == 'basic':
            return self._authenticated_rerequest(
                r,
                requests.auth.HTTPBasicAuth(cred['user'], cred['secret']),
                **kwargs)
        elif ascheme == 'digest':
            return self._authenticated_rerequest(
                r,
                requests.auth.HTTPDigestAuth(cred['user'], cred['secret']),
                **kwargs)
        elif ascheme == 'bearer':
            return self._authenticated_rerequest(
                r, HTTPBearerTokenAuth(cred['secret']), **kwargs)
        else:
            raise NotImplementedError(
                'Only unsupported HTTP auth schemes offered '
                f'{list(auth_schemes.keys())!r} need {ascheme!r}')

    def handle_redirect(self, r, **kwargs):
        """Callback that receives any response to a request

        Any non-redirect response is ignored.

        This callback drops an explicitly set credential whenever
        the redirect causes a non-encrypted connection to be used
        after the original request was encrypted, or when the `netloc`
        of the redirect differs from the original target.

        A relative redirect target is resolved against the URL of the
        response before the comparison, hence a redirect to another
        location on the same host does not discard the credential.
        """
        if r.is_redirect and self._credential:
            og_p = urlparse(r.url)
            # the location header may hold a relative reference
            rd_p = urlparse(urljoin(r.url, r.headers.get('location', '')))
            if og_p.netloc != rd_p.netloc or (
                    rd_p.scheme == 'http' and og_p.scheme == 'https'):
                lgr.debug(
                    'URL redirect, discarded given credential %r '
                    'to avoid leakage',
                    self._credential)
                self._credential = None

    def _authenticated_rerequest(
            self,
            response: requests.models.Response,
            auth: requests.auth.AuthBase,
            **kwargs
    ) -> requests.models.Response:
        """Helper to rerun a request, but with authentication added

        Parameters
        ----------
        response: requests.models.Response
          Response to the request that shall be repeated.
        auth: requests.auth.AuthBase
          Handler that is called with the renewed request to add
          authentication to it.
        **kwargs
          Passed on to the ``send()`` call of the connection of
          `response`.

        Returns
        -------
        requests.models.Response
          Response to the repeated request. `response` is appended to
          its history.
        """
        prep = _get_renewed_request(response)
        auth(prep)
        _r = response.connection.send(prep, **kwargs)
        _r.history.append(response)
        _r.request = prep
        return _r
def _get_renewed_request(r: requests.models.Response
                         ) -> requests.models.PreparedRequest:
    """Return a copy of the request behind response `r`, ready to be resent

    Logic taken from requests.auth.HTTPDigestAuth

    The cookies of the response are extracted into the cookie jar of
    the request copy, and the copy's cookies are prepared from that jar.
    """
    # The response body is consumed, and the original connection is
    # released, to allow the new request to reuse the same one.
    r.content
    r.close()

    renewed = r.request.copy()
    jar = renewed._cookies
    requests.cookies.extract_cookies_to_jar(jar, r.request, r.raw)
    renewed.prepare_cookies(jar)
    return renewed


class HTTPBearerTokenAuth(requests.auth.AuthBase):
    """Attaches HTTP Bearer Token Authentication to the given Request object.
    """
    def __init__(self, token):
        """
        Parameters
        ----------
        token:
          Token to be sent in the ``Authorization`` header.
        """
        super().__init__()
        self.token = token

    def __call__(self, r):
        """Set the ``Authorization`` header of request `r`, and return it"""
        header_value = f'Bearer {self.token}'
        r.headers["Authorization"] = header_value
        return r