]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/utils.py
[compat] Remove deprecated functions from core code
[yt-dlp.git] / yt_dlp / utils.py
index 7327f315002df1a9a204c25b5a255696be4170bf..fd6c206823130a92f6eb2b60123917ffd64bb1ed 100644 (file)
@@ -39,6 +39,7 @@
 import time
 import traceback
 import types
+import urllib.error
 import urllib.parse
 import urllib.request
 import xml.etree.ElementTree
     compat_etree_fromstring,
     compat_expanduser,
     compat_HTMLParseError,
-    compat_HTTPError,
     compat_os_name,
-    compat_parse_qs,
     compat_shlex_quote,
-    compat_str,
-    compat_urllib_parse_urlencode,
-    compat_urllib_parse_urlparse,
-    compat_urlparse,
 )
 from .dependencies import brotli, certifi, websockets, xattr
 from .socks import ProxyType, sockssocket
@@ -67,8 +62,8 @@ def register_socks_protocols():
     # In Python < 2.6.5, urlsplit() suffers from bug https://bugs.python.org/issue7904
     # URLs with protocols not in urlparse.uses_netloc are not handled correctly
     for scheme in ('socks', 'socks4', 'socks4a', 'socks5'):
-        if scheme not in compat_urlparse.uses_netloc:
-            compat_urlparse.uses_netloc.append(scheme)
+        if scheme not in urllib.parse.uses_netloc:
+            urllib.parse.uses_netloc.append(scheme)
 
 
 # This is not clearly defined otherwise
@@ -311,7 +306,7 @@ def xpath_element(node, xpath, name=None, fatal=False, default=NO_DEFAULT):
     def _find_xpath(xpath):
         return node.find(xpath)
 
-    if isinstance(xpath, (str, compat_str)):
+    if isinstance(xpath, str):
         n = _find_xpath(xpath)
     else:
         for xp in xpath:
@@ -741,10 +736,10 @@ def sanitize_url(url):
 
 
 def extract_basic_auth(url):
-    parts = compat_urlparse.urlsplit(url)
+    parts = urllib.parse.urlsplit(url)
     if parts.username is None:
         return url, None
