X-Git-Url: https://jfr.im/git/yt-dlp.git/blobdiff_plain/ebed8b373214316fce489fe21629fcef91bb9af3..9e491463521c65ca4d1d44a757e0a115f62834f5:/yt_dlp/utils.py
diff --git a/yt_dlp/utils.py b/yt_dlp/utils.py
index 1fd85de8e..f02f71177 100644
--- a/yt_dlp/utils.py
+++ b/yt_dlp/utils.py
@@ -1,8 +1,5 @@
#!/usr/bin/env python3
-# coding: utf-8
-
-from __future__ import unicode_literals
-
+import atexit
import base64
import binascii
import calendar
@@ -11,8 +8,8 @@
import contextlib
import ctypes
import datetime
-import email.utils
import email.header
+import email.utils
import errno
import functools
import gzip
@@ -24,11 +21,13 @@
import json
import locale
import math
+import mimetypes
import operator
import os
import platform
import random
import re
+import shlex
import socket
import ssl
import subprocess
@@ -36,26 +35,22 @@
import tempfile
import time
import traceback
+import urllib.parse
import xml.etree.ElementTree
import zlib
-import mimetypes
from .compat import (
- compat_HTMLParseError,
- compat_HTMLParser,
- compat_HTTPError,
- compat_basestring,
+ asyncio,
compat_chr,
compat_cookiejar,
- compat_ctypes_WINFUNCTYPE,
compat_etree_fromstring,
compat_expanduser,
compat_html_entities,
compat_html_entities_html5,
+ compat_HTMLParseError,
+ compat_HTMLParser,
compat_http_client,
- compat_integer_types,
- compat_numeric_types,
- compat_kwargs,
+ compat_HTTPError,
compat_os_name,
compat_parse_qs,
compat_shlex_quote,
@@ -63,22 +58,14 @@
compat_struct_pack,
compat_struct_unpack,
compat_urllib_error,
- compat_urllib_parse,
+ compat_urllib_parse_unquote_plus,
compat_urllib_parse_urlencode,
compat_urllib_parse_urlparse,
- compat_urllib_parse_urlunparse,
- compat_urllib_parse_quote,
- compat_urllib_parse_quote_plus,
- compat_urllib_parse_unquote_plus,
compat_urllib_request,
compat_urlparse,
- compat_xpath,
-)
-
-from .socks import (
- ProxyType,
- sockssocket,
)
+from .dependencies import brotli, certifi, websockets
+from .socks import ProxyType, sockssocket
def register_socks_protocols():
@@ -139,11 +126,17 @@ def random_user_agent():
return _USER_AGENT_TPL % random.choice(_CHROME_VERSIONS)
+SUPPORTED_ENCODINGS = [
+    'gzip', 'deflate'
+]
+if brotli:
+ SUPPORTED_ENCODINGS.append('br')
+
std_headers = {
'User-Agent': random_user_agent(),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
- 'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'en-us,en;q=0.5',
+ 'Sec-Fetch-Mode': 'navigate',
}
@@ -252,6 +245,8 @@ def random_user_agent():
PACKED_CODES_RE = r"}\('(.+)',(\d+),(\d+),'([^']+)'\.split\('\|'\)"
 JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>(?P<json_ld>.+?)</script>'
+NUMBER_RE = r'\d+(?:\.\d+)?'
+
def preferredencoding():
"""Get preferred encoding.
@@ -271,37 +266,9 @@ def preferredencoding():
def write_json_file(obj, fn):
""" Encode obj as JSON and write it to fn, atomically if possible """
- fn = encodeFilename(fn)
- if sys.version_info < (3, 0) and sys.platform != 'win32':
- encoding = get_filesystem_encoding()
- # os.path.basename returns a bytes object, but NamedTemporaryFile
- # will fail if the filename contains non ascii characters unless we
- # use a unicode object
- path_basename = lambda f: os.path.basename(fn).decode(encoding)
- # the same for os.path.dirname
- path_dirname = lambda f: os.path.dirname(fn).decode(encoding)
- else:
- path_basename = os.path.basename
- path_dirname = os.path.dirname
-
- args = {
- 'suffix': '.tmp',
- 'prefix': path_basename(fn) + '.',
- 'dir': path_dirname(fn),
- 'delete': False,
- }
-
- # In Python 2.x, json.dump expects a bytestream.
- # In Python 3.x, it writes to a character stream
- if sys.version_info < (3, 0):
- args['mode'] = 'wb'
- else:
- args.update({
- 'mode': 'w',
- 'encoding': 'utf-8',
- })
-
- tf = tempfile.NamedTemporaryFile(**compat_kwargs(args))
+ tf = tempfile.NamedTemporaryFile(
+ prefix=f'{os.path.basename(fn)}.', dir=os.path.dirname(fn),
+ suffix='.tmp', delete=False, mode='w', encoding='utf-8')
try:
with tf:
@@ -309,39 +276,24 @@ def write_json_file(obj, fn):
if sys.platform == 'win32':
# Need to remove existing file on Windows, else os.rename raises
# WindowsError or FileExistsError.
- try:
+ with contextlib.suppress(OSError):
os.unlink(fn)
- except OSError:
- pass
- try:
+ with contextlib.suppress(OSError):
mask = os.umask(0)
os.umask(mask)
os.chmod(tf.name, 0o666 & ~mask)
- except OSError:
- pass
os.rename(tf.name, fn)
except Exception:
- try:
+ with contextlib.suppress(OSError):
os.remove(tf.name)
- except OSError:
- pass
raise
-if sys.version_info >= (2, 7):
- def find_xpath_attr(node, xpath, key, val=None):
- """ Find the xpath xpath[@key=val] """
- assert re.match(r'^[a-zA-Z_-]+$', key)
- expr = xpath + ('[@%s]' % key if val is None else "[@%s='%s']" % (key, val))
- return node.find(expr)
-else:
- def find_xpath_attr(node, xpath, key, val=None):
- for f in node.findall(compat_xpath(xpath)):
- if key not in f.attrib:
- continue
- if val is None or f.attrib.get(key) == val:
- return f
- return None
+def find_xpath_attr(node, xpath, key, val=None):
+ """ Find the xpath xpath[@key=val] """
+ assert re.match(r'^[a-zA-Z_-]+$', key)
+ expr = xpath + ('[@%s]' % key if val is None else f"[@{key}='{val}']")
+ return node.find(expr)
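
A rough usage sketch of the consolidated helper (the sample XML below is made up
for illustration and is not part of the diff):

    >>> import xml.etree.ElementTree as ET
    >>> node = ET.fromstring('<root><video id="v1"/><video id="v2"/></root>')
    >>> find_xpath_attr(node, './/video', 'id', 'v2').attrib
    {'id': 'v2'}
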
# On python2.6 the xml.etree.ElementTree.Element methods don't support
# the namespace parameter
@@ -361,7 +313,7 @@ def xpath_with_ns(path, ns_map):
def xpath_element(node, xpath, name=None, fatal=False, default=NO_DEFAULT):
def _find_xpath(xpath):
- return node.find(compat_xpath(xpath))
+ return node.find(xpath)
if isinstance(xpath, (str, compat_str)):
n = _find_xpath(xpath)
@@ -403,7 +355,7 @@ def xpath_attr(node, xpath, key, name=None, fatal=False, default=NO_DEFAULT):
if default is not NO_DEFAULT:
return default
elif fatal:
- name = '%s[@%s]' % (xpath, key) if name is None else name
+ name = f'{xpath}[@{key}]' if name is None else name
raise ExtractorError('Could not find XML attribute %s' % name)
else:
return None
@@ -415,17 +367,33 @@ def get_element_by_id(id, html):
return get_element_by_attribute('id', id, html)
+def get_element_html_by_id(id, html):
+ """Return the html of the tag with the specified ID in the passed HTML document"""
+ return get_element_html_by_attribute('id', id, html)
+
+
def get_element_by_class(class_name, html):
"""Return the content of the first tag with the specified class in the passed HTML document"""
retval = get_elements_by_class(class_name, html)
return retval[0] if retval else None
+def get_element_html_by_class(class_name, html):
+ """Return the html of the first tag with the specified class in the passed HTML document"""
+ retval = get_elements_html_by_class(class_name, html)
+ return retval[0] if retval else None
+
+
def get_element_by_attribute(attribute, value, html, escape_value=True):
retval = get_elements_by_attribute(attribute, value, html, escape_value)
return retval[0] if retval else None
+def get_element_html_by_attribute(attribute, value, html, escape_value=True):
+ retval = get_elements_html_by_attribute(attribute, value, html, escape_value)
+ return retval[0] if retval else None
+
+
def get_elements_by_class(class_name, html):
"""Return the content of all tags with the specified class in the passed HTML document as a list"""
return get_elements_by_attribute(
@@ -433,29 +401,123 @@ def get_elements_by_class(class_name, html):
html, escape_value=False)
-def get_elements_by_attribute(attribute, value, html, escape_value=True):
+def get_elements_html_by_class(class_name, html):
+ """Return the html of all tags with the specified class in the passed HTML document as a list"""
+ return get_elements_html_by_attribute(
+ 'class', r'[^\'"]*\b%s\b[^\'"]*' % re.escape(class_name),
+ html, escape_value=False)
+
+
+def get_elements_by_attribute(*args, **kwargs):
"""Return the content of the tag with the specified attribute in the passed HTML document"""
+ return [content for content, _ in get_elements_text_and_html_by_attribute(*args, **kwargs)]
+
+
+def get_elements_html_by_attribute(*args, **kwargs):
+ """Return the html of the tag with the specified attribute in the passed HTML document"""
+ return [whole for _, whole in get_elements_text_and_html_by_attribute(*args, **kwargs)]
+
+
+def get_elements_text_and_html_by_attribute(attribute, value, html, escape_value=True):
+ """
+ Return the text (content) and the html (whole) of the tag with the specified
+ attribute in the passed HTML document
+ """
+
+ quote = '' if re.match(r'''[\s"'`=<>]''', value) else '?'
value = re.escape(value) if escape_value else value
- retlist = []
- for m in re.finditer(r'''(?xs)
- <([a-zA-Z0-9:._-]+)
- (?:\s+[a-zA-Z0-9:._-]+(?:=[a-zA-Z0-9:._-]*|="[^"]*"|='[^']*'|))*?
- \s+%s=['"]?%s['"]?
- (?:\s+[a-zA-Z0-9:._-]+(?:=[a-zA-Z0-9:._-]*|="[^"]*"|='[^']*'|))*?
- \s*>
-         (?P<content>.*?)
-        </\1>
- ''' % (re.escape(attribute), value), html):
- res = m.group('content')
+ partial_element_re = rf'''(?x)
+        <(?P<tag>[a-zA-Z0-9:._-]+)
+         (?:\s(?:[^>"']|"[^"]*"|'[^']*')*)?
+         \s{re.escape(attribute)}\s*=\s*(?P<_q>['"]{quote})(?-x:{value})(?P=_q)
+        '''
+
+ for m in re.finditer(partial_element_re, html):
+ content, whole = get_element_text_and_html_by_tag(m.group('tag'), html[m.start():])
+
+ yield (
+            unescapeHTML(re.sub(r'^(?P<q>["\'])(?P<content>.*)(?P=q)$', r'\g<content>', content, flags=re.DOTALL)),
+ whole
+ )
+
+
+class HTMLBreakOnClosingTagParser(compat_HTMLParser):
+ """
+ HTML parser which raises HTMLBreakOnClosingTagException upon reaching the
+ closing tag for the first opening tag it has encountered, and can be used
+ as a context manager
+ """
+
+ class HTMLBreakOnClosingTagException(Exception):
+ pass
+
+ def __init__(self):
+ self.tagstack = collections.deque()
+ compat_HTMLParser.__init__(self)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *_):
+ self.close()
- if res.startswith('"') or res.startswith("'"):
- res = res[1:-1]
+ def close(self):
+ # handle_endtag does not return upon raising HTMLBreakOnClosingTagException,
+ # so data remains buffered; we no longer have any interest in it, thus
+ # override this method to discard it
+ pass
- retlist.append(unescapeHTML(res))
+ def handle_starttag(self, tag, _):
+ self.tagstack.append(tag)
- return retlist
+ def handle_endtag(self, tag):
+ if not self.tagstack:
+ raise compat_HTMLParseError('no tags in the stack')
+        while self.tagstack:
+ inner_tag = self.tagstack.pop()
+ if inner_tag == tag:
+ break
+ else:
+ raise compat_HTMLParseError(f'matching opening tag for closing {tag} tag not found')
+ if not self.tagstack:
+ raise self.HTMLBreakOnClosingTagException()
+
+
+def get_element_text_and_html_by_tag(tag, html):
+ """
+ For the first element with the specified tag in the passed HTML document
+    return its content (text) and the whole element (html)
+ """
+ def find_or_raise(haystack, needle, exc):
+ try:
+ return haystack.index(needle)
+ except ValueError:
+ raise exc
+    closing_tag = f'</{tag}>'
+ whole_start = find_or_raise(
+ html, f'<{tag}', compat_HTMLParseError(f'opening {tag} tag not found'))
+ content_start = find_or_raise(
+        html[whole_start:], '>', compat_HTMLParseError(f'malformed opening {tag} tag'))
+ content_start += whole_start + 1
+ with HTMLBreakOnClosingTagParser() as parser:
+ parser.feed(html[whole_start:content_start])
+ if not parser.tagstack or parser.tagstack[0] != tag:
+ raise compat_HTMLParseError(f'parser did not match opening {tag} tag')
+ offset = content_start
+ while offset < len(html):
+ next_closing_tag_start = find_or_raise(
+            html[offset:], closing_tag,
+            compat_HTMLParseError(f'closing {tag} tag not found'))
+ next_closing_tag_end = next_closing_tag_start + len(closing_tag)
+ try:
+ parser.feed(html[offset:offset + next_closing_tag_end])
+ offset += next_closing_tag_end
+ except HTMLBreakOnClosingTagParser.HTMLBreakOnClosingTagException:
+ return html[content_start:offset + next_closing_tag_start], \
+                html[whole_start:offset + next_closing_tag_end]
+ raise compat_HTMLParseError('unexpected end of html')
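
A rough usage sketch of the new element helpers (the sample markup is made up,
not from the diff): the plain variants return the tag's content, while the new
*_html_* variants return the whole element.

    >>> html = '<div class="foo bar">text</div>'
    >>> get_element_by_class('foo', html)
    'text'
    >>> get_element_html_by_class('foo', html)
    '<div class="foo bar">text</div>'
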
class HTMLAttributeParser(compat_HTMLParser):
@@ -499,16 +561,11 @@ def extract_attributes(html_element):
'empty': '', 'noval': None, 'entity': '&',
'sq': '"', 'dq': '\''
}.
- NB HTMLParser is stricter in Python 2.6 & 3.2 than in later versions,
- but the cases in the unit test will work for all of 2.6, 2.7, 3.2-3.5.
"""
parser = HTMLAttributeParser()
- try:
+ with contextlib.suppress(compat_HTMLParseError):
parser.feed(html_element)
parser.close()
- # Older Python may throw HTMLParseError in case of malformed HTML
- except compat_HTMLParseError:
- pass
return parser.attrs
@@ -527,10 +584,9 @@ def clean_html(html):
if html is None: # Convenience for sanitizing descriptions etc.
return html
-    # Newline vs <br />
- html = html.replace('\n', ' ')
- html = re.sub(r'(?u)\s*<\s*br\s*/?\s*>\s*', '\n', html)
- html = re.sub(r'(?u)<\s*/\s*p\s*>\s*<\s*p[^>]*>', '\n', html)
+ html = re.sub(r'\s+', ' ', html)
+ html = re.sub(r'(?u)\s?<\s?br\s?/?\s?>\s?', '\n', html)
+ html = re.sub(r'(?u)<\s?/\s?p\s?>\s?<\s?p[^>]*>', '\n', html)
# Strip html tags
html = re.sub('<.*?>', '', html)
# Replace html entities
@@ -548,26 +604,30 @@ def sanitize_open(filename, open_mode):
It returns the tuple (stream, definitive_file_name).
"""
- try:
- if filename == '-':
- if sys.platform == 'win32':
- import msvcrt
- msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
- return (sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout, filename)
- stream = open(encodeFilename(filename), open_mode)
- return (stream, filename)
- except (IOError, OSError) as err:
- if err.errno in (errno.EACCES,):
- raise
+ if filename == '-':
+ if sys.platform == 'win32':
+ import msvcrt
+ msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
+ return (sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout, filename)
- # In case of error, try to remove win32 forbidden chars
- alt_filename = sanitize_path(filename)
- if alt_filename == filename:
- raise
- else:
- # An exception here should be caught in the caller
- stream = open(encodeFilename(alt_filename), open_mode)
- return (stream, alt_filename)
+ for attempt in range(2):
+ try:
+ try:
+ if sys.platform == 'win32':
+ # FIXME: An exclusive lock also locks the file from being read.
+ # Since windows locks are mandatory, don't lock the file on windows (for now).
+ # Ref: https://github.com/yt-dlp/yt-dlp/issues/3124
+ raise LockingUnsupportedError()
+ stream = locked_file(filename, open_mode, block=False).__enter__()
+ except LockingUnsupportedError:
+ stream = open(filename, open_mode)
+ return (stream, filename)
+ except OSError as err:
+ if attempt or err.errno in (errno.EACCES,):
+ raise
+ old_filename, filename = filename, sanitize_path(filename)
+ if old_filename == filename:
+ raise
def timeconvert(timestr):
@@ -579,36 +639,40 @@ def timeconvert(timestr):
return timestamp
-def sanitize_filename(s, restricted=False, is_id=False):
+def sanitize_filename(s, restricted=False, is_id=NO_DEFAULT):
"""Sanitizes a string so it could be used as part of a filename.
- If restricted is set, use a stricter subset of allowed characters.
- Set is_id if this is not an arbitrary string, but an ID that should be kept
- if possible.
+ @param restricted Use a stricter subset of allowed characters
+ @param is_id Whether this is an ID that should be kept unchanged if possible.
+ If unset, yt-dlp's new sanitization rules are in effect
"""
+ if s == '':
+ return ''
+
def replace_insane(char):
if restricted and char in ACCENT_CHARS:
return ACCENT_CHARS[char]
elif not restricted and char == '\n':
- return ' '
+ return '\0 '
elif char == '?' or ord(char) < 32 or ord(char) == 127:
return ''
elif char == '"':
return '' if restricted else '\''
elif char == ':':
- return '_-' if restricted else ' -'
+ return '\0_\0-' if restricted else '\0 \0-'
elif char in '\\/|*<>':
- return '_'
- if restricted and (char in '!&\'()[]{}$;`^,#' or char.isspace()):
- return '_'
- if restricted and ord(char) > 127:
- return '_'
+ return '\0_'
+ if restricted and (char in '!&\'()[]{}$;`^,#' or char.isspace() or ord(char) > 127):
+ return '\0_'
return char
- if s == '':
- return ''
- # Handle timestamps
- s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s)
+ s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s) # Handle timestamps
result = ''.join(map(replace_insane, s))
+ if is_id is NO_DEFAULT:
+ result = re.sub('(\0.)(?:(?=\\1)..)+', r'\1', result) # Remove repeated substitute chars
+ STRIP_RE = '(?:\0.|[ _-])*'
+ result = re.sub(f'^\0.{STRIP_RE}|{STRIP_RE}\0.$', '', result) # Remove substitute chars from start/end
+ result = result.replace('\0', '') or '_'
+
if not is_id:
while '__' in result:
result = result.replace('__', '_')
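
The NUL bytes inserted by replace_insane act as temporary markers for substituted
characters, so runs of substitutions can be collapsed and stripped from the ends
before the markers are removed. Roughly, under the new default rules (outputs
shown are illustrative):

    >>> sanitize_filename('A: B')
    'A - B'
    >>> sanitize_filename('A: B', restricted=True)
    'A_-_B'
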
@@ -629,8 +693,6 @@ def sanitize_path(s, force=False):
if sys.platform == 'win32':
force = False
drive_or_unc, _ = os.path.splitdrive(s)
- if sys.version_info < (2, 7) and not drive_or_unc:
- drive_or_unc, _ = os.path.splitunc(s)
elif force:
drive_or_unc = ''
else:
@@ -644,7 +706,7 @@ def sanitize_path(s, force=False):
for path_part in norm_path]
if drive_or_unc:
sanitized_path.insert(0, drive_or_unc + os.path.sep)
- elif force and s[0] == os.path.sep:
+ elif force and s and s[0] == os.path.sep:
sanitized_path.insert(0, os.path.sep)
return os.path.join(*sanitized_path)
@@ -652,7 +714,9 @@ def sanitize_path(s, force=False):
def sanitize_url(url):
# Prepend protocol-less URLs with `http:` scheme in order to mitigate
# the number of unwanted failures due to missing protocol
- if url.startswith('//'):
+ if url is None:
+ return
+ elif url.startswith('//'):
return 'http:%s' % url
# Fix some common typos seen so far
COMMON_TYPOS = (
@@ -675,8 +739,8 @@ def extract_basic_auth(url):
parts.hostname if parts.port is None
else '%s:%d' % (parts.hostname, parts.port))))
auth_payload = base64.b64encode(
- ('%s:%s' % (parts.username, parts.password or '')).encode('utf-8'))
- return url, 'Basic ' + auth_payload.decode('utf-8')
+ ('%s:%s' % (parts.username, parts.password or '')).encode())
+ return url, f'Basic {auth_payload.decode()}'
def sanitized_Request(url, *args, **kwargs):
@@ -723,10 +787,8 @@ def _htmlentity_transform(entity_with_semicolon):
else:
base = 10
# See https://github.com/ytdl-org/youtube-dl/issues/7518
- try:
+ with contextlib.suppress(ValueError):
return compat_chr(int(numstr, base))
- except ValueError:
- pass
# Unknown entity in name, return its literal representation
return '&%s;' % entity
@@ -735,7 +797,7 @@ def _htmlentity_transform(entity_with_semicolon):
def unescapeHTML(s):
if s is None:
return None
- assert type(s) == compat_str
+ assert isinstance(s, str)
return re.sub(
r'&([^&;]+;)', lambda m: _htmlentity_transform(m.group(1)), s)
@@ -769,7 +831,7 @@ class Popen(subprocess.Popen):
_startupinfo = None
def __init__(self, *args, **kwargs):
- super(Popen, self).__init__(*args, **kwargs, startupinfo=self._startupinfo)
+ super().__init__(*args, **kwargs, startupinfo=self._startupinfo)
def communicate_or_kill(self, *args, **kwargs):
return process_communicate_or_kill(self, *args, **kwargs)
@@ -788,51 +850,23 @@ def get_subprocess_encoding():
def encodeFilename(s, for_subprocess=False):
- """
- @param s The name of the file
- """
-
- assert type(s) == compat_str
-
- # Python 3 has a Unicode API
- if sys.version_info >= (3, 0):
- return s
-
- # Pass '' directly to use Unicode APIs on Windows 2000 and up
- # (Detecting Windows NT 4 is tricky because 'major >= 4' would
- # match Windows 9x series as well. Besides, NT 4 is obsolete.)
- if not for_subprocess and sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5:
- return s
-
- # Jython assumes filenames are Unicode strings though reported as Python 2.x compatible
- if sys.platform.startswith('java'):
- return s
-
- return s.encode(get_subprocess_encoding(), 'ignore')
+ assert isinstance(s, str)
+ return s
def decodeFilename(b, for_subprocess=False):
-
- if sys.version_info >= (3, 0):
- return b
-
- if not isinstance(b, bytes):
- return b
-
- return b.decode(get_subprocess_encoding(), 'ignore')
+ return b
def encodeArgument(s):
- if not isinstance(s, compat_str):
- # Legacy code that uses byte strings
- # Uncomment the following line after fixing all post processors
- # assert False, 'Internal error: %r should be of type %r, is %r' % (s, compat_str, type(s))
- s = s.decode('ascii')
- return encodeFilename(s, True)
+ # Legacy code that uses byte strings
+ # Uncomment the following line after fixing all post processors
+ # assert isinstance(s, str), 'Internal error: %r should be of type %r, is %r' % (s, compat_str, type(s))
+ return s if isinstance(s, str) else s.decode('ascii')
def decodeArgument(b):
- return decodeFilename(b, True)
+ return b
def decodeOption(optval):
@@ -875,43 +909,49 @@ def _ssl_load_windows_store_certs(ssl_context, storename):
except PermissionError:
return
for cert in certs:
- try:
+ with contextlib.suppress(ssl.SSLError):
ssl_context.load_verify_locations(cadata=cert)
- except ssl.SSLError:
- pass
def make_HTTPS_handler(params, **kwargs):
opts_check_certificate = not params.get('nocheckcertificate')
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = opts_check_certificate
+ if params.get('legacyserverconnect'):
+ context.options |= 4 # SSL_OP_LEGACY_SERVER_CONNECT
+ # Allow use of weaker ciphers in Python 3.10+. See https://bugs.python.org/issue43998
+ context.set_ciphers('DEFAULT')
context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
if opts_check_certificate:
+ if has_certifi and 'no-certifi' not in params.get('compat_opts', []):
+ context.load_verify_locations(cafile=certifi.where())
+ else:
+ try:
+ context.load_default_certs()
+ # Work around the issue in load_default_certs when there are bad certificates. See:
+ # https://github.com/yt-dlp/yt-dlp/issues/1060,
+ # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
+ except ssl.SSLError:
+ # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
+ if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
+ for storename in ('CA', 'ROOT'):
+ _ssl_load_windows_store_certs(context, storename)
+ context.set_default_verify_paths()
+ client_certfile = params.get('client_certificate')
+ if client_certfile:
try:
- context.load_default_certs()
- # Work around the issue in load_default_certs when there are bad certificates. See:
- # https://github.com/yt-dlp/yt-dlp/issues/1060,
- # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
+ context.load_cert_chain(
+                    client_certfile, keyfile=params.get('client_certificate_key'),
+                    password=params.get('client_certificate_password'))
except ssl.SSLError:
- # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
- if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
- # Create a new context to discard any certificates that were already loaded
- context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- context.check_hostname, context.verify_mode = True, ssl.CERT_REQUIRED
- for storename in ('CA', 'ROOT'):
- _ssl_load_windows_store_certs(context, storename)
- context.set_default_verify_paths()
+ raise YoutubeDLError('Unable to load client certificate')
return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
def bug_reports_message(before=';'):
- if ytdl_is_updateable():
- update_cmd = 'type yt-dlp -U to update'
- else:
- update_cmd = 'see https://github.com/yt-dlp/yt-dlp on how to update'
- msg = 'please report this issue on https://github.com/yt-dlp/yt-dlp .'
- msg += ' Make sure you are using the latest version; %s.' % update_cmd
- msg += ' Be sure to call yt-dlp with the --verbose flag and include its complete output.'
+ msg = ('please report this issue on https://github.com/yt-dlp/yt-dlp/issues?q= , '
+           'filling out the appropriate issue template. '
+           'Confirm you are on the latest version using yt-dlp -U')
before = before.rstrip()
if not before or before.endswith(('.', '!', '?')):
@@ -948,7 +988,7 @@ def __init__(self, msg, tb=None, expected=False, cause=None, video_id=None, ie=N
if sys.exc_info()[0] in network_exceptions:
expected = True
- self.msg = str(msg)
+ self.orig_msg = str(msg)
self.traceback = tb
self.expected = expected
self.cause = cause
@@ -956,22 +996,23 @@ def __init__(self, msg, tb=None, expected=False, cause=None, video_id=None, ie=N
self.ie = ie
self.exc_info = sys.exc_info() # preserve original exception
- super(ExtractorError, self).__init__(''.join((
+ super().__init__(''.join((
format_field(ie, template='[%s] '),
format_field(video_id, template='%s: '),
- self.msg,
+ msg,
format_field(cause, template=' (caused by %r)'),
'' if expected else bug_reports_message())))
def format_traceback(self):
- if self.traceback is None:
- return None
- return ''.join(traceback.format_tb(self.traceback))
+ return join_nonempty(
+ self.traceback and ''.join(traceback.format_tb(self.traceback)),
+ self.cause and ''.join(traceback.format_exception(None, self.cause, self.cause.__traceback__)[1:]),
+ delim='\n') or None
class UnsupportedError(ExtractorError):
def __init__(self, url):
- super(UnsupportedError, self).__init__(
+ super().__init__(
'Unsupported URL: %s' % url, expected=True)
self.url = url
@@ -990,7 +1031,7 @@ class GeoRestrictedError(ExtractorError):
def __init__(self, msg, countries=None, **kwargs):
kwargs['expected'] = True
- super(GeoRestrictedError, self).__init__(msg, **kwargs)
+ super().__init__(msg, **kwargs)
self.countries = countries
@@ -1004,7 +1045,7 @@ class DownloadError(YoutubeDLError):
def __init__(self, msg, exc_info=None):
""" exc_info, if given, is the original exception that caused the trouble (as returned by sys.exc_info()). """
- super(DownloadError, self).__init__(msg)
+ super().__init__(msg)
self.exc_info = exc_info
@@ -1098,9 +1139,7 @@ class ContentTooShortError(YoutubeDLError):
"""
def __init__(self, downloaded, expected):
- super(ContentTooShortError, self).__init__(
- 'Downloaded {0} bytes, expected {1} bytes'.format(downloaded, expected)
- )
+ super().__init__(f'Downloaded {downloaded} bytes, expected {expected} bytes')
# Both in bytes
self.downloaded = downloaded
self.expected = expected
@@ -1108,7 +1147,7 @@ def __init__(self, downloaded, expected):
class XAttrMetadataError(YoutubeDLError):
def __init__(self, code=None, msg='Unknown error'):
- super(XAttrMetadataError, self).__init__(msg)
+ super().__init__(msg)
self.code = code
self.msg = msg
@@ -1127,12 +1166,7 @@ class XAttrUnavailableError(YoutubeDLError):
def _create_http_connection(ydl_handler, http_class, is_https, *args, **kwargs):
- # Working around python 2 bug (see http://bugs.python.org/issue17849) by limiting
- # expected HTTP responses to meet HTTP/1.0 or later (see also
- # https://github.com/ytdl-org/youtube-dl/issues/6727)
- if sys.version_info < (3, 0):
- kwargs['strict'] = True
- hc = http_class(*args, **compat_kwargs(kwargs))
+ hc = http_class(*args, **kwargs)
source_address = ydl_handler._params.get('source_address')
if source_address is not None:
@@ -1149,7 +1183,7 @@ def _create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_a
ip_addrs = [addr for addr in addrs if addr[0] == af]
if addrs and not ip_addrs:
ip_version = 'v4' if af == socket.AF_INET else 'v6'
- raise socket.error(
+ raise OSError(
"No remote IP%s addresses available for connect, can't use '%s' as source address"
% (ip_version, source_address[0]))
for res in ip_addrs:
@@ -1163,30 +1197,17 @@ def _create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_a
sock.connect(sa)
err = None # Explicitly break reference cycle
return sock
- except socket.error as _:
+ except OSError as _:
err = _
if sock is not None:
sock.close()
if err is not None:
raise err
else:
- raise socket.error('getaddrinfo returns an empty list')
+ raise OSError('getaddrinfo returns an empty list')
if hasattr(hc, '_create_connection'):
hc._create_connection = _create_connection
- sa = (source_address, 0)
- if hasattr(hc, 'source_address'): # Python 2.7+
- hc.source_address = sa
- else: # Python 2.6
- def _hc_connect(self, *args, **kwargs):
- sock = _create_connection(
- (self.host, self.port), self.timeout, sa)
- if is_https:
- self.sock = ssl.wrap_socket(
- sock, self.key_file, self.cert_file,
- ssl_version=ssl.PROTOCOL_TLSv1)
- else:
- self.sock = sock
- hc.connect = functools.partial(_hc_connect, hc)
+ hc.source_address = (source_address, 0)
return hc
@@ -1195,7 +1216,7 @@ def handle_youtubedl_headers(headers):
filtered_headers = headers
if 'Youtubedl-no-compression' in filtered_headers:
- filtered_headers = dict((k, v) for k, v in filtered_headers.items() if k.lower() != 'accept-encoding')
+ filtered_headers = {k: v for k, v in filtered_headers.items() if k.lower() != 'accept-encoding'}
del filtered_headers['Youtubedl-no-compression']
return filtered_headers
@@ -1244,6 +1265,12 @@ def deflate(data):
except zlib.error:
return zlib.decompress(data)
+ @staticmethod
+ def brotli(data):
+ if not data:
+ return data
+ return brotli.decompress(data)
+
def http_request(self, req):
# According to RFC 3986, URLs can not contain non-ASCII characters, however this is not
# always respected by websites, some tend to give out URLs with non percent-encoded
@@ -1260,18 +1287,16 @@ def http_request(self, req):
if url != url_escaped:
req = update_Request(req, url=url_escaped)
- for h, v in std_headers.items():
+ for h, v in self._params.get('http_headers', std_headers).items():
# Capitalize is needed because of Python bug 2275: http://bugs.python.org/issue2275
# The dict keys are capitalized because of this bug by urllib
if h.capitalize() not in req.headers:
req.add_header(h, v)
- req.headers = handle_youtubedl_headers(req.headers)
+ if 'Accept-encoding' not in req.headers:
+ req.add_header('Accept-encoding', ', '.join(SUPPORTED_ENCODINGS))
- if sys.version_info < (2, 7) and '#' in req.get_full_url():
- # Python 2.6 is brain-dead when it comes to fragments
- req._Request__original = req._Request__original.partition('#')[0]
- req._Request__r_type = req._Request__r_type.partition('#')[0]
+ req.headers = handle_youtubedl_headers(req.headers)
return req
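
With the Accept-Encoding default moved out of std_headers, the handler now
advertises the supported encodings per request; a minimal sketch, assuming an
empty params dict ('br' is only offered when the optional brotli module imports):

    >>> handler = YoutubeDLHandler({})
    >>> req = handler.http_request(sanitized_Request('https://example.com'))
    >>> req.get_header('Accept-encoding')
    'gzip, deflate, br'
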
@@ -1283,14 +1308,14 @@ def http_response(self, req, resp):
gz = gzip.GzipFile(fileobj=io.BytesIO(content), mode='rb')
try:
uncompressed = io.BytesIO(gz.read())
- except IOError as original_ioerror:
+ except OSError as original_ioerror:
             # There may be junk at the end of the file
# See http://stackoverflow.com/q/4928560/35070 for details
for i in range(1, 1024):
try:
gz = gzip.GzipFile(fileobj=io.BytesIO(content[:-i]), mode='rb')
uncompressed = io.BytesIO(gz.read())
- except IOError:
+ except OSError:
continue
break
else:
@@ -1304,21 +1329,22 @@ def http_response(self, req, resp):
resp = compat_urllib_request.addinfourl(gz, old_resp.headers, old_resp.url, old_resp.code)
resp.msg = old_resp.msg
del resp.headers['Content-encoding']
+ # brotli
+ if resp.headers.get('Content-encoding', '') == 'br':
+ resp = compat_urllib_request.addinfourl(
+                io.BytesIO(self.brotli(resp.read())), old_resp.headers, old_resp.url, old_resp.code)
+ resp.msg = old_resp.msg
+ del resp.headers['Content-encoding']
# Percent-encode redirect URL of Location HTTP header to satisfy RFC 3986 (see
# https://github.com/ytdl-org/youtube-dl/issues/6457).
if 300 <= resp.code < 400:
location = resp.headers.get('Location')
if location:
# As of RFC 2616 default charset is iso-8859-1 that is respected by python 3
- if sys.version_info >= (3, 0):
- location = location.encode('iso-8859-1').decode('utf-8')
- else:
- location = location.decode('utf-8')
+ location = location.encode('iso-8859-1').decode()
location_escaped = escape_url(location)
if location != location_escaped:
del resp.headers['Location']
- if sys.version_info < (3, 0):
- location_escaped = location_escaped.encode('utf-8')
resp.headers['Location'] = location_escaped
return resp
@@ -1355,7 +1381,7 @@ class SocksConnection(base_class):
def connect(self):
self.sock = sockssocket()
self.sock.setproxy(*proxy_args)
- if type(self.timeout) in (int, float):
+ if isinstance(self.timeout, (int, float)):
self.sock.settimeout(self.timeout)
self.sock.connect((self.host, self.port))
@@ -1389,9 +1415,14 @@ def https_open(self, req):
conn_class = make_socks_conn_class(conn_class, socks_proxy)
del req.headers['Ytdl-socks-proxy']
- return self.do_open(functools.partial(
- _create_http_connection, self, conn_class, True),
- req, **kwargs)
+ try:
+ return self.do_open(
+ functools.partial(_create_http_connection, self, conn_class, True), req, **kwargs)
+ except urllib.error.URLError as e:
+ if (isinstance(e.reason, ssl.SSLError)
+ and getattr(e.reason, 'reason', None) == 'SSLV3_ALERT_HANDSHAKE_FAILURE'):
+ raise YoutubeDLError('SSLV3_ALERT_HANDSHAKE_FAILURE: Try using --legacy-server-connect')
+ raise
class YoutubeDLCookieJar(compat_cookiejar.MozillaCookieJar):
@@ -1410,57 +1441,71 @@ class YoutubeDLCookieJar(compat_cookiejar.MozillaCookieJar):
'CookieFileEntry',
('domain_name', 'include_subdomains', 'path', 'https_only', 'expires_at', 'name', 'value'))
- def save(self, filename=None, ignore_discard=False, ignore_expires=False):
+ def __init__(self, filename=None, *args, **kwargs):
+ super().__init__(None, *args, **kwargs)
+ if self.is_path(filename):
+ filename = os.fspath(filename)
+ self.filename = filename
+
+ @staticmethod
+ def _true_or_false(cndn):
+ return 'TRUE' if cndn else 'FALSE'
+
+ @staticmethod
+ def is_path(file):
+ return isinstance(file, (str, bytes, os.PathLike))
+
+ @contextlib.contextmanager
+ def open(self, file, *, write=False):
+ if self.is_path(file):
+ with open(file, 'w' if write else 'r', encoding='utf-8') as f:
+ yield f
+ else:
+ if write:
+ file.truncate(0)
+ yield file
+
+ def _really_save(self, f, ignore_discard=False, ignore_expires=False):
+ now = time.time()
+ for cookie in self:
+ if (not ignore_discard and cookie.discard
+ or not ignore_expires and cookie.is_expired(now)):
+ continue
+ name, value = cookie.name, cookie.value
+ if value is None:
+ # cookies.txt regards 'Set-Cookie: foo' as a cookie
+ # with no name, whereas http.cookiejar regards it as a
+ # cookie with no value.
+ name, value = '', name
+ f.write('%s\n' % '\t'.join((
+ cookie.domain,
+ self._true_or_false(cookie.domain.startswith('.')),
+ cookie.path,
+ self._true_or_false(cookie.secure),
+ str_or_none(cookie.expires, default=''),
+ name, value
+ )))
+
+ def save(self, filename=None, *args, **kwargs):
"""
Save cookies to a file.
+ Code is taken from CPython 3.6
+ https://github.com/python/cpython/blob/8d999cbf4adea053be6dbb612b9844635c4dfb8e/Lib/http/cookiejar.py#L2091-L2117 """
- Most of the code is taken from CPython 3.8 and slightly adapted
- to support cookie files with UTF-8 in both python 2 and 3.
- """
if filename is None:
if self.filename is not None:
filename = self.filename
else:
raise ValueError(compat_cookiejar.MISSING_FILENAME_TEXT)
- # Store session cookies with `expires` set to 0 instead of an empty
- # string
+ # Store session cookies with `expires` set to 0 instead of an empty string
for cookie in self:
if cookie.expires is None:
cookie.expires = 0
- with io.open(filename, 'w', encoding='utf-8') as f:
+ with self.open(filename, write=True) as f:
f.write(self._HEADER)
- now = time.time()
- for cookie in self:
- if not ignore_discard and cookie.discard:
- continue
- if not ignore_expires and cookie.is_expired(now):
- continue
- if cookie.secure:
- secure = 'TRUE'
- else:
- secure = 'FALSE'
- if cookie.domain.startswith('.'):
- initial_dot = 'TRUE'
- else:
- initial_dot = 'FALSE'
- if cookie.expires is not None:
- expires = compat_str(cookie.expires)
- else:
- expires = ''
- if cookie.value is None:
- # cookies.txt regards 'Set-Cookie: foo' as a cookie
- # with no name, whereas http.cookiejar regards it as a
- # cookie with no value.
- name = ''
- value = cookie.name
- else:
- name = cookie.name
- value = cookie.value
- f.write(
- '\t'.join([cookie.domain, initial_dot, cookie.path,
- secure, expires, name, value]) + '\n')
+ self._really_save(f, *args, **kwargs)
def load(self, filename=None, ignore_discard=False, ignore_expires=False):
"""Load cookies from a file."""
@@ -1485,14 +1530,16 @@ def prepare_line(line):
return line
cf = io.StringIO()
- with io.open(filename, encoding='utf-8') as f:
+ with self.open(filename) as f:
for line in f:
try:
cf.write(prepare_line(line))
except compat_cookiejar.LoadError as e:
- write_string(
- 'WARNING: skipping cookie file entry due to %s: %r\n'
- % (e, line), sys.stderr)
+ if f'{line.strip()} '[0] in '[{"':
+ raise compat_cookiejar.LoadError(
+ 'Cookies file must be Netscape formatted, not JSON. See '
+ 'https://github.com/ytdl-org/youtube-dl#how-do-i-pass-cookies-to-youtube-dl')
+ write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
continue
cf.seek(0)
self._really_load(cf, filename, ignore_discard, ignore_expires)
@@ -1517,19 +1564,6 @@ def __init__(self, cookiejar=None):
compat_urllib_request.HTTPCookieProcessor.__init__(self, cookiejar)
def http_response(self, request, response):
- # Python 2 will choke on next HTTP request in row if there are non-ASCII
- # characters in Set-Cookie HTTP header of last response (see
- # https://github.com/ytdl-org/youtube-dl/issues/6769).
- # In order to at least prevent crashing we will percent encode Set-Cookie
- # header before HTTPCookieProcessor starts processing it.
- # if sys.version_info < (3, 0) and response.headers:
- # for set_cookie_header in ('Set-Cookie', 'Set-Cookie2'):
- # set_cookie = response.headers.get(set_cookie_header)
- # if set_cookie:
- # set_cookie_escaped = compat_urllib_parse.quote(set_cookie, b"%/;:@&=+$,!~*'()?#[] ")
- # if set_cookie != set_cookie_escaped:
- # del response.headers[set_cookie_header]
- # response.headers[set_cookie_header] = set_cookie_escaped
return compat_urllib_request.HTTPCookieProcessor.http_response(self, request, response)
https_request = compat_urllib_request.HTTPCookieProcessor.http_request
@@ -1573,12 +1607,6 @@ def redirect_request(self, req, fp, code, msg, headers, newurl):
# essentially all clients do redirect in this case, so we do
# the same.
- # On python 2 urlh.geturl() may sometimes return redirect URL
- # as byte string instead of unicode. This workaround allows
- # to force it always return unicode.
- if sys.version_info[0] < 3:
- newurl = compat_str(newurl)
-
# Be conciliant with URIs containing a space. This is mainly
# redundant with the more complete encoding done in http_error_302(),
# but it is kept for compatibility with other callers.
@@ -1586,11 +1614,22 @@ def redirect_request(self, req, fp, code, msg, headers, newurl):
CONTENT_HEADERS = ("content-length", "content-type")
# NB: don't use dict comprehension for python 2.6 compatibility
- newheaders = dict((k, v) for k, v in req.headers.items()
- if k.lower() not in CONTENT_HEADERS)
+ newheaders = {k: v for k, v in req.headers.items() if k.lower() not in CONTENT_HEADERS}
+
+ # A 303 must either use GET or HEAD for subsequent request
+ # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.4
+ if code == 303 and m != 'HEAD':
+ m = 'GET'
+ # 301 and 302 redirects are commonly turned into a GET from a POST
+ # for subsequent requests by browsers, so we'll do the same.
+ # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.2
+ # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.3
+ if code in (301, 302) and m == 'POST':
+ m = 'GET'
+
return compat_urllib_request.Request(
newurl, headers=newheaders, origin_req_host=req.origin_req_host,
- unverifiable=True)
+ unverifiable=True, method=m)
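
In effect, a POST answered with 301, 302 or 303 is reissued as a GET, while 307
and 308 responses keep the original method; for example (illustrative):

    POST /login -> 303 See Other          -> GET  (new location)
    POST /login -> 307 Temporary Redirect -> POST (new location)
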
def extract_timezone(date_str):
@@ -1630,12 +1669,10 @@ def parse_iso8601(date_str, delimiter='T', timezone=None):
if timezone is None:
timezone, date_str = extract_timezone(date_str)
- try:
- date_format = '%Y-%m-%d{0}%H:%M:%S'.format(delimiter)
+ with contextlib.suppress(ValueError):
+ date_format = f'%Y-%m-%d{delimiter}%H:%M:%S'
dt = datetime.datetime.strptime(date_str, date_format) - timezone
return calendar.timegm(dt.timetuple())
- except ValueError:
- pass
def date_formats(day_first=True):
@@ -1655,17 +1692,13 @@ def unified_strdate(date_str, day_first=True):
_, date_str = extract_timezone(date_str)
for expression in date_formats(day_first):
- try:
+ with contextlib.suppress(ValueError):
upload_date = datetime.datetime.strptime(date_str, expression).strftime('%Y%m%d')
- except ValueError:
- pass
if upload_date is None:
timetuple = email.utils.parsedate_tz(date_str)
if timetuple:
- try:
+ with contextlib.suppress(ValueError):
upload_date = datetime.datetime(*timetuple[:6]).strftime('%Y%m%d')
- except ValueError:
- pass
if upload_date is not None:
return compat_str(upload_date)
@@ -1693,11 +1726,9 @@ def unified_timestamp(date_str, day_first=True):
date_str = m.group(1)
for expression in date_formats(day_first):
- try:
+ with contextlib.suppress(ValueError):
dt = datetime.datetime.strptime(date_str, expression) - timezone + datetime.timedelta(hours=pm_delta)
return calendar.timegm(dt.timetuple())
- except ValueError:
- pass
timetuple = email.utils.parsedate_tz(date_str)
if timetuple:
return calendar.timegm(timetuple) + pm_delta * 3600
@@ -1721,26 +1752,26 @@ def subtitles_filename(filename, sub_lang, sub_format, expected_real_ext=None):
def datetime_from_str(date_str, precision='auto', format='%Y%m%d'):
- """
- Return a datetime object from a string in the format YYYYMMDD or
- (now|today|date)[+-][0-9](microsecond|second|minute|hour|day|week|month|year)(s)?
-
- format: string date format used to return datetime object from
- precision: round the time portion of a datetime object.
- auto|microsecond|second|minute|hour|day.
- auto: round to the unit provided in date_str (if applicable).
+ R"""
+ Return a datetime object from a string.
+ Supported format:
+ (now|today|yesterday|DATE)([+-]\d+(microsecond|second|minute|hour|day|week|month|year)s?)?
+
+ @param format strftime format of DATE
+ @param precision Round the datetime object: auto|microsecond|second|minute|hour|day
+ auto: round to the unit provided in date_str (if applicable).
"""
auto_precision = False
if precision == 'auto':
auto_precision = True
precision = 'microsecond'
- today = datetime_round(datetime.datetime.now(), precision)
+ today = datetime_round(datetime.datetime.utcnow(), precision)
if date_str in ('now', 'today'):
return today
if date_str == 'yesterday':
return today - datetime.timedelta(days=1)
match = re.match(
-        r'(?P<start>.+)(?P<sign>[+-])(?P