jfr.im git - yt-dlp.git/blobdiff - yt_dlp/utils/_utils.py
[networking] Add module (#2861)
[yt-dlp.git] / yt_dlp / utils / _utils.py
index 4af955743dd870eb68048fc38de09204eb5ae0c5..d5704cadca53bf5e33cf394aaf0641a647192b34 100644 (file)
@@ -11,7 +11,6 @@
 import email.header
 import email.utils
 import errno
-import gzip
 import hashlib
 import hmac
 import html.entities
@@ -46,7 +45,6 @@
 import urllib.parse
 import urllib.request
 import xml.etree.ElementTree
-import zlib
 
 from . import traversal
 
@@ -58,8 +56,7 @@
     compat_os_name,
     compat_shlex_quote,
 )
-from ..dependencies import brotli, certifi, websockets, xattr
-from ..socks import ProxyType, sockssocket
+from ..dependencies import websockets, xattr
 
 __name__ = __name__.rsplit('.', 1)[0]  # Pretend to be the parent module
 
 compiled_regex_type = type(re.compile(''))
 
 
-def random_user_agent():
-    _USER_AGENT_TPL = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/%s Safari/537.36'
-    _CHROME_VERSIONS = (
-        '90.0.4430.212',
-        '90.0.4430.24',
-        '90.0.4430.70',
-        '90.0.4430.72',
-        '90.0.4430.85',
-        '90.0.4430.93',
-        '91.0.4472.101',
-        '91.0.4472.106',
-        '91.0.4472.114',
-        '91.0.4472.124',
-        '91.0.4472.164',
-        '91.0.4472.19',
-        '91.0.4472.77',
-        '92.0.4515.107',
-        '92.0.4515.115',
-        '92.0.4515.131',
-        '92.0.4515.159',
-        '92.0.4515.43',
-        '93.0.4556.0',
-        '93.0.4577.15',
-        '93.0.4577.63',
-        '93.0.4577.82',
-        '94.0.4606.41',
-        '94.0.4606.54',
-        '94.0.4606.61',
-        '94.0.4606.71',
-        '94.0.4606.81',
-        '94.0.4606.85',
-        '95.0.4638.17',
-        '95.0.4638.50',
-        '95.0.4638.54',
-        '95.0.4638.69',
-        '95.0.4638.74',
-        '96.0.4664.18',
-        '96.0.4664.45',
-        '96.0.4664.55',
-        '96.0.4664.93',
-        '97.0.4692.20',
-    )
-    return _USER_AGENT_TPL % random.choice(_CHROME_VERSIONS)
-
-
-SUPPORTED_ENCODINGS = [
-    'gzip', 'deflate'
-]
-if brotli:
-    SUPPORTED_ENCODINGS.append('br')
-
-std_headers = {
-    'User-Agent': random_user_agent(),
-    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
-    'Accept-Language': 'en-us,en;q=0.5',
-    'Sec-Fetch-Mode': 'navigate',
-}
-
-
 USER_AGENTS = {
     'Safari': 'Mozilla/5.0 (X11; Linux x86_64; rv:10.0) AppleWebKit/533.20.25 (KHTML, like Gecko) Version/5.0.4 Safari/533.20.27',
 }
@@ -958,80 +896,16 @@ def formatSeconds(secs, delim=':', msec=False):
     return '%s.%03d' % (ret, time.milliseconds) if msec else ret
 
 
-def _ssl_load_windows_store_certs(ssl_context, storename):
-    # Code adapted from _load_windows_store_certs in https://github.com/python/cpython/blob/main/Lib/ssl.py
-    try:
-        certs = [cert for cert, encoding, trust in ssl.enum_certificates(storename)
-                 if encoding == 'x509_asn' and (
-                     trust is True or ssl.Purpose.SERVER_AUTH.oid in trust)]
-    except PermissionError:
-        return
-    for cert in certs:
-        with contextlib.suppress(ssl.SSLError):
-            ssl_context.load_verify_locations(cadata=cert)
-
-
 def make_HTTPS_handler(params, **kwargs):