-    url = compat_urlparse.urlunsplit(parts._replace(netloc=(
+    url = urllib.parse.urlunsplit(parts._replace(netloc=(
         parts.hostname if parts.port is None
         else '%s:%d' % (parts.hostname, parts.port))))
     auth_payload = base64.b64encode(
@@ -889,7 +884,7 @@ def decodeFilename(b, for_subprocess=False):
 def encodeArgument(s):
     # Legacy code that uses byte strings
     # Uncomment the following line after fixing all post processors
-    # assert isinstance(s, str), 'Internal error: %r should be of type %r, is %r' % (s, compat_str, type(s))
+    # assert isinstance(s, str), 'Internal error: %r should be of type %r, is %r' % (s, str, type(s))
     return s if isinstance(s, str) else s.decode('ascii')
 
 
@@ -903,7 +898,7 @@ def decodeOption(optval):
     if isinstance(optval, bytes):
         optval = optval.decode(preferredencoding())
 
-    assert isinstance(optval, compat_str)
+    assert isinstance(optval, str)
     return optval
 
 
@@ -1395,7 +1390,7 @@ def make_socks_conn_class(base_class, socks_proxy):
     assert issubclass(base_class, (
         http.client.HTTPConnection, http.client.HTTPSConnection))
 
-    url_components = compat_urlparse.urlparse(socks_proxy)
+    url_components = urllib.parse.urlparse(socks_proxy)
     if url_components.scheme.lower() == 'socks5':
         socks_type = ProxyType.SOCKS5
     elif url_components.scheme.lower() in ('socks', 'socks4'):
@@ -1639,7 +1634,7 @@ def redirect_request(self, req, fp, code, msg, headers, newurl):
         m = req.get_method()
         if (not (code in (301, 302, 303, 307, 308) and m in ("GET", "HEAD")
                  or code in (301, 302, 303) and m == "POST")):
-            raise compat_HTTPError(req.full_url, code, msg, headers, fp)
+            raise urllib.error.HTTPError(req.full_url, code, msg, headers, fp)
         # Strictly (according to RFC 2616), 301 or 302 in response to
         # a POST MUST NOT cause a redirection without confirmation
         # from the user (of urllib.request, in this case).  In practice,
@@ -1739,7 +1734,7 @@ def unified_strdate(date_str, day_first=True):
             with contextlib.suppress(ValueError):
                 upload_date = datetime.datetime(*timetuple[:6]).strftime('%Y%m%d')
     if upload_date is not None:
-        return compat_str(upload_date)
+        return str(upload_date)
 
 
 def unified_timestamp(date_str, day_first=True):
@@ -1913,12 +1908,12 @@ def __str__(self):
 
 
 def platform_name():
-    """ Returns the platform name as a compat_str """
+    """ Returns the platform name as a str """
     res = platform.platform()
     if isinstance(res, bytes):
         res = res.decode(preferredencoding())
 
-    assert isinstance(res, compat_str)
+    assert isinstance(res, str)
     return res
 
 
@@ -2144,7 +2139,7 @@ def smuggle_url(url, data):
 
     url, idata = unsmuggle_url(url, {})
     data.update(idata)
-    sdata = compat_urllib_parse_urlencode(
+    sdata = urllib.parse.urlencode(
         {'__youtubedl_smuggle': json.dumps(data)})
     return url + '#' + sdata
 
@@ -2153,7 +2148,7 @@ def unsmuggle_url(smug_url, default=None):
     if '#__youtubedl_smuggle' not in smug_url:
         return smug_url, default
     url, _, sdata = smug_url.rpartition('#')
-    jsond = compat_parse_qs(sdata)['__youtubedl_smuggle'][0]
+    jsond = urllib.parse.parse_qs(sdata)['__youtubedl_smuggle'][0]
     data = json.loads(jsond)
     return url, data
 
@@ -2313,7 +2308,7 @@ def parse_resolution(s, *, lenient=False):
 
 
 def parse_bitrate(s):
-    if not isinstance(s, compat_str):
+    if not isinstance(s, str):
         return
     mobj = re.search(r'\b(\d+)\s*kbps', s)
     if mobj:
@@ -2350,7 +2345,7 @@ def fix_xml_ampersands(xml_str):
 
 
 def setproctitle(title):
-    assert isinstance(title, compat_str)
+    assert isinstance(title, str)
 
     # ctypes in Jython is not complete
     # http://bugs.jython.org/issue2148
@@ -2398,7 +2393,7 @@ def get_domain(url):
 
 
 def url_basename(url):
-    path = compat_urlparse.urlparse(url).path
+    path = urllib.parse.urlparse(url).path
     return path.strip('/').split('/')[-1]
 
 
@@ -2409,16 +2404,16 @@ def base_url(url):
 def urljoin(base, path):
     if isinstance(path, bytes):
         path = path.decode()
-    if not isinstance(path, compat_str) or not path:
+    if not isinstance(path, str) or not path:
         return None
     if re.match(r'^(?:[a-zA-Z][a-zA-Z0-9+-.]*:)?//', path):
         return path
     if isinstance(base, bytes):
         base = base.decode()
-    if not isinstance(base, compat_str) or not re.match(
+    if not isinstance(base, str) or not re.match(
             r'^(?:https?:)?//', base):
         return None
-    return compat_urlparse.urljoin(base, path)
+    return urllib.parse.urljoin(base, path)
 
 
 class HEADRequest(urllib.request.Request):
@@ -2441,14 +2436,14 @@ def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1):
 
 
 def str_or_none(v, default=None):
-    return default if v is None else compat_str(v)
+    return default if v is None else str(v)
 
 
 def str_to_int(int_str):
     """ A more relaxed version of int_or_none """
     if isinstance(int_str, int):
         return int_str
-    elif isinstance(int_str, compat_str):
+    elif isinstance(int_str, str):
         int_str = re.sub(r'[,\.\+]', '', int_str)
         return int_or_none(int_str)
 
@@ -2467,11 +2462,11 @@ def bool_or_none(v, default=None):
 
 
 def strip_or_none(v, default=None):
-    return v.strip() if isinstance(v, compat_str) else default
+    return v.strip() if isinstance(v, str) else default
 
 
 def url_or_none(url):
-    if not url or not isinstance(url, compat_str):
+    if not url or not isinstance(url, str):
         return None
     url = url.strip()
     return url if re.match(r'^(?:(?:https?|rt(?:m(?:pt?[es]?|fp)|sp[su]?)|mms|ftps?):)?//', url) else None
@@ -2489,7 +2484,7 @@ def strftime_or_none(timestamp, date_format, default=None):
     try:
         if isinstance(timestamp, (int, float)):  # unix timestamp
             datetime_object = datetime.datetime.utcfromtimestamp(timestamp)
-        elif isinstance(timestamp, compat_str):  # assume YYYYMMDD
+        elif isinstance(timestamp, str):  # assume YYYYMMDD
             datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
         return datetime_object.strftime(date_format)
     except (ValueError, TypeError, AttributeError):
@@ -2592,7 +2587,7 @@ def _get_exe_version_output(exe, args, *, to_screen=None):
 
 
 def detect_exe_version(output, version_re=None, unrecognized='present'):
-    assert isinstance(output, compat_str)
+    assert isinstance(output, str)
     if version_re is None:
         version_re = r'version\s+([-0-9._a-zA-Z]+)'
     m = re.search(version_re, output)
@@ -2973,7 +2968,7 @@ def escape_rfc3986(s):
 
 def escape_url(url):
     """Escape URL as suggested by RFC 3986"""
-    url_parsed = compat_urllib_parse_urlparse(url)
+    url_parsed = urllib.parse.urlparse(url)
     return url_parsed._replace(
         netloc=url_parsed.netloc.encode('idna').decode('ascii'),
         path=escape_rfc3986(url_parsed.path),
@@ -2984,12 +2979,12 @@ def escape_url(url):
 
 
 def parse_qs(url):
-    return compat_parse_qs(compat_urllib_parse_urlparse(url).query)
+    return urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
 
 
 def read_batch_urls(batch_fd):
     def fixup(url):
-        if not isinstance(url, compat_str):
+        if not isinstance(url, str):
             url = url.decode('utf-8', 'replace')
         BOM_UTF8 = ('\xef\xbb\xbf', '\ufeff')
         for bom in BOM_UTF8:
@@ -3007,17 +3002,17 @@ def fixup(url):
 
 
 def urlencode_postdata(*args, **kargs):
-    return compat_urllib_parse_urlencode(*args, **kargs).encode('ascii')
+    return urllib.parse.urlencode(*args, **kargs).encode('ascii')
 
 
 def update_url_query(url, query):
     if not query:
         return url
-    parsed_url = compat_urlparse.urlparse(url)
-    qs = compat_parse_qs(parsed_url.query)
+    parsed_url = urllib.parse.urlparse(url)
+    qs = urllib.parse.parse_qs(parsed_url.query)
     qs.update(query)
-    return compat_urlparse.urlunparse(parsed_url._replace(
-        query=compat_urllib_parse_urlencode(qs, True)))
+    return urllib.parse.urlunparse(parsed_url._replace(
+        query=urllib.parse.urlencode(qs, True)))
 
 
 def update_Request(req, url=None, data=None, headers={}, query={}):
@@ -3046,9 +3041,9 @@ def _multipart_encode_impl(data, boundary):
     out = b''
     for k, v in data.items():
         out += b'--' + boundary.encode('ascii') + b'\r\n'
-        if isinstance(k, compat_str):
+        if isinstance(k, str):
             k = k.encode()
-        if isinstance(v, compat_str):
+        if isinstance(v, str):
             v = v.encode()
         # RFC 2047 requires non-ASCII field names to be encoded, while RFC 7578
         # suggests sending UTF-8 directly. Firefox sends UTF-8, too
@@ -3129,7 +3124,7 @@ def merge_dicts(*dicts):
 
 
 def encode_compat_str(string, encoding=preferredencoding(), errors='strict'):
-    return string if isinstance(string, compat_str) else compat_str(string, encoding, errors)
+    return string if isinstance(string, str) else str(string, encoding, errors)
 
 
 US_RATINGS = {
@@ -3509,7 +3504,7 @@ def determine_protocol(info_dict):
     elif ext == 'f4m':
         return 'f4m'
 
-    return compat_urllib_parse_urlparse(url).scheme
+    return urllib.parse.urlparse(url).scheme
 
 
 def render_table(header_row, data, delim=False, extra_gap=0, hide_empty=False):
@@ -4632,7 +4627,7 @@ def random_ipv4(cls, code_or_block):
         addr, preflen = block.split('/')
         addr_min = struct.unpack('!L', socket.inet_aton(addr))[0]
         addr_max = addr_min | (0xffffffff >> int(preflen))
-        return compat_str(socket.inet_ntoa(
+        return str(socket.inet_ntoa(
             struct.pack('!L', random.randint(addr_min, addr_max))))
 
 
@@ -4653,7 +4648,7 @@ def proxy_open(self, req, proxy, type):
 
         if proxy == '__noproxy__':
             return None  # No Proxy
-        if compat_urlparse.urlparse(proxy).scheme.lower() in ('socks', 'socks4', 'socks4a', 'socks5'):
+        if urllib.parse.urlparse(proxy).scheme.lower() in ('socks', 'socks4', 'socks4a', 'socks5'):
             req.add_header('Ytdl-socks-proxy', proxy)
             # yt-dlp's http/https handlers do wrapping the socket with socks
             return None
@@ -5036,7 +5031,7 @@ def iri_to_uri(iri):
     The function doesn't add an additional layer of escaping; e.g., it doesn't escape `%3C` as `%253C`. Instead, it percent-escapes characters with an underlying UTF-8 encoding *besides* those already escaped, leaving the URI intact.
     """
 
-    iri_parts = compat_urllib_parse_urlparse(iri)
+    iri_parts = urllib.parse.urlparse(iri)
 
     if '[' in iri_parts.netloc:
         raise ValueError('IPv6 URIs are not, yet, supported.')