X-Git-Url: https://jfr.im/git/yt-dlp.git/blobdiff_plain/4390d5ec12349e5b5bba30af6b4e7f08678af41a..7e88d7d78f452ea69f06bbdf23f82e9ad7c3de5e:/yt_dlp/utils.py
diff --git a/yt_dlp/utils.py b/yt_dlp/utils.py
index f6e41f837..f21d70672 100644
--- a/yt_dlp/utils.py
+++ b/yt_dlp/utils.py
@@ -1,9 +1,4 @@
#!/usr/bin/env python3
-# coding: utf-8
-
-from __future__ import unicode_literals
-
-import asyncio
import atexit
import base64
import binascii
@@ -13,10 +8,9 @@
import contextlib
import ctypes
import datetime
-import email.utils
import email.header
+import email.utils
import errno
-import functools
import gzip
import hashlib
import hmac
@@ -26,11 +20,13 @@
import json
import locale
import math
+import mimetypes
import operator
import os
import platform
import random
import re
+import shlex
import socket
import ssl
import subprocess
@@ -38,52 +34,38 @@
import tempfile
import time
import traceback
+import types
+import urllib.parse
import xml.etree.ElementTree
import zlib
-import mimetypes
+from .compat import asyncio, functools # isort: split
from .compat import (
- compat_HTMLParseError,
- compat_HTMLParser,
- compat_HTTPError,
- compat_basestring,
- compat_brotli,
compat_chr,
compat_cookiejar,
- compat_ctypes_WINFUNCTYPE,
compat_etree_fromstring,
compat_expanduser,
compat_html_entities,
compat_html_entities_html5,
+ compat_HTMLParseError,
+ compat_HTMLParser,
compat_http_client,
- compat_integer_types,
- compat_numeric_types,
- compat_kwargs,
+ compat_HTTPError,
compat_os_name,
compat_parse_qs,
- compat_shlex_split,
compat_shlex_quote,
compat_str,
compat_struct_pack,
compat_struct_unpack,
compat_urllib_error,
- compat_urllib_parse,
+ compat_urllib_parse_unquote_plus,
compat_urllib_parse_urlencode,
compat_urllib_parse_urlparse,
- compat_urllib_parse_urlunparse,
- compat_urllib_parse_quote,
- compat_urllib_parse_quote_plus,
- compat_urllib_parse_unquote_plus,
compat_urllib_request,
compat_urlparse,
- compat_websockets,
- compat_xpath,
-)
-
-from .socks import (
- ProxyType,
- sockssocket,
)
+from .dependencies import brotli, certifi, websockets
+from .socks import ProxyType, sockssocket
def register_socks_protocols():
@@ -147,13 +129,12 @@ def random_user_agent():
SUPPORTED_ENCODINGS = [
'gzip', 'deflate'
]
-if compat_brotli:
+if brotli:
SUPPORTED_ENCODINGS.append('br')
std_headers = {
'User-Agent': random_user_agent(),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
- 'Accept-Encoding': ', '.join(SUPPORTED_ENCODINGS),
'Accept-Language': 'en-us,en;q=0.5',
'Sec-Fetch-Mode': 'navigate',
}
@@ -264,7 +245,10 @@ def random_user_agent():
PACKED_CODES_RE = r"}\('(.+)',(\d+),(\d+),'([^']+)'\.split\('\|'\)"
JSON_LD_RE = r'(?is)'
+NUMBER_RE = r'\d+(?:\.\d+)?'
+
+@functools.cache
def preferredencoding():
"""Get preferred encoding.
@@ -283,37 +267,9 @@ def preferredencoding():
def write_json_file(obj, fn):
""" Encode obj as JSON and write it to fn, atomically if possible """
- fn = encodeFilename(fn)
- if sys.version_info < (3, 0) and sys.platform != 'win32':
- encoding = get_filesystem_encoding()
- # os.path.basename returns a bytes object, but NamedTemporaryFile
- # will fail if the filename contains non ascii characters unless we
- # use a unicode object
- path_basename = lambda f: os.path.basename(fn).decode(encoding)
- # the same for os.path.dirname
- path_dirname = lambda f: os.path.dirname(fn).decode(encoding)
- else:
- path_basename = os.path.basename
- path_dirname = os.path.dirname
-
- args = {
- 'suffix': '.tmp',
- 'prefix': path_basename(fn) + '.',
- 'dir': path_dirname(fn),
- 'delete': False,
- }
-
- # In Python 2.x, json.dump expects a bytestream.
- # In Python 3.x, it writes to a character stream
- if sys.version_info < (3, 0):
- args['mode'] = 'wb'
- else:
- args.update({
- 'mode': 'w',
- 'encoding': 'utf-8',
- })
-
- tf = tempfile.NamedTemporaryFile(**compat_kwargs(args))
+ tf = tempfile.NamedTemporaryFile(
+ prefix=f'{os.path.basename(fn)}.', dir=os.path.dirname(fn),
+ suffix='.tmp', delete=False, mode='w', encoding='utf-8')
try:
with tf:
@@ -321,39 +277,24 @@ def write_json_file(obj, fn):
if sys.platform == 'win32':
# Need to remove existing file on Windows, else os.rename raises
# WindowsError or FileExistsError.
- try:
+ with contextlib.suppress(OSError):
os.unlink(fn)
- except OSError:
- pass
- try:
+ with contextlib.suppress(OSError):
mask = os.umask(0)
os.umask(mask)
os.chmod(tf.name, 0o666 & ~mask)
- except OSError:
- pass
os.rename(tf.name, fn)
except Exception:
- try:
+ with contextlib.suppress(OSError):
os.remove(tf.name)
- except OSError:
- pass
raise
-if sys.version_info >= (2, 7):
- def find_xpath_attr(node, xpath, key, val=None):
- """ Find the xpath xpath[@key=val] """
- assert re.match(r'^[a-zA-Z_-]+$', key)
- expr = xpath + ('[@%s]' % key if val is None else "[@%s='%s']" % (key, val))
- return node.find(expr)
-else:
- def find_xpath_attr(node, xpath, key, val=None):
- for f in node.findall(compat_xpath(xpath)):
- if key not in f.attrib:
- continue
- if val is None or f.attrib.get(key) == val:
- return f
- return None
+def find_xpath_attr(node, xpath, key, val=None):
+ """ Find the xpath xpath[@key=val] """
+ assert re.match(r'^[a-zA-Z_-]+$', key)
+ expr = xpath + ('[@%s]' % key if val is None else f"[@{key}='{val}']")
+ return node.find(expr)
# On python2.6 the xml.etree.ElementTree.Element methods don't support
# the namespace parameter
@@ -373,7 +314,7 @@ def xpath_with_ns(path, ns_map):
def xpath_element(node, xpath, name=None, fatal=False, default=NO_DEFAULT):
def _find_xpath(xpath):
- return node.find(compat_xpath(xpath))
+ return node.find(xpath)
if isinstance(xpath, (str, compat_str)):
n = _find_xpath(xpath)
@@ -415,21 +356,21 @@ def xpath_attr(node, xpath, key, name=None, fatal=False, default=NO_DEFAULT):
if default is not NO_DEFAULT:
return default
elif fatal:
- name = '%s[@%s]' % (xpath, key) if name is None else name
+ name = f'{xpath}[@{key}]' if name is None else name
raise ExtractorError('Could not find XML attribute %s' % name)
else:
return None
return n.attrib[key]
-def get_element_by_id(id, html):
+def get_element_by_id(id, html, **kwargs):
"""Return the content of the tag with the specified ID in the passed HTML document"""
- return get_element_by_attribute('id', id, html)
+ return get_element_by_attribute('id', id, html, **kwargs)
-def get_element_html_by_id(id, html):
+def get_element_html_by_id(id, html, **kwargs):
"""Return the html of the tag with the specified ID in the passed HTML document"""
- return get_element_html_by_attribute('id', id, html)
+ return get_element_html_by_attribute('id', id, html, **kwargs)
def get_element_by_class(class_name, html):
@@ -444,27 +385,27 @@ def get_element_html_by_class(class_name, html):
return retval[0] if retval else None
-def get_element_by_attribute(attribute, value, html, escape_value=True):
- retval = get_elements_by_attribute(attribute, value, html, escape_value)
+def get_element_by_attribute(attribute, value, html, **kwargs):
+ retval = get_elements_by_attribute(attribute, value, html, **kwargs)
return retval[0] if retval else None
-def get_element_html_by_attribute(attribute, value, html, escape_value=True):
- retval = get_elements_html_by_attribute(attribute, value, html, escape_value)
+def get_element_html_by_attribute(attribute, value, html, **kargs):
+ retval = get_elements_html_by_attribute(attribute, value, html, **kargs)
return retval[0] if retval else None
-def get_elements_by_class(class_name, html):
+def get_elements_by_class(class_name, html, **kargs):
"""Return the content of all tags with the specified class in the passed HTML document as a list"""
return get_elements_by_attribute(
- 'class', r'[^\'"]*\b%s\b[^\'"]*' % re.escape(class_name),
+ 'class', r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name),
html, escape_value=False)
def get_elements_html_by_class(class_name, html):
"""Return the html of all tags with the specified class in the passed HTML document as a list"""
return get_elements_html_by_attribute(
- 'class', r'[^\'"]*\b%s\b[^\'"]*' % re.escape(class_name),
+ 'class', r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name),
html, escape_value=False)
@@ -484,15 +425,15 @@ def get_elements_text_and_html_by_attribute(attribute, value, html, escape_value
attribute in the passed HTML document
"""
- value_quote_optional = '' if re.match(r'''[\s"'`=<>]''', value) else '?'
+ quote = '' if re.match(r'''[\s"'`=<>]''', value) else '?'
value = re.escape(value) if escape_value else value
- partial_element_re = r'''(?x)
+ partial_element_re = rf'''(?x)
<(?P<tag>[a-zA-Z0-9:._-]+)
(?:\s(?:[^>"']|"[^"]*"|'[^']*')*)?
- \s%(attribute)s\s*=\s*(?P<_q>['"]%(vqo)s)(?-x:%(value)s)(?P=_q)
- ''' % {'attribute': re.escape(attribute), 'value': value, 'vqo': value_quote_optional}
+ \s{re.escape(attribute)}\s*=\s*(?P<_q>['"]{quote})(?-x:{value})(?P=_q)
+ '''
for m in re.finditer(partial_element_re, html):
content, whole = get_element_text_and_html_by_tag(m.group('tag'), html[m.start():])
@@ -621,16 +562,11 @@ def extract_attributes(html_element):
'empty': '', 'noval': None, 'entity': '&',
'sq': '"', 'dq': '\''
}.
- NB HTMLParser is stricter in Python 2.6 & 3.2 than in later versions,
- but the cases in the unit test will work for all of 2.6, 2.7, 3.2-3.5.
"""
parser = HTMLAttributeParser()
- try:
+ with contextlib.suppress(compat_HTMLParseError):
parser.feed(html_element)
parser.close()
- # Older Python may throw HTMLParseError in case of malformed HTML
- except compat_HTMLParseError:
- pass
return parser.attrs
@@ -659,6 +595,19 @@ def clean_html(html):
return html.strip()
+class LenientJSONDecoder(json.JSONDecoder):
+ def __init__(self, *args, transform_source=None, ignore_extra=False, **kwargs):
+ self.transform_source, self.ignore_extra = transform_source, ignore_extra
+ super().__init__(*args, **kwargs)
+
+ def decode(self, s):
+ if self.transform_source:
+ s = self.transform_source(s)
+ if self.ignore_extra:
+ return self.raw_decode(s.lstrip())[0]
+ return super().decode(s)
+
+
def sanitize_open(filename, open_mode):
"""Try to open the given filename, and slightly tweak it if this fails.
@@ -669,26 +618,30 @@ def sanitize_open(filename, open_mode):
It returns the tuple (stream, definitive_file_name).
"""
- try:
- if filename == '-':
- if sys.platform == 'win32':
- import msvcrt
- msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
- return (sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout, filename)
- stream = locked_file(filename, open_mode, block=False).open()
- return (stream, filename)
- except (IOError, OSError) as err:
- if err.errno in (errno.EACCES,):
- raise
+ if filename == '-':
+ if sys.platform == 'win32':
+ import msvcrt
+ msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
+ return (sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout, filename)
- # In case of error, try to remove win32 forbidden chars
- alt_filename = sanitize_path(filename)
- if alt_filename == filename:
- raise
- else:
- # An exception here should be caught in the caller
- stream = locked_file(filename, open_mode, block=False).open()
- return (stream, alt_filename)
+ for attempt in range(2):
+ try:
+ try:
+ if sys.platform == 'win32':
+ # FIXME: An exclusive lock also locks the file from being read.
+ # Since windows locks are mandatory, don't lock the file on windows (for now).
+ # Ref: https://github.com/yt-dlp/yt-dlp/issues/3124
+ raise LockingUnsupportedError()
+ stream = locked_file(filename, open_mode, block=False).__enter__()
+ except OSError:
+ stream = open(filename, open_mode)
+ return stream, filename
+ except OSError as err:
+ if attempt or err.errno in (errno.EACCES,):
+ raise
+ old_filename, filename = filename, sanitize_path(filename)
+ if old_filename == filename:
+ raise
def timeconvert(timestr):
@@ -700,36 +653,40 @@ def timeconvert(timestr):
return timestamp
-def sanitize_filename(s, restricted=False, is_id=False):
+def sanitize_filename(s, restricted=False, is_id=NO_DEFAULT):
"""Sanitizes a string so it could be used as part of a filename.
- If restricted is set, use a stricter subset of allowed characters.
- Set is_id if this is not an arbitrary string, but an ID that should be kept
- if possible.
+ @param restricted Use a stricter subset of allowed characters
+ @param is_id Whether this is an ID that should be kept unchanged if possible.
+ If unset, yt-dlp's new sanitization rules are in effect
"""
+ if s == '':
+ return ''
+
def replace_insane(char):
if restricted and char in ACCENT_CHARS:
return ACCENT_CHARS[char]
elif not restricted and char == '\n':
- return ' '
+ return '\0 '
elif char == '?' or ord(char) < 32 or ord(char) == 127:
return ''
elif char == '"':
return '' if restricted else '\''
elif char == ':':
- return '_-' if restricted else ' -'
+ return '\0_\0-' if restricted else '\0 \0-'
elif char in '\\/|*<>':
- return '_'
- if restricted and (char in '!&\'()[]{}$;`^,#' or char.isspace()):
- return '_'
- if restricted and ord(char) > 127:
- return '_'
+ return '\0_'
+ if restricted and (char in '!&\'()[]{}$;`^,#' or char.isspace() or ord(char) > 127):
+ return '\0_'
return char
- if s == '':
- return ''
- # Handle timestamps
- s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s)
+ s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s) # Handle timestamps
result = ''.join(map(replace_insane, s))
+ if is_id is NO_DEFAULT:
+ result = re.sub('(\0.)(?:(?=\\1)..)+', r'\1', result) # Remove repeated substitute chars
+ STRIP_RE = '(?:\0.|[ _-])*'
+ result = re.sub(f'^\0.{STRIP_RE}|{STRIP_RE}\0.$', '', result) # Remove substitute chars from start/end
+ result = result.replace('\0', '') or '_'
+
if not is_id:
while '__' in result:
result = result.replace('__', '_')
@@ -750,8 +707,6 @@ def sanitize_path(s, force=False):
if sys.platform == 'win32':
force = False
drive_or_unc, _ = os.path.splitdrive(s)
- if sys.version_info < (2, 7) and not drive_or_unc:
- drive_or_unc, _ = os.path.splitunc(s)
elif force:
drive_or_unc = ''
else:
@@ -765,7 +720,7 @@ def sanitize_path(s, force=False):
for path_part in norm_path]
if drive_or_unc:
sanitized_path.insert(0, drive_or_unc + os.path.sep)
- elif force and s[0] == os.path.sep:
+ elif force and s and s[0] == os.path.sep:
sanitized_path.insert(0, os.path.sep)
return os.path.join(*sanitized_path)
@@ -773,7 +728,9 @@ def sanitize_path(s, force=False):
def sanitize_url(url):
# Prepend protocol-less URLs with `http:` scheme in order to mitigate
# the number of unwanted failures due to missing protocol
- if url.startswith('//'):
+ if url is None:
+ return
+ elif url.startswith('//'):
return 'http:%s' % url
# Fix some common typos seen so far
COMMON_TYPOS = (
@@ -796,8 +753,8 @@ def extract_basic_auth(url):
parts.hostname if parts.port is None
else '%s:%d' % (parts.hostname, parts.port))))
auth_payload = base64.b64encode(
- ('%s:%s' % (parts.username, parts.password or '')).encode('utf-8'))
- return url, 'Basic ' + auth_payload.decode('utf-8')
+ ('%s:%s' % (parts.username, parts.password or '')).encode())
+ return url, f'Basic {auth_payload.decode()}'
def sanitized_Request(url, *args, **kwargs):
@@ -844,10 +801,8 @@ def _htmlentity_transform(entity_with_semicolon):
else:
base = 10
# See https://github.com/ytdl-org/youtube-dl/issues/7518
- try:
+ with contextlib.suppress(ValueError):
return compat_chr(int(numstr, base))
- except ValueError:
- pass
# Unknown entity in name, return its literal representation
return '&%s;' % entity
@@ -856,7 +811,7 @@ def _htmlentity_transform(entity_with_semicolon):
def unescapeHTML(s):
if s is None:
return None
- assert type(s) == compat_str
+ assert isinstance(s, str)
return re.sub(
r'&([^&;]+;)', lambda m: _htmlentity_transform(m.group(1)), s)
@@ -874,12 +829,9 @@ def escapeHTML(text):
def process_communicate_or_kill(p, *args, **kwargs):
- try:
- return p.communicate(*args, **kwargs)
- except BaseException: # Including KeyboardInterrupt
- p.kill()
- p.wait()
- raise
+ write_string('DeprecationWarning: yt_dlp.utils.process_communicate_or_kill is deprecated '
+ 'and may be removed in a future version. Use yt_dlp.utils.Popen.communicate_or_kill instead')
+ return Popen.communicate_or_kill(p, *args, **kwargs)
class Popen(subprocess.Popen):
@@ -889,11 +841,30 @@ class Popen(subprocess.Popen):
else:
_startupinfo = None
- def __init__(self, *args, **kwargs):
- super(Popen, self).__init__(*args, **kwargs, startupinfo=self._startupinfo)
+ def __init__(self, *args, text=False, **kwargs):
+ if text is True:
+ kwargs['universal_newlines'] = True # For 3.6 compatibility
+ kwargs.setdefault('encoding', 'utf-8')
+ kwargs.setdefault('errors', 'replace')
+ super().__init__(*args, **kwargs, startupinfo=self._startupinfo)
def communicate_or_kill(self, *args, **kwargs):
- return process_communicate_or_kill(self, *args, **kwargs)
+ try:
+ return self.communicate(*args, **kwargs)
+ except BaseException: # Including KeyboardInterrupt
+ self.kill(timeout=None)
+ raise
+
+ def kill(self, *, timeout=0):
+ super().kill()
+ if timeout != 0:
+ self.wait(timeout=timeout)
+
+ @classmethod
+ def run(cls, *args, **kwargs):
+ with cls(*args, **kwargs) as proc:
+ stdout, stderr = proc.communicate_or_kill()
+ return stdout or '', stderr or '', proc.returncode
def get_subprocess_encoding():
@@ -909,51 +880,23 @@ def get_subprocess_encoding():
def encodeFilename(s, for_subprocess=False):
- """
- @param s The name of the file
- """
-
- assert type(s) == compat_str
-
- # Python 3 has a Unicode API
- if sys.version_info >= (3, 0):
- return s
-
- # Pass '' directly to use Unicode APIs on Windows 2000 and up
- # (Detecting Windows NT 4 is tricky because 'major >= 4' would
- # match Windows 9x series as well. Besides, NT 4 is obsolete.)
- if not for_subprocess and sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5:
- return s
-
- # Jython assumes filenames are Unicode strings though reported as Python 2.x compatible
- if sys.platform.startswith('java'):
- return s
-
- return s.encode(get_subprocess_encoding(), 'ignore')
+ assert isinstance(s, str)
+ return s
def decodeFilename(b, for_subprocess=False):
-
- if sys.version_info >= (3, 0):
- return b
-
- if not isinstance(b, bytes):
- return b
-
- return b.decode(get_subprocess_encoding(), 'ignore')
+ return b
def encodeArgument(s):
- if not isinstance(s, compat_str):
- # Legacy code that uses byte strings
- # Uncomment the following line after fixing all post processors
- # assert False, 'Internal error: %r should be of type %r, is %r' % (s, compat_str, type(s))
- s = s.decode('ascii')
- return encodeFilename(s, True)
+ # Legacy code that uses byte strings
+ # Uncomment the following line after fixing all post processors
+ # assert isinstance(s, str), 'Internal error: %r should be of type %r, is %r' % (s, compat_str, type(s))
+ return s if isinstance(s, str) else s.decode('ascii')
def decodeArgument(b):
- return decodeFilename(b, True)
+ return b
def decodeOption(optval):
@@ -996,10 +939,8 @@ def _ssl_load_windows_store_certs(ssl_context, storename):
except PermissionError:
return
for cert in certs:
- try:
+ with contextlib.suppress(ssl.SSLError):
ssl_context.load_verify_locations(cadata=cert)
- except ssl.SSLError:
- pass
def make_HTTPS_handler(params, **kwargs):
@@ -1008,29 +949,47 @@ def make_HTTPS_handler(params, **kwargs):
context.check_hostname = opts_check_certificate
if params.get('legacyserverconnect'):
context.options |= 4 # SSL_OP_LEGACY_SERVER_CONNECT
+ # Allow use of weaker ciphers in Python 3.10+. See https://bugs.python.org/issue43998
+ context.set_ciphers('DEFAULT')
+
context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
if opts_check_certificate:
+ if has_certifi and 'no-certifi' not in params.get('compat_opts', []):
+ context.load_verify_locations(cafile=certifi.where())
try:
context.load_default_certs()
- # Work around the issue in load_default_certs when there are bad certificates. See:
- # https://github.com/yt-dlp/yt-dlp/issues/1060,
- # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
+ # Work around the issue in load_default_certs when there are bad certificates. See:
+ # https://github.com/yt-dlp/yt-dlp/issues/1060,
+ # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
except ssl.SSLError:
# enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
- # Create a new context to discard any certificates that were already loaded
- context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- context.check_hostname, context.verify_mode = True, ssl.CERT_REQUIRED
for storename in ('CA', 'ROOT'):
_ssl_load_windows_store_certs(context, storename)
context.set_default_verify_paths()
+
+ client_certfile = params.get('client_certificate')
+ if client_certfile:
+ try:
+ context.load_cert_chain(
+ client_certfile, keyfile=params.get('client_certificate_key'),
+ password=params.get('client_certificate_password'))
+ except ssl.SSLError:
+ raise YoutubeDLError('Unable to load client certificate')
+
+ # Some servers may reject requests if ALPN extension is not sent. See:
+ # https://github.com/python/cpython/issues/85140
+ # https://github.com/yt-dlp/yt-dlp/issues/3878
+ with contextlib.suppress(NotImplementedError):
+ context.set_alpn_protocols(['http/1.1'])
+
return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
def bug_reports_message(before=';'):
- msg = ('please report this issue on https://github.com/yt-dlp/yt-dlp , '
- 'filling out the "Broken site" issue template properly. '
- 'Confirm you are on the latest version using -U')
+ msg = ('please report this issue on https://github.com/yt-dlp/yt-dlp/issues?q= , '
+ 'filling out the appropriate issue template. '
+ 'Confirm you are on the latest version using yt-dlp -U')
before = before.rstrip()
if not before or before.endswith(('.', '!', '?')):
@@ -1075,7 +1034,7 @@ def __init__(self, msg, tb=None, expected=False, cause=None, video_id=None, ie=N
self.ie = ie
self.exc_info = sys.exc_info() # preserve original exception
- super(ExtractorError, self).__init__(''.join((
+ super().__init__(''.join((
format_field(ie, template='[%s] '),
format_field(video_id, template='%s: '),
msg,
@@ -1085,13 +1044,13 @@ def __init__(self, msg, tb=None, expected=False, cause=None, video_id=None, ie=N
def format_traceback(self):
return join_nonempty(
self.traceback and ''.join(traceback.format_tb(self.traceback)),
- self.cause and ''.join(traceback.format_exception(self.cause)[1:]),
+ self.cause and ''.join(traceback.format_exception(None, self.cause, self.cause.__traceback__)[1:]),
delim='\n') or None
class UnsupportedError(ExtractorError):
def __init__(self, url):
- super(UnsupportedError, self).__init__(
+ super().__init__(
'Unsupported URL: %s' % url, expected=True)
self.url = url
@@ -1110,7 +1069,7 @@ class GeoRestrictedError(ExtractorError):
def __init__(self, msg, countries=None, **kwargs):
kwargs['expected'] = True
- super(GeoRestrictedError, self).__init__(msg, **kwargs)
+ super().__init__(msg, **kwargs)
self.countries = countries
@@ -1124,7 +1083,7 @@ class DownloadError(YoutubeDLError):
def __init__(self, msg, exc_info=None):
""" exc_info, if given, is the original exception that caused the trouble (as returned by sys.exc_info()). """
- super(DownloadError, self).__init__(msg)
+ super().__init__(msg)
self.exc_info = exc_info
@@ -1218,9 +1177,7 @@ class ContentTooShortError(YoutubeDLError):
"""
def __init__(self, downloaded, expected):
- super(ContentTooShortError, self).__init__(
- 'Downloaded {0} bytes, expected {1} bytes'.format(downloaded, expected)
- )
+ super().__init__(f'Downloaded {downloaded} bytes, expected {expected} bytes')
# Both in bytes
self.downloaded = downloaded
self.expected = expected
@@ -1228,7 +1185,7 @@ def __init__(self, downloaded, expected):
class XAttrMetadataError(YoutubeDLError):
def __init__(self, code=None, msg='Unknown error'):
- super(XAttrMetadataError, self).__init__(msg)
+ super().__init__(msg)
self.code = code
self.msg = msg
@@ -1247,12 +1204,7 @@ class XAttrUnavailableError(YoutubeDLError):
def _create_http_connection(ydl_handler, http_class, is_https, *args, **kwargs):
- # Working around python 2 bug (see http://bugs.python.org/issue17849) by limiting
- # expected HTTP responses to meet HTTP/1.0 or later (see also
- # https://github.com/ytdl-org/youtube-dl/issues/6727)
- if sys.version_info < (3, 0):
- kwargs['strict'] = True
- hc = http_class(*args, **compat_kwargs(kwargs))
+ hc = http_class(*args, **kwargs)
source_address = ydl_handler._params.get('source_address')
if source_address is not None:
@@ -1269,7 +1221,7 @@ def _create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_a
ip_addrs = [addr for addr in addrs if addr[0] == af]
if addrs and not ip_addrs:
ip_version = 'v4' if af == socket.AF_INET else 'v6'
- raise socket.error(
+ raise OSError(
"No remote IP%s addresses available for connect, can't use '%s' as source address"
% (ip_version, source_address[0]))
for res in ip_addrs:
@@ -1283,30 +1235,17 @@ def _create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_a
sock.connect(sa)
err = None # Explicitly break reference cycle
return sock
- except socket.error as _:
+ except OSError as _:
err = _
if sock is not None:
sock.close()
if err is not None:
raise err
else:
- raise socket.error('getaddrinfo returns an empty list')
+ raise OSError('getaddrinfo returns an empty list')
if hasattr(hc, '_create_connection'):
hc._create_connection = _create_connection
- sa = (source_address, 0)
- if hasattr(hc, 'source_address'): # Python 2.7+
- hc.source_address = sa
- else: # Python 2.6
- def _hc_connect(self, *args, **kwargs):
- sock = _create_connection(
- (self.host, self.port), self.timeout, sa)
- if is_https:
- self.sock = ssl.wrap_socket(
- sock, self.key_file, self.cert_file,
- ssl_version=ssl.PROTOCOL_TLSv1)
- else:
- self.sock = sock
- hc.connect = functools.partial(_hc_connect, hc)
+ hc.source_address = (source_address, 0)
return hc
@@ -1315,7 +1254,7 @@ def handle_youtubedl_headers(headers):
filtered_headers = headers
if 'Youtubedl-no-compression' in filtered_headers:
- filtered_headers = dict((k, v) for k, v in filtered_headers.items() if k.lower() != 'accept-encoding')
+ filtered_headers = {k: v for k, v in filtered_headers.items() if k.lower() != 'accept-encoding'}
del filtered_headers['Youtubedl-no-compression']
return filtered_headers
@@ -1368,7 +1307,7 @@ def deflate(data):
def brotli(data):
if not data:
return data
- return compat_brotli.decompress(data)
+ return brotli.decompress(data)
def http_request(self, req):
# According to RFC 3986, URLs can not contain non-ASCII characters, however this is not
@@ -1392,12 +1331,10 @@ def http_request(self, req):
if h.capitalize() not in req.headers:
req.add_header(h, v)
- req.headers = handle_youtubedl_headers(req.headers)
+ if 'Accept-encoding' not in req.headers:
+ req.add_header('Accept-encoding', ', '.join(SUPPORTED_ENCODINGS))
- if sys.version_info < (2, 7) and '#' in req.get_full_url():
- # Python 2.6 is brain-dead when it comes to fragments
- req._Request__original = req._Request__original.partition('#')[0]
- req._Request__r_type = req._Request__r_type.partition('#')[0]
+ req.headers = handle_youtubedl_headers(req.headers)
return req
@@ -1409,14 +1346,14 @@ def http_response(self, req, resp):
gz = gzip.GzipFile(fileobj=io.BytesIO(content), mode='rb')
try:
uncompressed = io.BytesIO(gz.read())
- except IOError as original_ioerror:
+ except OSError as original_ioerror:
# There may be junk add the end of the file
# See http://stackoverflow.com/q/4928560/35070 for details
for i in range(1, 1024):
try:
gz = gzip.GzipFile(fileobj=io.BytesIO(content[:-i]), mode='rb')
uncompressed = io.BytesIO(gz.read())
- except IOError:
+ except OSError:
continue
break
else:
@@ -1442,15 +1379,10 @@ def http_response(self, req, resp):
location = resp.headers.get('Location')
if location:
# As of RFC 2616 default charset is iso-8859-1 that is respected by python 3
- if sys.version_info >= (3, 0):
- location = location.encode('iso-8859-1').decode('utf-8')
- else:
- location = location.decode('utf-8')
+ location = location.encode('iso-8859-1').decode()
location_escaped = escape_url(location)
if location != location_escaped:
del resp.headers['Location']
- if sys.version_info < (3, 0):
- location_escaped = location_escaped.encode('utf-8')
resp.headers['Location'] = location_escaped
return resp
@@ -1487,7 +1419,7 @@ class SocksConnection(base_class):
def connect(self):
self.sock = sockssocket()
self.sock.setproxy(*proxy_args)
- if type(self.timeout) in (int, float):
+ if isinstance(self.timeout, (int, float)):
self.sock.settimeout(self.timeout)
self.sock.connect((self.host, self.port))
@@ -1521,9 +1453,14 @@ def https_open(self, req):
conn_class = make_socks_conn_class(conn_class, socks_proxy)
del req.headers['Ytdl-socks-proxy']
- return self.do_open(functools.partial(
- _create_http_connection, self, conn_class, True),
- req, **kwargs)
+ try:
+ return self.do_open(
+ functools.partial(_create_http_connection, self, conn_class, True), req, **kwargs)
+ except urllib.error.URLError as e:
+ if (isinstance(e.reason, ssl.SSLError)
+ and getattr(e.reason, 'reason', None) == 'SSLV3_ALERT_HANDSHAKE_FAILURE'):
+ raise YoutubeDLError('SSLV3_ALERT_HANDSHAKE_FAILURE: Try using --legacy-server-connect')
+ raise
class YoutubeDLCookieJar(compat_cookiejar.MozillaCookieJar):
@@ -1542,57 +1479,71 @@ class YoutubeDLCookieJar(compat_cookiejar.MozillaCookieJar):
'CookieFileEntry',
('domain_name', 'include_subdomains', 'path', 'https_only', 'expires_at', 'name', 'value'))
- def save(self, filename=None, ignore_discard=False, ignore_expires=False):
+ def __init__(self, filename=None, *args, **kwargs):
+ super().__init__(None, *args, **kwargs)
+ if self.is_path(filename):
+ filename = os.fspath(filename)
+ self.filename = filename
+
+ @staticmethod
+ def _true_or_false(cndn):
+ return 'TRUE' if cndn else 'FALSE'
+
+ @staticmethod
+ def is_path(file):
+ return isinstance(file, (str, bytes, os.PathLike))
+
+ @contextlib.contextmanager
+ def open(self, file, *, write=False):
+ if self.is_path(file):
+ with open(file, 'w' if write else 'r', encoding='utf-8') as f:
+ yield f
+ else:
+ if write:
+ file.truncate(0)
+ yield file
+
+ def _really_save(self, f, ignore_discard=False, ignore_expires=False):
+ now = time.time()
+ for cookie in self:
+ if (not ignore_discard and cookie.discard
+ or not ignore_expires and cookie.is_expired(now)):
+ continue
+ name, value = cookie.name, cookie.value
+ if value is None:
+ # cookies.txt regards 'Set-Cookie: foo' as a cookie
+ # with no name, whereas http.cookiejar regards it as a
+ # cookie with no value.
+ name, value = '', name
+ f.write('%s\n' % '\t'.join((
+ cookie.domain,
+ self._true_or_false(cookie.domain.startswith('.')),
+ cookie.path,
+ self._true_or_false(cookie.secure),
+ str_or_none(cookie.expires, default=''),
+ name, value
+ )))
+
+ def save(self, filename=None, *args, **kwargs):
"""
Save cookies to a file.
+ Code is taken from CPython 3.6
+ https://github.com/python/cpython/blob/8d999cbf4adea053be6dbb612b9844635c4dfb8e/Lib/http/cookiejar.py#L2091-L2117 """
- Most of the code is taken from CPython 3.8 and slightly adapted
- to support cookie files with UTF-8 in both python 2 and 3.
- """
if filename is None:
if self.filename is not None:
filename = self.filename
else:
raise ValueError(compat_cookiejar.MISSING_FILENAME_TEXT)
- # Store session cookies with `expires` set to 0 instead of an empty
- # string
+ # Store session cookies with `expires` set to 0 instead of an empty string
for cookie in self:
if cookie.expires is None:
cookie.expires = 0
- with io.open(filename, 'w', encoding='utf-8') as f:
+ with self.open(filename, write=True) as f:
f.write(self._HEADER)
- now = time.time()
- for cookie in self:
- if not ignore_discard and cookie.discard:
- continue
- if not ignore_expires and cookie.is_expired(now):
- continue
- if cookie.secure:
- secure = 'TRUE'
- else:
- secure = 'FALSE'
- if cookie.domain.startswith('.'):
- initial_dot = 'TRUE'
- else:
- initial_dot = 'FALSE'
- if cookie.expires is not None:
- expires = compat_str(cookie.expires)
- else:
- expires = ''
- if cookie.value is None:
- # cookies.txt regards 'Set-Cookie: foo' as a cookie
- # with no name, whereas http.cookiejar regards it as a
- # cookie with no value.
- name = ''
- value = cookie.name
- else:
- name = cookie.name
- value = cookie.value
- f.write(
- '\t'.join([cookie.domain, initial_dot, cookie.path,
- secure, expires, name, value]) + '\n')
+ self._really_save(f, *args, **kwargs)
def load(self, filename=None, ignore_discard=False, ignore_expires=False):
"""Load cookies from a file."""
@@ -1617,14 +1568,16 @@ def prepare_line(line):
return line
cf = io.StringIO()
- with io.open(filename, encoding='utf-8') as f:
+ with self.open(filename) as f:
for line in f:
try:
cf.write(prepare_line(line))
except compat_cookiejar.LoadError as e:
- write_string(
- 'WARNING: skipping cookie file entry due to %s: %r\n'
- % (e, line), sys.stderr)
+ if f'{line.strip()} '[0] in '[{"':
+ raise compat_cookiejar.LoadError(
+ 'Cookies file must be Netscape formatted, not JSON. See '
+ 'https://github.com/ytdl-org/youtube-dl#how-do-i-pass-cookies-to-youtube-dl')
+ write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
continue
cf.seek(0)
self._really_load(cf, filename, ignore_discard, ignore_expires)
@@ -1649,19 +1602,6 @@ def __init__(self, cookiejar=None):
compat_urllib_request.HTTPCookieProcessor.__init__(self, cookiejar)
def http_response(self, request, response):
- # Python 2 will choke on next HTTP request in row if there are non-ASCII
- # characters in Set-Cookie HTTP header of last response (see
- # https://github.com/ytdl-org/youtube-dl/issues/6769).
- # In order to at least prevent crashing we will percent encode Set-Cookie
- # header before HTTPCookieProcessor starts processing it.
- # if sys.version_info < (3, 0) and response.headers:
- # for set_cookie_header in ('Set-Cookie', 'Set-Cookie2'):
- # set_cookie = response.headers.get(set_cookie_header)
- # if set_cookie:
- # set_cookie_escaped = compat_urllib_parse.quote(set_cookie, b"%/;:@&=+$,!~*'()?#[] ")
- # if set_cookie != set_cookie_escaped:
- # del response.headers[set_cookie_header]
- # response.headers[set_cookie_header] = set_cookie_escaped
return compat_urllib_request.HTTPCookieProcessor.http_response(self, request, response)
https_request = compat_urllib_request.HTTPCookieProcessor.http_request
@@ -1705,12 +1645,6 @@ def redirect_request(self, req, fp, code, msg, headers, newurl):
# essentially all clients do redirect in this case, so we do
# the same.
- # On python 2 urlh.geturl() may sometimes return redirect URL
- # as byte string instead of unicode. This workaround allows
- # to force it always return unicode.
- if sys.version_info[0] < 3:
- newurl = compat_str(newurl)
-
# Be conciliant with URIs containing a space. This is mainly
# redundant with the more complete encoding done in http_error_302(),
# but it is kept for compatibility with other callers.
@@ -1718,11 +1652,22 @@ def redirect_request(self, req, fp, code, msg, headers, newurl):
CONTENT_HEADERS = ("content-length", "content-type")
- # NB: don't use dict comprehension for python 2.6 compatibility
+ # Content headers are dropped because the redirected request is built without a body
- newheaders = dict((k, v) for k, v in req.headers.items()
- if k.lower() not in CONTENT_HEADERS)
+ newheaders = {k: v for k, v in req.headers.items() if k.lower() not in CONTENT_HEADERS}
+
+ # A 303 must either use GET or HEAD for subsequent request
+ # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.4
+ if code == 303 and m != 'HEAD':
+ m = 'GET'
+ # 301 and 302 redirects are commonly turned into a GET from a POST
+ # for subsequent requests by browsers, so we'll do the same.
+ # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.2
+ # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.3
+ if code in (301, 302) and m == 'POST':
+ m = 'GET'
+
return compat_urllib_request.Request(
newurl, headers=newheaders, origin_req_host=req.origin_req_host,
- unverifiable=True)
+ unverifiable=True, method=m)
def extract_timezone(date_str):
@@ -1762,12 +1707,10 @@ def parse_iso8601(date_str, delimiter='T', timezone=None):
if timezone is None:
timezone, date_str = extract_timezone(date_str)
- try:
- date_format = '%Y-%m-%d{0}%H:%M:%S'.format(delimiter)
+ with contextlib.suppress(ValueError):
+ date_format = f'%Y-%m-%d{delimiter}%H:%M:%S'
dt = datetime.datetime.strptime(date_str, date_format) - timezone
return calendar.timegm(dt.timetuple())
- except ValueError:
- pass
def date_formats(day_first=True):
@@ -1787,17 +1730,13 @@ def unified_strdate(date_str, day_first=True):
_, date_str = extract_timezone(date_str)
for expression in date_formats(day_first):
- try:
+ with contextlib.suppress(ValueError):
upload_date = datetime.datetime.strptime(date_str, expression).strftime('%Y%m%d')
- except ValueError:
- pass
if upload_date is None:
timetuple = email.utils.parsedate_tz(date_str)
if timetuple:
- try:
+ with contextlib.suppress(ValueError):
upload_date = datetime.datetime(*timetuple[:6]).strftime('%Y%m%d')
- except ValueError:
- pass
if upload_date is not None:
return compat_str(upload_date)
@@ -1825,11 +1764,9 @@ def unified_timestamp(date_str, day_first=True):
date_str = m.group(1)
for expression in date_formats(day_first):
- try:
+ with contextlib.suppress(ValueError):
dt = datetime.datetime.strptime(date_str, expression) - timezone + datetime.timedelta(hours=pm_delta)
return calendar.timegm(dt.timetuple())
- except ValueError:
- pass
timetuple = email.utils.parsedate_tz(date_str)
if timetuple:
return calendar.timegm(timetuple) + pm_delta * 3600
@@ -1853,14 +1790,14 @@ def subtitles_filename(filename, sub_lang, sub_format, expected_real_ext=None):
def datetime_from_str(date_str, precision='auto', format='%Y%m%d'):
- """
- Return a datetime object from a string in the format YYYYMMDD or
- (now|today|yesterday|date)[+-][0-9](microsecond|second|minute|hour|day|week|month|year)(s)?
-
- format: string date format used to return datetime object from
- precision: round the time portion of a datetime object.
- auto|microsecond|second|minute|hour|day.
- auto: round to the unit provided in date_str (if applicable).
+ R"""
+ Return a datetime object from a string.
+ Supported format:
+ (now|today|yesterday|DATE)([+-]\d+(microsecond|second|minute|hour|day|week|month|year)s?)?
+
+ @param format strftime format of DATE
+ @param precision Round the datetime object: auto|microsecond|second|minute|hour|day
+ auto: round to the unit provided in date_str (if applicable).
"""
auto_precision = False
if precision == 'auto':
@@ -1872,7 +1809,7 @@ def datetime_from_str(date_str, precision='auto', format='%Y%m%d'):
if date_str == 'yesterday':
return today - datetime.timedelta(days=1)
match = re.match(
- r'(?P.+)(?P[+-])(?P