-    opts_check_certificate = not params.get('nocheckcertificate')
-    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
-    context.check_hostname = opts_check_certificate
-    if params.get('legacyserverconnect'):
-        context.options |= 4  # SSL_OP_LEGACY_SERVER_CONNECT
-        # Allow use of weaker ciphers in Python 3.10+. See https://bugs.python.org/issue43998
-        context.set_ciphers('DEFAULT')
-    elif (
-        sys.version_info < (3, 10)
-        and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
-        and not ssl.OPENSSL_VERSION.startswith('LibreSSL')
-    ):
-        # Backport the default SSL ciphers and minimum TLS version settings from Python 3.10 [1].
-        # This is to ensure consistent behavior across Python versions, and help avoid fingerprinting
-        # in some situations [2][3].
-        # Python 3.10 only supports OpenSSL 1.1.1+ [4]. Because this change is likely
-        # untested on older versions, we only apply this to OpenSSL 1.1.1+ to be safe.
-        # LibreSSL is excluded until further investigation due to cipher support issues [5][6].
-        # 1. https://github.com/python/cpython/commit/e983252b516edb15d4338b0a47631b59ef1e2536
-        # 2. https://github.com/yt-dlp/yt-dlp/issues/4627
-        # 3. https://github.com/yt-dlp/yt-dlp/pull/5294
-        # 4. https://peps.python.org/pep-0644/
-        # 5. https://peps.python.org/pep-0644/#libressl-support
-        # 6. https://github.com/yt-dlp/yt-dlp/commit/5b9f253fa0aee996cf1ed30185d4b502e00609c4#commitcomment-89054368
-        context.set_ciphers('@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM')
-        context.minimum_version = ssl.TLSVersion.TLSv1_2
-
-    context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
-    if opts_check_certificate:
-        if certifi and 'no-certifi' not in params.get('compat_opts', []):
-            context.load_verify_locations(cafile=certifi.where())
-        else:
-            try:
-                context.load_default_certs()
-                # Work around the issue in load_default_certs when there are bad certificates. See:
-                # https://github.com/yt-dlp/yt-dlp/issues/1060,
-                # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
-            except ssl.SSLError:
-                # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
-                if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
-                    for storename in ('CA', 'ROOT'):
-                        _ssl_load_windows_store_certs(context, storename)
-                context.set_default_verify_paths()
-
-    client_certfile = params.get('client_certificate')
-    if client_certfile:
-        try:
-            context.load_cert_chain(
-                client_certfile, keyfile=params.get('client_certificate_key'),
-                password=params.get('client_certificate_password'))
-        except ssl.SSLError:
-            raise YoutubeDLError('Unable to load client certificate')
-
-    # Some servers may reject requests if ALPN extension is not sent. See:
-    # https://github.com/python/cpython/issues/85140
-    # https://github.com/yt-dlp/yt-dlp/issues/3878
-    with contextlib.suppress(NotImplementedError):
-        context.set_alpn_protocols(['http/1.1'])
-
-    return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
+    from ..networking._helper import make_ssl_context
+    return YoutubeDLHTTPSHandler(params, context=make_ssl_context(
+        verify=not params.get('nocheckcertificate'),
+        client_certificate=params.get('client_certificate'),
+        client_certificate_key=params.get('client_certificate_key'),
+        client_certificate_password=params.get('client_certificate_password'),
+        legacy_support=params.get('legacyserverconnect'),
+        use_certifi='no-certifi' not in params.get('compat_opts', []),
+    ), **kwargs)
 
 
 def bug_reports_message(before=';'):
@@ -1059,12 +933,6 @@ def __init__(self, msg=None):
         super().__init__(self.msg)
 
 
-network_exceptions = [urllib.error.URLError, http.client.HTTPException, socket.error]
-if hasattr(ssl, 'CertificateError'):
-    network_exceptions.append(ssl.CertificateError)
-network_exceptions = tuple(network_exceptions)
-
-
 class ExtractorError(YoutubeDLError):
     """Error during info extraction."""
 
