X-Git-Url: https://jfr.im/git/yt-dlp.git/blobdiff_plain/59f943cd5097e9bdbc3cb3e6b5675e43d369341a..7e88d7d78f452ea69f06bbdf23f82e9ad7c3de5e:/yt_dlp/utils.py
diff --git a/yt_dlp/utils.py b/yt_dlp/utils.py
index 7faee62ac..f21d70672 100644
--- a/yt_dlp/utils.py
+++ b/yt_dlp/utils.py
@@ -11,7 +11,6 @@
import email.header
import email.utils
import errno
-import functools
import gzip
import hashlib
import hmac
@@ -35,12 +34,13 @@
import tempfile
import time
import traceback
+import types
import urllib.parse
import xml.etree.ElementTree
import zlib
+from .compat import asyncio, functools # isort: split
from .compat import (
- asyncio,
compat_chr,
compat_cookiejar,
compat_etree_fromstring,
@@ -245,7 +245,10 @@ def random_user_agent():
PACKED_CODES_RE = r"}\('(.+)',(\d+),(\d+),'([^']+)'\.split\('\|'\)"
JSON_LD_RE = r'(?is)'
+NUMBER_RE = r'\d+(?:\.\d+)?'
+
+@functools.cache
def preferredencoding():
"""Get preferred encoding.
@@ -360,14 +363,14 @@ def xpath_attr(node, xpath, key, name=None, fatal=False, default=NO_DEFAULT):
return n.attrib[key]
-def get_element_by_id(id, html):
+def get_element_by_id(id, html, **kwargs):
"""Return the content of the tag with the specified ID in the passed HTML document"""
- return get_element_by_attribute('id', id, html)
+ return get_element_by_attribute('id', id, html, **kwargs)
-def get_element_html_by_id(id, html):
+def get_element_html_by_id(id, html, **kwargs):
"""Return the html of the tag with the specified ID in the passed HTML document"""
- return get_element_html_by_attribute('id', id, html)
+ return get_element_html_by_attribute('id', id, html, **kwargs)
def get_element_by_class(class_name, html):
@@ -382,27 +385,27 @@ def get_element_html_by_class(class_name, html):
return retval[0] if retval else None
-def get_element_by_attribute(attribute, value, html, escape_value=True):
- retval = get_elements_by_attribute(attribute, value, html, escape_value)
+def get_element_by_attribute(attribute, value, html, **kwargs):
+ retval = get_elements_by_attribute(attribute, value, html, **kwargs)
return retval[0] if retval else None
-def get_element_html_by_attribute(attribute, value, html, escape_value=True):
- retval = get_elements_html_by_attribute(attribute, value, html, escape_value)
+def get_element_html_by_attribute(attribute, value, html, **kargs):
+ retval = get_elements_html_by_attribute(attribute, value, html, **kargs)
return retval[0] if retval else None
-def get_elements_by_class(class_name, html):
+def get_elements_by_class(class_name, html, **kargs):
"""Return the content of all tags with the specified class in the passed HTML document as a list"""
return get_elements_by_attribute(
- 'class', r'[^\'"]*\b%s\b[^\'"]*' % re.escape(class_name),
+ 'class', r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name),
html, escape_value=False)
def get_elements_html_by_class(class_name, html):
"""Return the html of all tags with the specified class in the passed HTML document as a list"""
return get_elements_html_by_attribute(
- 'class', r'[^\'"]*\b%s\b[^\'"]*' % re.escape(class_name),
+ 'class', r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name),
html, escape_value=False)
@@ -592,6 +595,19 @@ def clean_html(html):
return html.strip()
+class LenientJSONDecoder(json.JSONDecoder):
+ def __init__(self, *args, transform_source=None, ignore_extra=False, **kwargs):
+ self.transform_source, self.ignore_extra = transform_source, ignore_extra
+ super().__init__(*args, **kwargs)
+
+ def decode(self, s):
+ if self.transform_source:
+ s = self.transform_source(s)
+ if self.ignore_extra:
+ return self.raw_decode(s.lstrip())[0]
+ return super().decode(s)
+
+
def sanitize_open(filename, open_mode):
"""Try to open the given filename, and slightly tweak it if this fails.
@@ -617,9 +633,9 @@ def sanitize_open(filename, open_mode):
# Ref: https://github.com/yt-dlp/yt-dlp/issues/3124
raise LockingUnsupportedError()
stream = locked_file(filename, open_mode, block=False).__enter__()
- except LockingUnsupportedError:
+ except OSError:
stream = open(filename, open_mode)
- return (stream, filename)
+ return stream, filename
except OSError as err:
if attempt or err.errno in (errno.EACCES,):
raise
@@ -712,7 +728,9 @@ def sanitize_path(s, force=False):
def sanitize_url(url):
# Prepend protocol-less URLs with `http:` scheme in order to mitigate
# the number of unwanted failures due to missing protocol
- if url.startswith('//'):
+ if url is None:
+ return
+ elif url.startswith('//'):
return 'http:%s' % url
# Fix some common typos seen so far
COMMON_TYPOS = (
@@ -735,8 +753,8 @@ def extract_basic_auth(url):
parts.hostname if parts.port is None
else '%s:%d' % (parts.hostname, parts.port))))
auth_payload = base64.b64encode(
- ('%s:%s' % (parts.username, parts.password or '')).encode('utf-8'))
- return url, 'Basic ' + auth_payload.decode('utf-8')
+ ('%s:%s' % (parts.username, parts.password or '')).encode())
+ return url, f'Basic {auth_payload.decode()}'
def sanitized_Request(url, *args, **kwargs):
@@ -811,12 +829,9 @@ def escapeHTML(text):
def process_communicate_or_kill(p, *args, **kwargs):
- try:
- return p.communicate(*args, **kwargs)
- except BaseException: # Including KeyboardInterrupt
- p.kill()
- p.wait()
- raise
+ write_string('DeprecationWarning: yt_dlp.utils.process_communicate_or_kill is deprecated '
+ 'and may be removed in a future version. Use yt_dlp.utils.Popen.communicate_or_kill instead')
+ return Popen.communicate_or_kill(p, *args, **kwargs)
class Popen(subprocess.Popen):
@@ -826,11 +841,30 @@ class Popen(subprocess.Popen):
else:
_startupinfo = None
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args, text=False, **kwargs):
+ if text is True:
+ kwargs['universal_newlines'] = True # For 3.6 compatibility
+ kwargs.setdefault('encoding', 'utf-8')
+ kwargs.setdefault('errors', 'replace')
super().__init__(*args, **kwargs, startupinfo=self._startupinfo)
def communicate_or_kill(self, *args, **kwargs):
- return process_communicate_or_kill(self, *args, **kwargs)
+ try:
+ return self.communicate(*args, **kwargs)
+ except BaseException: # Including KeyboardInterrupt
+ self.kill(timeout=None)
+ raise
+
+ def kill(self, *, timeout=0):
+ super().kill()
+ if timeout != 0:
+ self.wait(timeout=timeout)
+
+ @classmethod
+ def run(cls, *args, **kwargs):
+ with cls(*args, **kwargs) as proc:
+ stdout, stderr = proc.communicate_or_kill()
+ return stdout or '', stderr or '', proc.returncode
def get_subprocess_encoding():
@@ -915,25 +949,40 @@ def make_HTTPS_handler(params, **kwargs):
context.check_hostname = opts_check_certificate
if params.get('legacyserverconnect'):
context.options |= 4 # SSL_OP_LEGACY_SERVER_CONNECT
+ # Allow use of weaker ciphers in Python 3.10+. See https://bugs.python.org/issue43998
+ context.set_ciphers('DEFAULT')
+
context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
if opts_check_certificate:
if has_certifi and 'no-certifi' not in params.get('compat_opts', []):
context.load_verify_locations(cafile=certifi.where())
- else:
- try:
- context.load_default_certs()
- # Work around the issue in load_default_certs when there are bad certificates. See:
- # https://github.com/yt-dlp/yt-dlp/issues/1060,
- # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
- except ssl.SSLError:
- # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
- if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
- # Create a new context to discard any certificates that were already loaded
- context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- context.check_hostname, context.verify_mode = True, ssl.CERT_REQUIRED
- for storename in ('CA', 'ROOT'):
- _ssl_load_windows_store_certs(context, storename)
- context.set_default_verify_paths()
+ try:
+ context.load_default_certs()
+ # Work around the issue in load_default_certs when there are bad certificates. See:
+ # https://github.com/yt-dlp/yt-dlp/issues/1060,
+ # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
+ except ssl.SSLError:
+ # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
+ if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
+ for storename in ('CA', 'ROOT'):
+ _ssl_load_windows_store_certs(context, storename)
+ context.set_default_verify_paths()
+
+ client_certfile = params.get('client_certificate')
+ if client_certfile:
+ try:
+ context.load_cert_chain(
+ client_certfile, keyfile=params.get('client_certificate_key'),
+ password=params.get('client_certificate_password'))
+ except ssl.SSLError:
+ raise YoutubeDLError('Unable to load client certificate')
+
+ # Some servers may reject requests if ALPN extension is not sent. See:
+ # https://github.com/python/cpython/issues/85140
+ # https://github.com/yt-dlp/yt-dlp/issues/3878
+ with contextlib.suppress(NotImplementedError):
+ context.set_alpn_protocols(['http/1.1'])
+
return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
@@ -1330,7 +1379,7 @@ def http_response(self, req, resp):
location = resp.headers.get('Location')
if location:
# As of RFC 2616 default charset is iso-8859-1 that is respected by python 3
- location = location.encode('iso-8859-1').decode('utf-8')
+ location = location.encode('iso-8859-1').decode()
location_escaped = escape_url(location)
if location != location_escaped:
del resp.headers['Location']
@@ -1404,9 +1453,14 @@ def https_open(self, req):
conn_class = make_socks_conn_class(conn_class, socks_proxy)
del req.headers['Ytdl-socks-proxy']
- return self.do_open(functools.partial(
- _create_http_connection, self, conn_class, True),
- req, **kwargs)
+ try:
+ return self.do_open(
+ functools.partial(_create_http_connection, self, conn_class, True), req, **kwargs)
+ except urllib.error.URLError as e:
+ if (isinstance(e.reason, ssl.SSLError)
+ and getattr(e.reason, 'reason', None) == 'SSLV3_ALERT_HANDSHAKE_FAILURE'):
+ raise YoutubeDLError('SSLV3_ALERT_HANDSHAKE_FAILURE: Try using --legacy-server-connect')
+ raise
class YoutubeDLCookieJar(compat_cookiejar.MozillaCookieJar):
@@ -1425,57 +1479,71 @@ class YoutubeDLCookieJar(compat_cookiejar.MozillaCookieJar):
'CookieFileEntry',
('domain_name', 'include_subdomains', 'path', 'https_only', 'expires_at', 'name', 'value'))
- def save(self, filename=None, ignore_discard=False, ignore_expires=False):
+ def __init__(self, filename=None, *args, **kwargs):
+ super().__init__(None, *args, **kwargs)
+ if self.is_path(filename):
+ filename = os.fspath(filename)
+ self.filename = filename
+
+ @staticmethod
+ def _true_or_false(cndn):
+ return 'TRUE' if cndn else 'FALSE'
+
+ @staticmethod
+ def is_path(file):
+ return isinstance(file, (str, bytes, os.PathLike))
+
+ @contextlib.contextmanager
+ def open(self, file, *, write=False):
+ if self.is_path(file):
+ with open(file, 'w' if write else 'r', encoding='utf-8') as f:
+ yield f
+ else:
+ if write:
+ file.truncate(0)
+ yield file
+
+ def _really_save(self, f, ignore_discard=False, ignore_expires=False):
+ now = time.time()
+ for cookie in self:
+ if (not ignore_discard and cookie.discard
+ or not ignore_expires and cookie.is_expired(now)):
+ continue
+ name, value = cookie.name, cookie.value
+ if value is None:
+ # cookies.txt regards 'Set-Cookie: foo' as a cookie
+ # with no name, whereas http.cookiejar regards it as a
+ # cookie with no value.
+ name, value = '', name
+ f.write('%s\n' % '\t'.join((
+ cookie.domain,
+ self._true_or_false(cookie.domain.startswith('.')),
+ cookie.path,
+ self._true_or_false(cookie.secure),
+ str_or_none(cookie.expires, default=''),
+ name, value
+ )))
+
+ def save(self, filename=None, *args, **kwargs):
"""
Save cookies to a file.
+ Code is taken from CPython 3.6
+ https://github.com/python/cpython/blob/8d999cbf4adea053be6dbb612b9844635c4dfb8e/Lib/http/cookiejar.py#L2091-L2117 """
- Most of the code is taken from CPython 3.8 and slightly adapted
- to support cookie files with UTF-8 in both python 2 and 3.
- """
if filename is None:
if self.filename is not None:
filename = self.filename
else:
raise ValueError(compat_cookiejar.MISSING_FILENAME_TEXT)
- # Store session cookies with `expires` set to 0 instead of an empty
- # string
+ # Store session cookies with `expires` set to 0 instead of an empty string
for cookie in self:
if cookie.expires is None:
cookie.expires = 0
- with open(filename, 'w', encoding='utf-8') as f:
+ with self.open(filename, write=True) as f:
f.write(self._HEADER)
- now = time.time()
- for cookie in self:
- if not ignore_discard and cookie.discard:
- continue
- if not ignore_expires and cookie.is_expired(now):
- continue
- if cookie.secure:
- secure = 'TRUE'
- else:
- secure = 'FALSE'
- if cookie.domain.startswith('.'):
- initial_dot = 'TRUE'
- else:
- initial_dot = 'FALSE'
- if cookie.expires is not None:
- expires = compat_str(cookie.expires)
- else:
- expires = ''
- if cookie.value is None:
- # cookies.txt regards 'Set-Cookie: foo' as a cookie
- # with no name, whereas http.cookiejar regards it as a
- # cookie with no value.
- name = ''
- value = cookie.name
- else:
- name = cookie.name
- value = cookie.value
- f.write(
- '\t'.join([cookie.domain, initial_dot, cookie.path,
- secure, expires, name, value]) + '\n')
+ self._really_save(f, *args, **kwargs)
def load(self, filename=None, ignore_discard=False, ignore_expires=False):
"""Load cookies from a file."""
@@ -1500,11 +1568,15 @@ def prepare_line(line):
return line
cf = io.StringIO()
- with open(filename, encoding='utf-8') as f:
+ with self.open(filename) as f:
for line in f:
try:
cf.write(prepare_line(line))
except compat_cookiejar.LoadError as e:
+ if f'{line.strip()} '[0] in '[{"':
+ raise compat_cookiejar.LoadError(
+ 'Cookies file must be Netscape formatted, not JSON. See '
+ 'https://github.com/ytdl-org/youtube-dl#how-do-i-pass-cookies-to-youtube-dl')
write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
continue
cf.seek(0)
@@ -1581,9 +1653,21 @@ def redirect_request(self, req, fp, code, msg, headers, newurl):
CONTENT_HEADERS = ("content-length", "content-type")
# NB: don't use dict comprehension for python 2.6 compatibility
newheaders = {k: v for k, v in req.headers.items() if k.lower() not in CONTENT_HEADERS}
+
+ # A 303 must either use GET or HEAD for subsequent request
+ # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.4
+ if code == 303 and m != 'HEAD':
+ m = 'GET'
+ # 301 and 302 redirects are commonly turned into a GET from a POST
+ # for subsequent requests by browsers, so we'll do the same.
+ # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.2
+ # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.3
+ if code in (301, 302) and m == 'POST':
+ m = 'GET'
+
return compat_urllib_request.Request(
newurl, headers=newheaders, origin_req_host=req.origin_req_host,
- unverifiable=True)
+ unverifiable=True, method=m)
def extract_timezone(date_str):
@@ -1706,14 +1790,14 @@ def subtitles_filename(filename, sub_lang, sub_format, expected_real_ext=None):
def datetime_from_str(date_str, precision='auto', format='%Y%m%d'):
- """
- Return a datetime object from a string in the format YYYYMMDD or
- (now|today|yesterday|date)[+-][0-9](microsecond|second|minute|hour|day|week|month|year)(s)?
-
- format: string date format used to return datetime object from
- precision: round the time portion of a datetime object.
- auto|microsecond|second|minute|hour|day.
- auto: round to the unit provided in date_str (if applicable).
+ R"""
+ Return a datetime object from a string.
+ Supported format:
+ (now|today|yesterday|DATE)([+-]\d+(microsecond|second|minute|hour|day|week|month|year)s?)?
+
+ @param format strftime format of DATE
+ @param precision Round the datetime object: auto|microsecond|second|minute|hour|day
+ auto: round to the unit provided in date_str (if applicable).
"""
auto_precision = False
if precision == 'auto':
@@ -1725,7 +1809,7 @@ def datetime_from_str(date_str, precision='auto', format='%Y%m%d'):
if date_str == 'yesterday':
return today - datetime.timedelta(days=1)
match = re.match(
- r'(?P.+)(?P[+-])(?P