@@ -1072,6 +940,7 @@ def __init__(self, msg, tb=None, expected=False, cause=None, video_id=None, ie=N
         """ tb, if given, is the original traceback (so that it can be printed out).
         If expected is set, this is a normal error message and most likely not a bug in yt-dlp.
         """
+        from ..networking.exceptions import network_exceptions
         if sys.exc_info()[0] in network_exceptions:
             expected = True
 
@@ -1271,225 +1140,6 @@ class XAttrUnavailableError(YoutubeDLError):
     pass
 
 
-def _create_http_connection(ydl_handler, http_class, is_https, *args, **kwargs):
-    hc = http_class(*args, **kwargs)
-    source_address = ydl_handler._params.get('source_address')
-
-    if source_address is not None:
-        # This is to workaround _create_connection() from socket where it will try all
-        # address data from getaddrinfo() including IPv6. This filters the result from
-        # getaddrinfo() based on the source_address value.
-        # This is based on the cpython socket.create_connection() function.
-        # https://github.com/python/cpython/blob/master/Lib/socket.py#L691
-        def _create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None):
-            host, port = address
-            err = None
-            addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)
-            af = socket.AF_INET if '.' in source_address[0] else socket.AF_INET6
-            ip_addrs = [addr for addr in addrs if addr[0] == af]
-            if addrs and not ip_addrs:
-                ip_version = 'v4' if af == socket.AF_INET else 'v6'
-                raise OSError(
-                    "No remote IP%s addresses available for connect, can't use '%s' as source address"
-                    % (ip_version, source_address[0]))
-            for res in ip_addrs:
-                af, socktype, proto, canonname, sa = res
-                sock = None
-                try:
-                    sock = socket.socket(af, socktype, proto)
-                    if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
-                        sock.settimeout(timeout)
-                    sock.bind(source_address)
-                    sock.connect(sa)
-                    err = None  # Explicitly break reference cycle
-                    return sock
-                except OSError as _:
-                    err = _
-                    if sock is not None:
-                        sock.close()
-            if err is not None:
-                raise err
-            else:
-                raise OSError('getaddrinfo returns an empty list')
-        if hasattr(hc, '_create_connection'):
-            hc._create_connection = _create_connection
-        hc.source_address = (source_address, 0)
-
-    return hc
-
-
-class YoutubeDLHandler(urllib.request.HTTPHandler):
-    """Handler for HTTP requests and responses.
-
-    This class, when installed with an OpenerDirector, automatically adds
-    the standard headers to every HTTP request and handles gzipped, deflated and
-    brotli responses from web servers.
-
-    Part of this code was copied from:
-
-    http://techknack.net/python-urllib2-handlers/
-
-    Andrew Rowls, the author of that code, agreed to release it to the
-    public domain.
-    """
-
-    def __init__(self, params, *args, **kwargs):
-        urllib.request.HTTPHandler.__init__(self, *args, **kwargs)
-        self._params = params
-
-    def http_open(self, req):
-        conn_class = http.client.HTTPConnection
-
-        socks_proxy = req.headers.get('Ytdl-socks-proxy')
-        if socks_proxy:
-            conn_class = make_socks_conn_class(conn_class, socks_proxy)
-            del req.headers['Ytdl-socks-proxy']
-
-        return self.do_open(functools.partial(
-            _create_http_connection, self, conn_class, False),
-            req)
-
-    @staticmethod
-    def deflate(data):
-        if not data:
-            return data
-        try:
-            return zlib.decompress(data, -zlib.MAX_WBITS)
-        except zlib.error:
-            return zlib.decompress(data)
-
-    @staticmethod
-    def brotli(data):
-        if not data:
-            return data
-        return brotli.decompress(data)
-
-    @staticmethod
-    def gz(data):
-        gz = gzip.GzipFile(fileobj=io.BytesIO(data), mode='rb')
-        try:
-            return gz.read()
-        except OSError as original_oserror:
-            # There may be junk add the end of the file
-            # See http://stackoverflow.com/q/4928560/35070 for details
-            for i in range(1, 1024):
-                try:
-                    gz = gzip.GzipFile(fileobj=io.BytesIO(data[:-i]), mode='rb')
-                    return gz.read()
-                except OSError:
-                    continue
-            else:
-                raise original_oserror
-
-    def http_request(self, req):
-        # According to RFC 3986, URLs can not contain non-ASCII characters, however this is not
-        # always respected by websites, some tend to give out URLs with non percent-encoded
-        # non-ASCII characters (see telemb.py, ard.py [#3412])
-        # urllib chokes on URLs with non-ASCII characters (see http://bugs.python.org/issue3991)
-        # To work around aforementioned issue we will replace request's original URL with
-        # percent-encoded one
-        # Since redirects are also affected (e.g. http://www.southpark.de/alle-episoden/s18e09)
-        # the code of this workaround has been moved here from YoutubeDL.urlopen()
-        url = req.get_full_url()
-        url_escaped = escape_url(url)
-
-        # Substitute URL if any change after escaping
-        if url != url_escaped:
-            req = update_Request(req, url=url_escaped)
-
-        for h, v in self._params.get('http_headers', std_headers).items():
-            # Capitalize is needed because of Python bug 2275: http://bugs.python.org/issue2275
-            # The dict keys are capitalized because of this bug by urllib
-            if h.capitalize() not in req.headers:
-                req.add_header(h, v)
-
-        if 'Youtubedl-no-compression' in req.headers:  # deprecated
-            req.headers.pop('Youtubedl-no-compression', None)
-            req.add_header('Accept-encoding', 'identity')
-
-        if 'Accept-encoding' not in req.headers:
-            req.add_header('Accept-encoding', ', '.join(SUPPORTED_ENCODINGS))
-
-        return super().do_request_(req)
-
-    def http_response(self, req, resp):
-        old_resp = resp
-
-        # Content-Encoding header lists the encodings in order that they were applied [1].
-        # To decompress, we simply do the reverse.
-        # [1]: https://datatracker.ietf.org/doc/html/rfc9110#name-content-encoding
-        decoded_response = None
-        for encoding in (e.strip() for e in reversed(resp.headers.get('Content-encoding', '').split(','))):
-            if encoding == 'gzip':
-                decoded_response = self.gz(decoded_response or resp.read())
-            elif encoding == 'deflate':
-                decoded_response = self.deflate(decoded_response or resp.read())
-            elif encoding == 'br' and brotli:
-                decoded_response = self.brotli(decoded_response or resp.read())
-
-        if decoded_response is not None:
-            resp = urllib.request.addinfourl(io.BytesIO(decoded_response), old_resp.headers, old_resp.url, old_resp.code)
-            resp.msg = old_resp.msg
-        # Percent-encode redirect URL of Location HTTP header to satisfy RFC 3986 (see
-        # https://github.com/ytdl-org/youtube-dl/issues/6457).
-        if 300 <= resp.code < 400:
-            location = resp.headers.get('Location')
-            if location:
-                # As of RFC 2616 default charset is iso-8859-1 that is respected by python 3
-                location = location.encode('iso-8859-1').decode()
-                location_escaped = escape_url(location)
-                if location != location_escaped:
-                    del resp.headers['Location']
-                    resp.headers['Location'] = location_escaped
-        return resp
-
-    https_request = http_request
-    https_response = http_response
-
-
-def make_socks_conn_class(base_class, socks_proxy):
-    assert issubclass(base_class, (
-        http.client.HTTPConnection, http.client.HTTPSConnection))
-
-    url_components = urllib.parse.urlparse(socks_proxy)
-    if url_components.scheme.lower() == 'socks5':
-        socks_type = ProxyType.SOCKS5
-    elif url_components.scheme.lower() in ('socks', 'socks4'):
-        socks_type = ProxyType.SOCKS4
-    elif url_components.scheme.lower() == 'socks4a':
-        socks_type = ProxyType.SOCKS4A
-
-    def unquote_if_non_empty(s):
-        if not s:
-            return s
-        return urllib.parse.unquote_plus(s)
-
-    proxy_args = (
-        socks_type,
-        url_components.hostname, url_components.port or 1080,
-        True,  # Remote DNS
-        unquote_if_non_empty(url_components.username),
-        unquote_if_non_empty(url_components.password),
-    )
-
-    class SocksConnection(base_class):
-        def connect(self):
-            self.sock = sockssocket()
-            self.sock.setproxy(*proxy_args)
-            if isinstance(self.timeout, (int, float)):
-                self.sock.settimeout(self.timeout)
-            self.sock.connect((self.host, self.port))
-
-            if isinstance(self, http.client.HTTPSConnection):
-                if hasattr(self, '_context'):  # Python > 2.6
-                    self.sock = self._context.wrap_socket(
-                        self.sock, server_hostname=self.host)
-                else:
-                    self.sock = ssl.wrap_socket(self.sock)
-
-    return SocksConnection
-
-
 class YoutubeDLHTTPSHandler(urllib.request.HTTPSHandler):
     def __init__(self, params, https_conn_class=None, *args, **kwargs):
         urllib.request.HTTPSHandler.__init__(self, *args, **kwargs)
@@ -1507,9 +1157,11 @@ def https_open(self, req):
 
         socks_proxy = req.headers.get('Ytdl-socks-proxy')
         if socks_proxy:
+            from ..networking._urllib import make_socks_conn_class
             conn_class = make_socks_conn_class(conn_class, socks_proxy)
             del req.headers['Ytdl-socks-proxy']
 
+        from ..networking._urllib import _create_http_connection
         try:
             return self.do_open(
                 functools.partial(_create_http_connection, self, conn_class, True), req, **kwargs)
@@ -1535,56 +1187,6 @@ def http_response(self, request, response):
     https_response = http_response
 
 
-class YoutubeDLRedirectHandler(urllib.request.HTTPRedirectHandler):
-    """YoutubeDL redirect handler
-
-    The code is based on HTTPRedirectHandler implementation from CPython [1].
-
-    This redirect handler fixes and improves the logic to better align with RFC7261
-     and what browsers tend to do [2][3]
-
-    1. https://github.com/python/cpython/blob/master/Lib/urllib/request.py
-    2. https://datatracker.ietf.org/doc/html/rfc7231
-    3. https://github.com/python/cpython/issues/91306
-    """
-
-    http_error_301 = http_error_303 = http_error_307 = http_error_308 = urllib.request.HTTPRedirectHandler.http_error_302
-
-    def redirect_request(self, req, fp, code, msg, headers, newurl):
-        if code not in (301, 302, 303, 307, 308):
-            raise urllib.error.HTTPError(req.full_url, code, msg, headers, fp)
-
-        new_method = req.get_method()
-        new_data = req.data
-
-        # Technically the Cookie header should be in unredirected_hdrs,
-        # however in practice some may set it in normal headers anyway.
-        # We will remove it here to prevent any leaks.
-        remove_headers = ['Cookie']
-
-        # A 303 must either use GET or HEAD for subsequent request
-        # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.4
-        if code == 303 and req.get_method() != 'HEAD':
-            new_method = 'GET'
-        # 301 and 302 redirects are commonly turned into a GET from a POST
-        # for subsequent requests by browsers, so we'll do the same.
-        # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.2
-        # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.3
-        elif code in (301, 302) and req.get_method() == 'POST':
-            new_method = 'GET'
-
-        # only remove payload if method changed (e.g. POST to GET)
-        if new_method != req.get_method():
-            new_data = None
-            remove_headers.extend(['Content-Length', 'Content-Type'])
-
-        new_headers = {k: v for k, v in req.headers.items() if k.title() not in remove_headers}
-
-        return urllib.request.Request(
-            newurl, headers=new_headers, origin_req_host=req.origin_req_host,
-            unverifiable=True, method=new_method, data=new_data)
-
-
 def extract_timezone(date_str):
     m = re.search(
         r'''(?x)
@@ -2390,16 +1992,6 @@ def urljoin(base, path):
     return urllib.parse.urljoin(base, path)
 
 
-class HEADRequest(urllib.request.Request):
-    def get_method(self):
-        return 'HEAD'
-
-
-class PUTRequest(urllib.request.Request):
-    def get_method(self):
-        return 'PUT'
-
-
 def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1):
     if get_attr and v is not None:
         v = getattr(v, get_attr, None)
@@ -3016,26 +2608,6 @@ def update_url_query(url, query):
     return update_url(url, query_update=query)
 
 
-def update_Request(req, url=None, data=None, headers=None, query=None):
-    req_headers = req.headers.copy()
-    req_headers.update(headers or {})
-    req_data = data or req.data
-    req_url = update_url_query(url or req.get_full_url(), query)
-    req_get_method = req.get_method()
-    if req_get_method == 'HEAD':
-        req_type = HEADRequest
-    elif req_get_method == 'PUT':
-        req_type = PUTRequest
-    else:
-        req_type = urllib.request.Request
-    new_req = req_type(
-        req_url, data=req_data, headers=req_headers,
-        origin_req_host=req.origin_req_host, unverifiable=req.unverifiable)
-    if hasattr(req, 'timeout'):
-        new_req.timeout = req.timeout
-    return new_req
-
-
 def _multipart_encode_impl(data, boundary):
     content_type = 'multipart/form-data; boundary=%s' % boundary
 
@@ -4769,31 +4341,6 @@ def random_ipv4(cls, code_or_block):
             struct.pack('!L', random.randint(addr_min, addr_max))))
 
 
-class PerRequestProxyHandler(urllib.request.ProxyHandler):
-    def __init__(self, proxies=None):
-        # Set default handlers
-        for type in ('http', 'https'):
-            setattr(self, '%s_open' % type,
-                    lambda r, proxy='__noproxy__', type=type, meth=self.proxy_open:
-                        meth(r, proxy, type))
-        urllib.request.ProxyHandler.__init__(self, proxies)
-
-    def proxy_open(self, req, proxy, type):
-        req_proxy = req.headers.get('Ytdl-request-proxy')
-        if req_proxy is not None:
-            proxy = req_proxy
-            del req.headers['Ytdl-request-proxy']
-
-        if proxy == '__noproxy__':
-            return None  # No Proxy
-        if urllib.parse.urlparse(proxy).scheme.lower() in ('socks', 'socks4', 'socks4a', 'socks5'):
-            req.add_header('Ytdl-socks-proxy', proxy)
-            # yt-dlp's http/https handlers do wrapping the socket with socks
-            return None
-        return urllib.request.ProxyHandler.proxy_open(
-            self, req, proxy, type)
-
-
 # Both long_to_bytes and bytes_to_long are adapted from PyCrypto, which is
 # released into Public Domain
 # https://github.com/dlitz/pycrypto/blob/master/lib/Crypto/Util/number.py#L387