-import asyncio
-import atexit
import base64
import binascii
import calendar
import collections
import collections.abc
import contextlib
-import datetime
+import datetime as dt
import email.header
import email.utils
import errno
-import gzip
import hashlib
import hmac
import html.entities
import html.parser
-import http.client
-import http.cookiejar
import inspect
import io
import itertools
import locale
import math
import mimetypes
+import netrc
import operator
import os
import platform
import urllib.parse
import urllib.request
import xml.etree.ElementTree
-import zlib
from . import traversal
compat_expanduser,
compat_HTMLParseError,
compat_os_name,
- compat_shlex_quote,
)
-from ..dependencies import brotli, certifi, websockets, xattr
-from ..socks import ProxyType, sockssocket
+from ..dependencies import xattr
-__name__ = __name__.rsplit('.', 1)[0] # Pretend to be the parent module
+__name__ = __name__.rsplit('.', 1)[0] # noqa: A001: Pretend to be the parent module
# This is not clearly defined otherwise
compiled_regex_type = type(re.compile(''))
def random_user_agent():
    """Return a Chrome-on-Windows User-Agent string with a randomly picked Chrome version."""
    # Pool of Chrome version numbers to draw from
    chrome_builds = (
        '90.0.4430.212',
        '90.0.4430.24',
        '90.0.4430.70',
        '90.0.4430.72',
        '90.0.4430.85',
        '90.0.4430.93',
        '91.0.4472.101',
        '91.0.4472.106',
        '91.0.4472.114',
        '91.0.4472.124',
        '91.0.4472.164',
        '91.0.4472.19',
        '91.0.4472.77',
        '92.0.4515.107',
        '92.0.4515.115',
        '92.0.4515.131',
        '92.0.4515.159',
        '92.0.4515.43',
        '93.0.4556.0',
        '93.0.4577.15',
        '93.0.4577.63',
        '93.0.4577.82',
        '94.0.4606.41',
        '94.0.4606.54',
        '94.0.4606.61',
        '94.0.4606.71',
        '94.0.4606.81',
        '94.0.4606.85',
        '95.0.4638.17',
        '95.0.4638.50',
        '95.0.4638.54',
        '95.0.4638.69',
        '95.0.4638.74',
        '96.0.4664.18',
        '96.0.4664.45',
        '96.0.4664.55',
        '96.0.4664.93',
        '97.0.4692.20',
    )
    ua_format = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/%s Safari/537.36'
    picked_build = random.choice(chrome_builds)
    return ua_format % picked_build
-
-
# Content encodings advertised in `Accept-encoding`; 'br' only when the brotli module is available
SUPPORTED_ENCODINGS = ['gzip', 'deflate']
if brotli:
    SUPPORTED_ENCODINGS += ['br']

# Headers added to requests when no explicit `http_headers` are configured.
# The User-Agent is picked once, when this module is loaded.
std_headers = {
    'User-Agent': random_user_agent(),
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'en-us,en;q=0.5',
    'Sec-Fetch-Mode': 'navigate',
}


# Fixed User-Agent strings, keyed by browser name
USER_AGENTS = {
    'Safari': 'Mozilla/5.0 (X11; Linux x86_64; rv:10.0) AppleWebKit/533.20.25 (KHTML, like Gecko) Version/5.0.4 Safari/533.20.27',
}
-
-
class NO_DEFAULT:
    """Marker class used as a sentinel; compare against the class object itself (`x is NO_DEFAULT`)."""
'EST': -5, 'EDT': -4, # Eastern
'CST': -6, 'CDT': -5, # Central
'MST': -7, 'MDT': -6, # Mountain
- 'PST': -8, 'PDT': -7 # Pacific
+ 'PST': -8, 'PDT': -7, # Pacific
}
# needed for sanitizing filenames in restricted mode
'%d/%m/%y',
'%d/%m/%Y %H:%M:%S',
'%d-%m-%Y %H:%M',
+ '%H:%M %d/%m/%Y',
])
DATE_FORMATS_MONTH_FIRST = list(DATE_FORMATS)
def find_xpath_attr(node, xpath, key, val=None):
    """Find the first element matching ``xpath[@key]`` or ``xpath[@key='val']``.

    @param node   Element (or tree) to search with its `find` method
    @param xpath  ElementPath expression selecting the candidate elements
    @param key    Attribute name; must consist of ASCII letters, `_` and `-` only
    @param val    Required attribute value, or None to only require that
                  the attribute is present
    @returns      The first matching element, or None if there is no match
    """
    assert re.match(r'^[a-zA-Z_-]+$', key)
    if val is None:
        predicate = f'[@{key}]'
    else:
        # ElementPath has no escape syntax inside quoted values, so use the
        # quote character that the value does not contain. Values holding both
        # kinds of quotes cannot be expressed and keep the historical form.
        text = str(val)
        quote = '"' if "'" in text and '"' not in text else "'"
        predicate = f'[@{key}={quote}{val}{quote}]'
    return node.find(xpath + predicate)
# On python2.6 the xml.etree.ElementTree.Element methods don't support
replaced.append(c[0])
else:
ns, tag = c
- replaced.append('{%s}%s' % (ns_map[ns], tag))
+ replaced.append(f'{{{ns_map[ns]}}}{tag}')
return '/'.join(replaced)
return default
elif fatal:
name = xpath if name is None else name
- raise ExtractorError('Could not find XML element %s' % name)
+ raise ExtractorError(f'Could not find XML element {name}')
else:
return None
return n
return default
elif fatal:
name = xpath if name is None else name
- raise ExtractorError('Could not find XML element\'s text %s' % name)
+ raise ExtractorError(f'Could not find XML element\'s text {name}')
else:
return None
return n.text
return default
elif fatal:
name = f'{xpath}[@{key}]' if name is None else name
- raise ExtractorError('Could not find XML attribute %s' % name)
+ raise ExtractorError(f'Could not find XML attribute {name}')
else:
return None
return n.attrib[key]
def get_elements_by_class(class_name, html, **kargs):
    """Return the content of all tags with the specified class in the passed HTML document as a list"""
    # Match the class name as a whole word anywhere inside the quoted attribute value
    escaped_name = re.escape(class_name)
    class_value_re = rf'[^\'"]*(?<=[\'"\s]){escaped_name}(?=[\'"\s])[^\'"]*'
    return get_elements_by_attribute('class', class_value_re, html, escape_value=False)
def get_elements_html_by_class(class_name, html):
    """Return the html of all tags with the specified class in the passed HTML document as a list"""
    # Match the class name as a whole word anywhere inside the quoted attribute value
    escaped_name = re.escape(class_name)
    class_value_re = rf'[^\'"]*(?<=[\'"\s]){escaped_name}(?=[\'"\s])[^\'"]*'
    return get_elements_html_by_attribute('class', class_value_re, html, escape_value=False)
yield (
unescapeHTML(re.sub(r'^(?P<q>["\'])(?P<content>.*)(?P=q)$', r'\g<content>', content, flags=re.DOTALL)),
- whole
+ whole,
)
else:
raise compat_HTMLParseError(f'matching opening tag for closing {tag} tag not found')
if not self.tagstack:
- raise self.HTMLBreakOnClosingTagException()
+ raise self.HTMLBreakOnClosingTagException
# XXX: This should be far less strict
s = self._close_object(e)
if s is not None:
continue
- raise type(e)(f'{e.msg} in {s[e.pos-10:e.pos+10]!r}', s, e.pos)
+ raise type(e)(f'{e.msg} in {s[e.pos - 10:e.pos + 10]!r}', s, e.pos)
assert False, 'Too many attempts to decode JSON'
# FIXME: An exclusive lock also locks the file from being read.
# Since windows locks are mandatory, don't lock the file on windows (for now).
# Ref: https://github.com/yt-dlp/yt-dlp/issues/3124
- raise LockingUnsupportedError()
+ raise LockingUnsupportedError
stream = locked_file(filename, open_mode, block=False).__enter__()
except OSError:
stream = open(filename, open_mode)
elif char in '\\/|*<>':
return '\0_'
if restricted and (char in '!&\'()[]{}$;`^,#' or char.isspace() or ord(char) > 127):
- return '\0_'
+ return '' if unicodedata.category(char)[0] in 'CM' else '\0_'
return char
# Replace look-alike Unicode glyphs
def sanitize_path(s, force=False):
"""Sanitizes and normalizes path on Windows"""
+ # XXX: this handles drive relative paths (c:sth) incorrectly
if sys.platform == 'win32':
force = False
drive_or_unc, _ = os.path.splitdrive(s)
sanitized_path.insert(0, drive_or_unc + os.path.sep)
elif force and s and s[0] == os.path.sep:
sanitized_path.insert(0, os.path.sep)
- return os.path.join(*sanitized_path)
+ # TODO: Fix behavioral differences <3.12
+ # The workaround using `normpath` only superficially passes tests
+ # Ref: https://github.com/python/cpython/pull/100351
+ return os.path.normpath(os.path.join(*sanitized_path))
def sanitize_url(url, *, scheme='http'):
return url, None
url = urllib.parse.urlunsplit(parts._replace(netloc=(
parts.hostname if parts.port is None
- else '%s:%d' % (parts.hostname, parts.port))))
+ else f'{parts.hostname}:{parts.port}')))
auth_payload = base64.b64encode(
- ('%s:%s' % (parts.username, parts.password or '')).encode())
+ ('{}:{}'.format(parts.username, parts.password or '')).encode())
return url, f'Basic {auth_payload.decode()}'
-def sanitized_Request(url, *args, **kwargs):
- url, auth_header = extract_basic_auth(escape_url(sanitize_url(url)))
- if auth_header is not None:
- headers = args[1] if len(args) >= 2 else kwargs.setdefault('headers', {})
- headers['Authorization'] = auth_header
- return urllib.request.Request(url, *args, **kwargs)
-
-
def expand_path(s):
    """Expand ~ and shell variables in the path `s`"""
    # The home directory is expanded first, then environment variables
    home_expanded = compat_expanduser(s)
    return os.path.expandvars(home_expanded)
numstr = mobj.group(1)
if numstr.startswith('x'):
base = 16
- numstr = '0%s' % numstr
+ numstr = f'0{numstr}'
else:
base = 10
# See https://github.com/ytdl-org/youtube-dl/issues/7518
return chr(int(numstr, base))
# Unknown entity in name, return its literal representation
- return '&%s;' % entity
+ return f'&{entity};'
def unescapeHTML(s):
)
-def process_communicate_or_kill(p, *args, **kwargs):
- deprecation_warning(f'"{__name__}.process_communicate_or_kill" is deprecated and may be removed '
- f'in a future version. Use "{__name__}.Popen.communicate_or_kill" instead')
- return Popen.communicate_or_kill(p, *args, **kwargs)
class netrc_from_content(netrc.netrc):
    """A `netrc.netrc` whose entries are parsed from a string instead of a file"""

    def __init__(self, content):
        # netrc.netrc.__init__ is intentionally not called;
        # the attributes the parser fills in are created here instead
        self.hosts = {}
        self.macros = {}
        with io.StringIO(content) as buffer:
            # '-' stands in for the file name argument
            self._parse('-', buffer, False)
class Popen(subprocess.Popen):
_fix('LD_LIBRARY_PATH') # Linux
_fix('DYLD_LIBRARY_PATH') # macOS
- def __init__(self, *args, env=None, text=False, **kwargs):
+ def __init__(self, args, *remaining, env=None, text=False, shell=False, **kwargs):
if env is None:
env = os.environ.copy()
self._fix_pyinstaller_ld_path(env)
kwargs['universal_newlines'] = True # For 3.6 compatibility
kwargs.setdefault('encoding', 'utf-8')
kwargs.setdefault('errors', 'replace')
- super().__init__(*args, env=env, **kwargs, startupinfo=self._startupinfo)
+
+ if shell and compat_os_name == 'nt' and kwargs.get('executable') is None:
+ if not isinstance(args, str):
+ args = shell_quote(args, shell=True)
+ shell = False
+ # Set variable for `cmd.exe` newline escaping (see `utils.shell_quote`)
+ env['='] = '"^\n\n"'
+ args = f'{self.__comspec()} /Q /S /D /V:OFF /E:ON /C "{args}"'
+
+ super().__init__(args, *remaining, env=env, shell=shell, **kwargs, startupinfo=self._startupinfo)
+
+ def __comspec(self):
+ comspec = os.environ.get('ComSpec') or os.path.join(
+ os.environ.get('SystemRoot', ''), 'System32', 'cmd.exe')
+ if os.path.isabs(comspec):
+ return comspec
+ raise FileNotFoundError('shell not found: neither %ComSpec% nor %SystemRoot% is set')
def communicate_or_kill(self, *args, **kwargs):
try:
return '%s.%03d' % (ret, time.milliseconds) if msec else ret
def _ssl_load_windows_store_certs(ssl_context, storename):
    """Load the server-auth certificates of a Windows certificate store into `ssl_context`.

    Returns silently if the store cannot be read due to missing permissions;
    certificates that `ssl_context` rejects with an SSLError are skipped.
    """
    # Code adapted from _load_windows_store_certs in https://github.com/python/cpython/blob/main/Lib/ssl.py
    usable_certs = []
    try:
        for cert, encoding, trust in ssl.enum_certificates(storename):
            if encoding != 'x509_asn':
                continue
            if trust is True or ssl.Purpose.SERVER_AUTH.oid in trust:
                usable_certs.append(cert)
    except PermissionError:
        return

    for cert in usable_certs:
        try:
            ssl_context.load_verify_locations(cadata=cert)
        except ssl.SSLError:
            pass
-
-
def make_HTTPS_handler(params, **kwargs):
    """Create a `YoutubeDLHTTPSHandler` with an SSL context configured from `params`.

    Keys read from `params`: 'nocheckcertificate', 'legacyserverconnect',
    'compat_opts', 'client_certificate', 'client_certificate_key' and
    'client_certificate_password'. Remaining keyword arguments are passed
    through to `YoutubeDLHTTPSHandler`.

    Raises YoutubeDLError if the client certificate cannot be loaded.
    """
    verify_certs = not params.get('nocheckcertificate')
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # check_hostname has to be set before verify_mode below
    context.check_hostname = verify_certs

    if params.get('legacyserverconnect'):
        context.options |= 4  # SSL_OP_LEGACY_SERVER_CONNECT
        # Allow use of weaker ciphers in Python 3.10+. See https://bugs.python.org/issue43998
        context.set_ciphers('DEFAULT')
    elif (
        sys.version_info < (3, 10)
        and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
        and not ssl.OPENSSL_VERSION.startswith('LibreSSL')
    ):
        # Backport the default SSL ciphers and minimum TLS version settings from Python 3.10 [1].
        # This is to ensure consistent behavior across Python versions, and help avoid fingerprinting
        # in some situations [2][3].
        # Python 3.10 only supports OpenSSL 1.1.1+ [4]. Because this change is likely
        # untested on older versions, we only apply this to OpenSSL 1.1.1+ to be safe.
        # LibreSSL is excluded until further investigation due to cipher support issues [5][6].
        # 1. https://github.com/python/cpython/commit/e983252b516edb15d4338b0a47631b59ef1e2536
        # 2. https://github.com/yt-dlp/yt-dlp/issues/4627
        # 3. https://github.com/yt-dlp/yt-dlp/pull/5294
        # 4. https://peps.python.org/pep-0644/
        # 5. https://peps.python.org/pep-0644/#libressl-support
        # 6. https://github.com/yt-dlp/yt-dlp/commit/5b9f253fa0aee996cf1ed30185d4b502e00609c4#commitcomment-89054368
        context.set_ciphers('@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM')
        context.minimum_version = ssl.TLSVersion.TLSv1_2

    if verify_certs:
        context.verify_mode = ssl.CERT_REQUIRED
        if certifi and 'no-certifi' not in params.get('compat_opts', []):
            context.load_verify_locations(cafile=certifi.where())
        else:
            try:
                context.load_default_certs()
            # Work around the issue in load_default_certs when there are bad certificates. See:
            # https://github.com/yt-dlp/yt-dlp/issues/1060,
            # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
            except ssl.SSLError:
                # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
                if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
                    for store in ('CA', 'ROOT'):
                        _ssl_load_windows_store_certs(context, store)
                context.set_default_verify_paths()
    else:
        context.verify_mode = ssl.CERT_NONE

    certfile = params.get('client_certificate')
    if certfile:
        try:
            context.load_cert_chain(
                certfile,
                keyfile=params.get('client_certificate_key'),
                password=params.get('client_certificate_password'))
        except ssl.SSLError:
            raise YoutubeDLError('Unable to load client certificate')

    # Some servers may reject requests if ALPN extension is not sent. See:
    # https://github.com/python/cpython/issues/85140
    # https://github.com/yt-dlp/yt-dlp/issues/3878
    try:
        context.set_alpn_protocols(['http/1.1'])
    except NotImplementedError:
        pass

    return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
-
-
def bug_reports_message(before=';'):
from ..update import REPOSITORY
super().__init__(self.msg)
# Exception types grouped as network errors; ssl.CertificateError is included when available
network_exceptions = (
    urllib.error.URLError,
    http.client.HTTPException,
    socket.error,
    *((ssl.CertificateError,) if hasattr(ssl, 'CertificateError') else ()),
)
-
-
class ExtractorError(YoutubeDLError):
"""Error during info extraction."""
""" tb, if given, is the original traceback (so that it can be printed out).
If expected is set, this is a normal error message and most likely not a bug in yt-dlp.
"""
+ from ..networking.exceptions import network_exceptions
if sys.exc_info()[0] in network_exceptions:
expected = True
class UnsupportedError(ExtractorError):
    """Extractor error for a URL that is not supported; the URL is kept in `self.url`"""

    def __init__(self, url):
        message = f'Unsupported URL: {url}'
        super().__init__(message, expected=True)
        self.url = url
pass
-def _create_http_connection(ydl_handler, http_class, is_https, *args, **kwargs):
- hc = http_class(*args, **kwargs)
- source_address = ydl_handler._params.get('source_address')
-
- if source_address is not None:
- # This is to workaround _create_connection() from socket where it will try all
- # address data from getaddrinfo() including IPv6. This filters the result from
- # getaddrinfo() based on the source_address value.
- # This is based on the cpython socket.create_connection() function.
- # https://github.com/python/cpython/blob/master/Lib/socket.py#L691
- def _create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None):
- host, port = address
- err = None
- addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)
- af = socket.AF_INET if '.' in source_address[0] else socket.AF_INET6
- ip_addrs = [addr for addr in addrs if addr[0] == af]
- if addrs and not ip_addrs:
- ip_version = 'v4' if af == socket.AF_INET else 'v6'
- raise OSError(
- "No remote IP%s addresses available for connect, can't use '%s' as source address"
- % (ip_version, source_address[0]))
- for res in ip_addrs:
- af, socktype, proto, canonname, sa = res
- sock = None
- try:
- sock = socket.socket(af, socktype, proto)
- if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
- sock.settimeout(timeout)
- sock.bind(source_address)
- sock.connect(sa)
- err = None # Explicitly break reference cycle
- return sock
- except OSError as _:
- err = _
- if sock is not None:
- sock.close()
- if err is not None:
- raise err
- else:
- raise OSError('getaddrinfo returns an empty list')
- if hasattr(hc, '_create_connection'):
- hc._create_connection = _create_connection
- hc.source_address = (source_address, 0)
-
- return hc
-
-
class YoutubeDLHandler(urllib.request.HTTPHandler):
    """Handler for HTTP requests and responses.

    This class, when installed with an OpenerDirector, automatically adds
    the standard headers to every HTTP request and handles gzipped, deflated and
    brotli responses from web servers.

    Part of this code was copied from:

    http://techknack.net/python-urllib2-handlers/

    Andrew Rowls, the author of that code, agreed to release it to the
    public domain.
    """

    def __init__(self, params, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._params = params

    def http_open(self, req):
        """Open `req`, going through a SOCKS proxy if the 'Ytdl-socks-proxy' header is set"""
        conn_class = http.client.HTTPConnection

        socks_proxy = req.headers.get('Ytdl-socks-proxy')
        if socks_proxy:
            conn_class = make_socks_conn_class(conn_class, socks_proxy)
            # The header is internal and must not be sent to the server
            del req.headers['Ytdl-socks-proxy']

        connection_factory = functools.partial(_create_http_connection, self, conn_class, False)
        return self.do_open(connection_factory, req)

    @staticmethod
    def deflate(data):
        """Decompress deflate `data`, trying a raw stream first and a zlib-wrapped one second"""
        if not data:
            return data
        try:
            return zlib.decompress(data, -zlib.MAX_WBITS)
        except zlib.error:
            return zlib.decompress(data)

    @staticmethod
    def brotli(data):
        """Decompress brotli `data`; empty input is returned unchanged"""
        if not data:
            return data
        return brotli.decompress(data)

    def http_request(self, req):
        """Escape the request URL and fill in default and encoding headers"""
        # According to RFC 3986, URLs can not contain non-ASCII characters, however this is not
        # always respected by websites, some tend to give out URLs with non percent-encoded
        # non-ASCII characters (see telemb.py, ard.py [#3412])
        # urllib chokes on URLs with non-ASCII characters (see http://bugs.python.org/issue3991)
        # To work around aforementioned issue we will replace request's original URL with
        # percent-encoded one
        # Since redirects are also affected (e.g. http://www.southpark.de/alle-episoden/s18e09)
        # the code of this workaround has been moved here from YoutubeDL.urlopen()
        original_url = req.get_full_url()
        escaped_url = escape_url(original_url)

        # Substitute URL if any change after escaping
        if original_url != escaped_url:
            req = update_Request(req, url=escaped_url)

        default_headers = self._params.get('http_headers', std_headers)
        for header, value in default_headers.items():
            # Capitalize is needed because of Python bug 2275: http://bugs.python.org/issue2275
            # The dict keys are capitalized because of this bug by urllib
            if header.capitalize() not in req.headers:
                req.add_header(header, value)

        if 'Youtubedl-no-compression' in req.headers:  # deprecated
            req.headers.pop('Youtubedl-no-compression', None)
            req.add_header('Accept-encoding', 'identity')

        if 'Accept-encoding' not in req.headers:
            req.add_header('Accept-encoding', ', '.join(SUPPORTED_ENCODINGS))

        return super().do_request_(req)

    def http_response(self, req, resp):
        """Decode compressed response bodies and percent-encode redirect locations"""
        old_resp = resp

        def rewrap(stream):
            # New response object around the decoded body, keeping the original metadata
            wrapped = urllib.request.addinfourl(stream, old_resp.headers, old_resp.url, old_resp.code)
            wrapped.msg = old_resp.msg
            return wrapped

        # gzip
        if resp.headers.get('Content-encoding', '') == 'gzip':
            content = resp.read()
            try:
                uncompressed = io.BytesIO(
                    gzip.GzipFile(fileobj=io.BytesIO(content), mode='rb').read())
            except OSError as original_ioerror:
                # There may be junk at the end of the file
                # See http://stackoverflow.com/q/4928560/35070 for details
                for trim in range(1, 1024):
                    try:
                        uncompressed = io.BytesIO(
                            gzip.GzipFile(fileobj=io.BytesIO(content[:-trim]), mode='rb').read())
                    except OSError:
                        continue
                    break
                else:
                    raise original_ioerror
            resp = rewrap(uncompressed)
        # deflate
        if resp.headers.get('Content-encoding', '') == 'deflate':
            resp = rewrap(io.BytesIO(self.deflate(resp.read())))
        # brotli
        if resp.headers.get('Content-encoding', '') == 'br':
            resp = rewrap(io.BytesIO(self.brotli(resp.read())))
        # Percent-encode redirect URL of Location HTTP header to satisfy RFC 3986 (see
        # https://github.com/ytdl-org/youtube-dl/issues/6457).
        if 300 <= resp.code < 400:
            location = resp.headers.get('Location')
            if location:
                # As of RFC 2616 default charset is iso-8859-1 that is respected by python 3
                location = location.encode('iso-8859-1').decode()
                location_escaped = escape_url(location)
                if location != location_escaped:
                    del resp.headers['Location']
                    resp.headers['Location'] = location_escaped
        return resp

    https_request = http_request
    https_response = http_response
-
-
def make_socks_conn_class(base_class, socks_proxy):
    """Create a subclass of `base_class` that connects through a SOCKS proxy.

    @param base_class   `http.client.HTTPConnection` or `HTTPSConnection` (sub)class
    @param socks_proxy  Proxy URL; the scheme selects the protocol
                        (socks5, socks4a, socks4 or socks), the port defaults to 1080
                        and username/password are percent-decoded
    @returns            The connection class to instantiate
    @raises ValueError  If the proxy URL has an unsupported scheme
    """
    assert issubclass(base_class, (
        http.client.HTTPConnection, http.client.HTTPSConnection))

    url_components = urllib.parse.urlparse(socks_proxy)
    scheme = url_components.scheme.lower()
    if scheme == 'socks5':
        socks_type = ProxyType.SOCKS5
    elif scheme in ('socks', 'socks4'):
        socks_type = ProxyType.SOCKS4
    elif scheme == 'socks4a':
        socks_type = ProxyType.SOCKS4A
    else:
        # Previously this fell through and failed later with an UnboundLocalError
        raise ValueError(f'Unsupported SOCKS proxy scheme: {scheme!r}')

    def unquote_if_non_empty(s):
        if not s:
            return s
        return urllib.parse.unquote_plus(s)

    proxy_args = (
        socks_type,
        url_components.hostname, url_components.port or 1080,
        True,  # Remote DNS
        unquote_if_non_empty(url_components.username),
        unquote_if_non_empty(url_components.password),
    )

    class SocksConnection(base_class):
        def connect(self):
            self.sock = sockssocket()
            self.sock.setproxy(*proxy_args)
            if isinstance(self.timeout, (int, float)):
                self.sock.settimeout(self.timeout)
            self.sock.connect((self.host, self.port))

            # For HTTPS the TLS layer is established on top of the proxied socket
            if isinstance(self, http.client.HTTPSConnection):
                if hasattr(self, '_context'):  # Python > 2.6
                    self.sock = self._context.wrap_socket(
                        self.sock, server_hostname=self.host)
                else:
                    self.sock = ssl.wrap_socket(self.sock)

    return SocksConnection
-
-
class YoutubeDLHTTPSHandler(urllib.request.HTTPSHandler):
    """HTTPS handler that supports a custom connection class, SOCKS proxies and source address binding"""

    def __init__(self, params, https_conn_class=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._params = params
        self._https_conn_class = https_conn_class or http.client.HTTPSConnection

    def https_open(self, req):
        """Open `req`; a TLS handshake failure alert is re-raised as YoutubeDLError"""
        open_kwargs = {}
        if hasattr(self, '_context'):  # python > 2.6
            open_kwargs['context'] = self._context
        if hasattr(self, '_check_hostname'):  # python 3.x
            open_kwargs['check_hostname'] = self._check_hostname

        conn_class = self._https_conn_class
        socks_proxy = req.headers.get('Ytdl-socks-proxy')
        if socks_proxy:
            conn_class = make_socks_conn_class(conn_class, socks_proxy)
            # The header is internal and must not be sent to the server
            del req.headers['Ytdl-socks-proxy']

        connection_factory = functools.partial(_create_http_connection, self, conn_class, True)
        try:
            return self.do_open(connection_factory, req, **open_kwargs)
        except urllib.error.URLError as e:
            handshake_failed = (
                isinstance(e.reason, ssl.SSLError)
                and getattr(e.reason, 'reason', None) == 'SSLV3_ALERT_HANDSHAKE_FAILURE')
            if handshake_failed:
                raise YoutubeDLError('SSLV3_ALERT_HANDSHAKE_FAILURE: Try using --legacy-server-connect')
            raise
-
-
def is_path_like(f):
    """Return True if `f` is a `str`, `bytes` or `os.PathLike` object"""
    path_types = (str, bytes, os.PathLike)
    return isinstance(f, path_types)
class YoutubeDLCookieJar(http.cookiejar.MozillaCookieJar):
    """
    See [1] for cookie file format.

    1. https://curl.haxx.se/docs/http-cookies.html
    """
    _HTTPONLY_PREFIX = '#HttpOnly_'
    _ENTRY_LEN = 7
    _HEADER = '''# Netscape HTTP Cookie File
# This file is generated by yt-dlp. Do not edit.

'''
    _CookieFileEntry = collections.namedtuple(
        'CookieFileEntry',
        ('domain_name', 'include_subdomains', 'path', 'https_only', 'expires_at', 'name', 'value'))

    def __init__(self, filename=None, *args, **kwargs):
        super().__init__(None, *args, **kwargs)
        # Path-like objects are stored as plain paths; anything else is kept as given
        self.filename = os.fspath(filename) if is_path_like(filename) else filename

    @staticmethod
    def _true_or_false(cndn):
        if cndn:
            return 'TRUE'
        return 'FALSE'

    @contextlib.contextmanager
    def open(self, file, *, write=False):
        """Yield a text stream for `file`, which is either a path or an already open file object"""
        if not is_path_like(file):
            if write:
                file.truncate(0)
            yield file
            return
        mode = 'w' if write else 'r'
        with open(file, mode, encoding='utf-8') as stream:
            yield stream

    def _really_save(self, f, ignore_discard=False, ignore_expires=False):
        now = time.time()
        for cookie in self:
            if not ignore_discard and cookie.discard:
                continue
            if not ignore_expires and cookie.is_expired(now):
                continue
            name, value = cookie.name, cookie.value
            if value is None:
                # cookies.txt regards 'Set-Cookie: foo' as a cookie
                # with no name, whereas http.cookiejar regards it as a
                # cookie with no value.
                name, value = '', name
            fields = (
                cookie.domain,
                self._true_or_false(cookie.domain.startswith('.')),
                cookie.path,
                self._true_or_false(cookie.secure),
                str_or_none(cookie.expires, default=''),
                name, value,
            )
            f.write('%s\n' % '\t'.join(fields))

    def save(self, filename=None, *args, **kwargs):
        """
        Save cookies to a file.
        Code is taken from CPython 3.6
        https://github.com/python/cpython/blob/8d999cbf4adea053be6dbb612b9844635c4dfb8e/Lib/http/cookiejar.py#L2091-L2117 """

        if filename is None:
            if self.filename is None:
                raise ValueError(http.cookiejar.MISSING_FILENAME_TEXT)
            filename = self.filename

        # Store session cookies with `expires` set to 0 instead of an empty string
        for cookie in self:
            if cookie.expires is None:
                cookie.expires = 0

        with self.open(filename, write=True) as f:
            f.write(self._HEADER)
            self._really_save(f, *args, **kwargs)

    def load(self, filename=None, ignore_discard=False, ignore_expires=False):
        """Load cookies from a file."""
        if filename is None:
            if self.filename is None:
                raise ValueError(http.cookiejar.MISSING_FILENAME_TEXT)
            filename = self.filename

        def prepare_line(line):
            # Validate one line, stripping the HttpOnly marker; raises LoadError for malformed entries
            if line.startswith(self._HTTPONLY_PREFIX):
                line = line[len(self._HTTPONLY_PREFIX):]
            # comments and empty lines are fine
            if line.startswith('#') or not line.strip():
                return line
            cookie_list = line.split('\t')
            if len(cookie_list) != self._ENTRY_LEN:
                raise http.cookiejar.LoadError('invalid length %d' % len(cookie_list))
            cookie = self._CookieFileEntry(*cookie_list)
            if cookie.expires_at and not cookie.expires_at.isdigit():
                raise http.cookiejar.LoadError('invalid expires at %s' % cookie.expires_at)
            return line

        sanitized = io.StringIO()
        with self.open(filename) as f:
            for line in f:
                try:
                    sanitized.write(prepare_line(line))
                except http.cookiejar.LoadError as e:
                    # A line starting with a JSON-looking character means the whole file is in the wrong format
                    if f'{line.strip()} '[0] in '[{"':
                        raise http.cookiejar.LoadError(
                            'Cookies file must be Netscape formatted, not JSON. See '
                            'https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp')
                    write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
                    continue
        sanitized.seek(0)
        self._really_load(sanitized, filename, ignore_discard, ignore_expires)
        # Session cookies are denoted by either `expires` field set to
        # an empty string or 0. MozillaCookieJar only recognizes the former
        # (see [1]). So we need force the latter to be recognized as session
        # cookies on our own.
        # Session cookies may be important for cookies-based authentication,
        # e.g. usually, when user does not check 'Remember me' check box while
        # logging in on a site, some important cookies are stored as session
        # cookies so that not recognizing them will result in failed login.
        # 1. https://bugs.python.org/issue17164
        for cookie in self:
            # Treat `expires=0` cookies as session cookies
            if cookie.expires == 0:
                cookie.expires = None
                cookie.discard = True
-
-
class YoutubeDLCookieProcessor(urllib.request.HTTPCookieProcessor):
    """Cookie processor that applies the HTTP request/response hooks to HTTPS as well"""

    def __init__(self, cookiejar=None):
        super().__init__(cookiejar)

    def http_response(self, request, response):
        return super().http_response(request, response)

    # HTTPS traffic goes through the same hooks as HTTP
    https_request = urllib.request.HTTPCookieProcessor.http_request
    https_response = http_response
-
-
class YoutubeDLRedirectHandler(urllib.request.HTTPRedirectHandler):
    """YoutubeDL redirect handler

    The code is based on HTTPRedirectHandler implementation from CPython [1].

    This redirect handler fixes and improves the logic to better align with RFC7231
    and what browsers tend to do [2][3]

    1. https://github.com/python/cpython/blob/master/Lib/urllib/request.py
    2. https://datatracker.ietf.org/doc/html/rfc7231
    3. https://github.com/python/cpython/issues/91306
    """

    http_error_301 = http_error_303 = http_error_307 = http_error_308 = urllib.request.HTTPRedirectHandler.http_error_302

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        """Build the request that follows a redirect of `req` to `newurl`.

        The method and body are kept unless the redirect turns the request
        into a GET, in which case the body and its describing headers are dropped.
        Raises urllib.error.HTTPError for status codes that are not redirects
        handled here.
        """
        if code not in (301, 302, 303, 307, 308):
            raise urllib.error.HTTPError(req.full_url, code, msg, headers, fp)

        original_method = req.get_method()
        new_method = original_method
        new_data = req.data
        # Lowercased names of headers that must not be carried over
        remove_headers = set()
        # A 303 must either use GET or HEAD for subsequent request
        # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.4
        if code == 303 and original_method != 'HEAD':
            new_method = 'GET'
        # 301 and 302 redirects are commonly turned into a GET from a POST
        # for subsequent requests by browsers, so we'll do the same.
        # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.2
        # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.3
        elif code in (301, 302) and original_method == 'POST':
            new_method = 'GET'

        # only remove payload if method changed (e.g. POST to GET)
        if new_method != original_method:
            new_data = None
            remove_headers.update(('content-length', 'content-type'))

        # Header names are compared case-insensitively: urllib stores them
        # capitalized (e.g. 'Content-type'), so an exact-case match would miss them
        new_headers = {k: v for k, v in req.headers.items() if k.lower() not in remove_headers}

        return urllib.request.Request(
            newurl, headers=new_headers, origin_req_host=req.origin_req_host,
            unverifiable=True, method=new_method, data=new_data)
-
-
-def extract_timezone(date_str):
+def extract_timezone(date_str, default=None):
m = re.search(
r'''(?x)
^.{8,}? # >=8 char non-TZ prefix, if present
(?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2}) # hh[:]mm
$)
''', date_str)
+ timezone = None
+
if not m:
m = re.search(r'\d{1,2}:\d{1,2}(?:\.\d+)?(?P<tz>\s*[A-Z]+)$', date_str)
timezone = TIMEZONE_NAMES.get(m and m.group('tz').strip())
if timezone is not None:
date_str = date_str[:-len(m.group('tz'))]
- timezone = datetime.timedelta(hours=timezone or 0)
+ timezone = dt.timedelta(hours=timezone)
else:
date_str = date_str[:-len(m.group('tz'))]
- if not m.group('sign'):
- timezone = datetime.timedelta()
- else:
+ if m.group('sign'):
sign = 1 if m.group('sign') == '+' else -1
- timezone = datetime.timedelta(
+ timezone = dt.timedelta(
hours=sign * int(m.group('hours')),
minutes=sign * int(m.group('minutes')))
+
+ if timezone is None and default is not NO_DEFAULT:
+ timezone = default or dt.timedelta()
+
return timezone, date_str
date_str = re.sub(r'\.[0-9]+', '', date_str)
- if timezone is None:
- timezone, date_str = extract_timezone(date_str)
+ timezone, date_str = extract_timezone(date_str, timezone)
- with contextlib.suppress(ValueError):
+ with contextlib.suppress(ValueError, TypeError):
date_format = f'%Y-%m-%d{delimiter}%H:%M:%S'
- dt = datetime.datetime.strptime(date_str, date_format) - timezone
- return calendar.timegm(dt.timetuple())
+ dt_ = dt.datetime.strptime(date_str, date_format) - timezone
+ return calendar.timegm(dt_.timetuple())
def date_formats(day_first=True):
for expression in date_formats(day_first):
with contextlib.suppress(ValueError):
- upload_date = datetime.datetime.strptime(date_str, expression).strftime('%Y%m%d')
+ upload_date = dt.datetime.strptime(date_str, expression).strftime('%Y%m%d')
if upload_date is None:
timetuple = email.utils.parsedate_tz(date_str)
if timetuple:
with contextlib.suppress(ValueError):
- upload_date = datetime.datetime(*timetuple[:6]).strftime('%Y%m%d')
+ upload_date = dt.datetime(*timetuple[:6]).strftime('%Y%m%d')
if upload_date is not None:
return str(upload_date)
def unified_timestamp(date_str, day_first=True):
- if date_str is None:
+ if not isinstance(date_str, str):
return None
date_str = re.sub(r'\s+', ' ', re.sub(
for expression in date_formats(day_first):
with contextlib.suppress(ValueError):
- dt = datetime.datetime.strptime(date_str, expression) - timezone + datetime.timedelta(hours=pm_delta)
- return calendar.timegm(dt.timetuple())
+ dt_ = dt.datetime.strptime(date_str, expression) - timezone + dt.timedelta(hours=pm_delta)
+ return calendar.timegm(dt_.timetuple())
timetuple = email.utils.parsedate_tz(date_str)
if timetuple:
if precision == 'auto':
auto_precision = True
precision = 'microsecond'
- today = datetime_round(datetime.datetime.utcnow(), precision)
+ today = datetime_round(dt.datetime.now(dt.timezone.utc), precision)
if date_str in ('now', 'today'):
return today
if date_str == 'yesterday':
- return today - datetime.timedelta(days=1)
+ return today - dt.timedelta(days=1)
match = re.match(
r'(?P<start>.+)(?P<sign>[+-])(?P<time>\d+)(?P<unit>microsecond|second|minute|hour|day|week|month|year)s?',
date_str)
if unit == 'week':
unit = 'day'
time *= 7
- delta = datetime.timedelta(**{unit + 's': time})
+ delta = dt.timedelta(**{unit + 's': time})
new_date = start_time + delta
if auto_precision:
return datetime_round(new_date, unit)
return new_date
- return datetime_round(datetime.datetime.strptime(date_str, format), precision)
+ return datetime_round(dt.datetime.strptime(date_str, format), precision)
def date_from_str(date_str, format='%Y%m%d', strict=False):
return datetime_from_str(date_str, precision='microsecond', format=format).date()
-def datetime_add_months(dt, months):
+def datetime_add_months(dt_, months):
"""Increment/Decrement a datetime object by months."""
- month = dt.month + months - 1
- year = dt.year + month // 12
+ month = dt_.month + months - 1
+ year = dt_.year + month // 12
month = month % 12 + 1
- day = min(dt.day, calendar.monthrange(year, month)[1])
- return dt.replace(year, month, day)
+ day = min(dt_.day, calendar.monthrange(year, month)[1])
+ return dt_.replace(year, month, day)
-def datetime_round(dt, precision='day'):
+def datetime_round(dt_, precision='day'):
"""
Round a datetime object's time to a specific precision
"""
if precision == 'microsecond':
- return dt
+ return dt_
unit_seconds = {
'day': 86400,
'second': 1,
}
roundto = lambda x, n: ((x + n / 2) // n) * n
- timestamp = calendar.timegm(dt.timetuple())
- return datetime.datetime.utcfromtimestamp(roundto(timestamp, unit_seconds[precision]))
+ timestamp = roundto(calendar.timegm(dt_.timetuple()), unit_seconds[precision])
+ return dt.datetime.fromtimestamp(timestamp, dt.timezone.utc)
def hyphenate_date(date_str):
if start is not None:
self.start = date_from_str(start, strict=True)
else:
- self.start = datetime.datetime.min.date()
+ self.start = dt.datetime.min.date()
if end is not None:
self.end = date_from_str(end, strict=True)
else:
- self.end = datetime.datetime.max.date()
+ self.end = dt.datetime.max.date()
if self.start > self.end:
- raise ValueError('Date range: "%s" , the start date must be before the end date' % self)
+ raise ValueError(f'Date range: "{self}" , the start date must be before the end date')
@classmethod
def day(cls, day):
def __contains__(self, date):
"""Check if the date is in the range"""
- if not isinstance(date, datetime.date):
+ if not isinstance(date, dt.date):
date = date_from_str(date)
return self.start <= date <= self.end
def __repr__(self):
return f'{__name__}.{type(self).__name__}({self.start.isoformat()!r}, {self.end.isoformat()!r})'
+ def __str__(self):
+ return f'{self.start} to {self.end}'
+
def __eq__(self, other):
return (isinstance(other, DateRange)
and self.start == other.start and self.end == other.end)
with contextlib.suppress(OSError): # We may not have access to the executable
libc_ver = platform.libc_ver()
- return 'Python %s (%s %s %s) - %s (%s%s)' % (
+ return 'Python {} ({} {} {}) - {} ({}{})'.format(
platform.python_version(),
python_implementation,
platform.machine(),
@functools.cache
def get_windows_version():
- ''' Get Windows version. returns () if it's not running on Windows '''
+ """ Get Windows version. returns () if it's not running on Windows """
if compat_os_name == 'nt':
return version_tuple(platform.win32_ver()[1])
else:
s = re.sub(r'([\r\n]+)', r' \1', s)
enc, buffer = None, out
- if 'b' in getattr(out, 'mode', ''):
+ # `mode` might be `None` (Ref: https://github.com/yt-dlp/yt-dlp/issues/8816)
+ if 'b' in (getattr(out, 'mode', None) or ''):
enc = encoding or preferredencoding()
elif hasattr(out, 'buffer'):
buffer = out.buffer
out.flush()
+# TODO: Use global logger
def deprecation_warning(msg, *, printer=None, stacklevel=0, **kwargs):
from .. import _IN_CLI
if _IN_CLI:
ctypes.wintypes.DWORD, # dwReserved
ctypes.wintypes.DWORD, # nNumberOfBytesToLockLow
ctypes.wintypes.DWORD, # nNumberOfBytesToLockHigh
- ctypes.POINTER(OVERLAPPED) # Overlapped
+ ctypes.POINTER(OVERLAPPED), # Overlapped
]
LockFileEx.restype = ctypes.wintypes.BOOL
UnlockFileEx = kernel32.UnlockFileEx
ctypes.wintypes.DWORD, # dwReserved
ctypes.wintypes.DWORD, # nNumberOfBytesToLockLow
ctypes.wintypes.DWORD, # nNumberOfBytesToLockHigh
- ctypes.POINTER(OVERLAPPED) # Overlapped
+ ctypes.POINTER(OVERLAPPED), # Overlapped
]
UnlockFileEx.restype = ctypes.wintypes.BOOL
whole_low = 0xffffffff
assert f._lock_file_overlapped_p
handle = msvcrt.get_osfhandle(f.fileno())
if not UnlockFileEx(handle, 0, whole_low, whole_high, f._lock_file_overlapped_p):
- raise OSError('Unlocking file failed: %r' % ctypes.FormatError())
+ raise OSError(f'Unlocking file failed: {ctypes.FormatError()!r}')
else:
try:
except ImportError:
def _lock_file(f, exclusive, block):
- raise LockingUnsupportedError()
+ raise LockingUnsupportedError
def _unlock_file(f):
- raise LockingUnsupportedError()
+ raise LockingUnsupportedError
class locked_file:
return encoding if encoding is not None else 'utf-8'
-def shell_quote(args):
- quoted_args = []
- encoding = get_filesystem_encoding()
- for a in args:
- if isinstance(a, bytes):
- # We may get a filename encoded with 'encodeFilename'
- a = a.decode(encoding)
- quoted_args.append(compat_shlex_quote(a))
- return ' '.join(quoted_args)
+_WINDOWS_QUOTE_TRANS = str.maketrans({'"': R'\"'})
+_CMD_QUOTE_TRANS = str.maketrans({
+ # Keep quotes balanced by replacing them with `""` instead of `\\"`
+ '"': '""',
+ # These require an env-variable `=` containing `"^\n\n"` (set in `utils.Popen`)
+ # `=` should be unique since variables containing `=` cannot be set using cmd
+ '\n': '%=%',
+ '\r': '%=%',
+ # Use zero length variable replacement so `%` doesn't get expanded
+ # `cd` is always set as long as extensions are enabled (`/E:ON` in `utils.Popen`)
+ '%': '%%cd:~,%',
+})
+
+
+def shell_quote(args, *, shell=False):
+ args = list(variadic(args))
+
+ if compat_os_name != 'nt':
+ return shlex.join(args)
+
+ trans = _CMD_QUOTE_TRANS if shell else _WINDOWS_QUOTE_TRANS
+ return ' '.join(
+ s if re.fullmatch(r'[\w#$*\-+./:?@\\]+', s, re.ASCII)
+ else re.sub(r'(\\+)("|$)', r'\1\1\2', s).translate(trans).join('""')
+ for s in args)
def smuggle_url(url, data):
buf = ctypes.create_string_buffer(len(title_bytes))
buf.value = title_bytes
try:
+ # PR_SET_NAME = 15 Ref: /usr/include/linux/prctl.h
libc.prctl(15, buf, 0, 0, 0)
except AttributeError:
return # Strange libc, just skip this
def remove_quotes(s):
if s is None or len(s) < 2:
return s
- for quote in ('"', "'", ):
+ for quote in ('"', "'"):
if s[0] == quote and s[-1] == quote:
return s[1:-1]
return s
return urllib.parse.urljoin(base, path)
-class HEADRequest(urllib.request.Request):
- def get_method(self):
- return 'HEAD'
-
-
-class PUTRequest(urllib.request.Request):
- def get_method(self):
- return 'PUT'
-
-
def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1):
if get_attr and v is not None:
v = getattr(v, get_attr, None)
return url if re.match(r'^(?:(?:https?|rt(?:m(?:pt?[es]?|fp)|sp[su]?)|mms|ftps?):)?//', url) else None
-def request_to_url(req):
- if isinstance(req, urllib.request.Request):
- return req.get_full_url()
- else:
- return req
-
-
-def strftime_or_none(timestamp, date_format, default=None):
+def strftime_or_none(timestamp, date_format='%Y%m%d', default=None):
datetime_object = None
try:
if isinstance(timestamp, (int, float)): # unix timestamp
# Using naive datetime here can break timestamp() in Windows
# Ref: https://github.com/yt-dlp/yt-dlp/issues/5185, https://github.com/python/cpython/issues/94414
- datetime_object = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
+ # Also, dt.datetime.fromtimestamp breaks for negative timestamps
+ # Ref: https://github.com/yt-dlp/yt-dlp/issues/6706#issuecomment-1496842642
+ datetime_object = (dt.datetime.fromtimestamp(0, dt.timezone.utc)
+ + dt.timedelta(seconds=timestamp))
elif isinstance(timestamp, str): # assume YYYYMMDD
- datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
+ datetime_object = dt.datetime.strptime(timestamp, '%Y%m%d')
date_format = re.sub( # Support %s on windows
r'(?<!%)(%%)*%s', rf'\g<1>{int(datetime_object.timestamp())}', date_format)
return datetime_object.strftime(date_format)
)?
T)?
(?:
- (?P<hours>[0-9]+)\s*h(?:ours?)?,?\s*
+ (?P<hours>[0-9]+)\s*h(?:(?:ou)?rs?)?,?\s*
)?
(?:
(?P<mins>[0-9]+)\s*m(?:in(?:ute)?s?)?,?\s*
def replace_extension(filename, ext, expected_real_ext=None):
name, real_ext = os.path.splitext(filename)
- return '{}.{}'.format(
- name if not expected_real_ext or real_ext[1:] == expected_real_ext else filename,
- ext)
+ return f'{name if not expected_real_ext or real_ext[1:] == expected_real_ext else filename}.{ext}'
def check_executable(exe, args=[]):
""" Checks if the given binary is installed somewhere in PATH, and returns its name.
args can be a list of arguments for a short output (like -version) """
try:
- Popen.run([exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ Popen.run([exe, *args], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except OSError:
return False
return exe
# STDIN should be redirected too. On UNIX-like systems, ffmpeg triggers
# SIGTTOU if yt-dlp is run in the background.
# See https://github.com/ytdl-org/youtube-dl/issues/955#issuecomment-209789656
- stdout, _, ret = Popen.run([encodeArgument(exe)] + args, text=True,
+ stdout, _, ret = Popen.run([encodeArgument(exe), *args], text=True,
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if ret:
return None
"""Lazy immutable list from an iterable
Note that slices of a LazyList are lists and not LazyList"""
- class IndexError(IndexError):
+ class IndexError(IndexError): # noqa: A001
pass
def __init__(self, iterable, *, reverse=False, _cache=None):
class PagedList:
- class IndexError(IndexError):
+ class IndexError(IndexError): # noqa: A001
pass
def __len__(self):
raise TypeError('indices must be non-negative integers')
entries = self.getslice(idx, idx + 1)
if not entries:
- raise self.IndexError()
+ raise self.IndexError
return entries[0]
+ def __bool__(self):
+ return bool(self.getslice(0, 1))
+
class OnDemandPagedList(PagedList):
"""Download pages until a page with less than maximum results"""
except IndexError:
entry = self.MissingEntry
if not self.is_incomplete:
- raise self.IndexError()
+ raise self.IndexError
if entry is self.MissingEntry:
raise EntryNotInPlaylist(f'Entry {i + 1} cannot be found')
return entry
try:
return type(self.ydl)._handle_extraction_exceptions(lambda _, i: self._entries[i])(self.ydl, i)
except (LazyList.IndexError, PagedList.IndexError):
- raise self.IndexError()
+ raise self.IndexError
return get_entry
def __getitem__(self, idx):
def __len__(self):
return len(tuple(self[:]))
- class IndexError(IndexError):
+ class IndexError(IndexError): # noqa: A001
pass
s)
-def escape_rfc3986(s):
- """Escape non-ASCII characters as suggested by RFC 3986"""
- return urllib.parse.quote(s, b"%/;:@&=+$,!~*'()?#[]")
-
-
-def escape_url(url):
- """Escape URL as suggested by RFC 3986"""
- url_parsed = urllib.parse.urlparse(url)
- return url_parsed._replace(
- netloc=url_parsed.netloc.encode('idna').decode('ascii'),
- path=escape_rfc3986(url_parsed.path),
- params=escape_rfc3986(url_parsed.params),
- query=escape_rfc3986(url_parsed.query),
- fragment=escape_rfc3986(url_parsed.fragment)
- ).geturl()
-
-
def parse_qs(url, **kwargs):
return urllib.parse.parse_qs(urllib.parse.urlparse(url).query, **kwargs)
return False
# "#" cannot be stripped out since it is part of the URI
# However, it can be safely stripped out if following a whitespace
- return re.split(r'\s#', url, 1)[0].rstrip()
+ return re.split(r'\s#', url, maxsplit=1)[0].rstrip()
with contextlib.closing(batch_fd) as fd:
return [url for url in map(fixup, fd) if url]
assert 'query' not in kwargs, 'query_update and query cannot be specified at the same time'
kwargs['query'] = urllib.parse.urlencode({
**urllib.parse.parse_qs(url.query),
- **query_update
+ **query_update,
}, True)
return urllib.parse.urlunparse(url._replace(**kwargs))
return update_url(url, query_update=query)
-def update_Request(req, url=None, data=None, headers=None, query=None):
- req_headers = req.headers.copy()
- req_headers.update(headers or {})
- req_data = data or req.data
- req_url = update_url_query(url or req.get_full_url(), query)
- req_get_method = req.get_method()
- if req_get_method == 'HEAD':
- req_type = HEADRequest
- elif req_get_method == 'PUT':
- req_type = PUTRequest
- else:
- req_type = urllib.request.Request
- new_req = req_type(
- req_url, data=req_data, headers=req_headers,
- origin_req_host=req.origin_req_host, unverifiable=req.unverifiable)
- if hasattr(req, 'timeout'):
- new_req.timeout = req.timeout
- return new_req
-
-
def _multipart_encode_impl(data, boundary):
- content_type = 'multipart/form-data; boundary=%s' % boundary
+ content_type = f'multipart/form-data; boundary={boundary}'
out = b''
for k, v in data.items():
def multipart_encode(data, boundary=None):
- '''
+ """
Encode a dict to RFC 7578-compliant form-data
data:
a random boundary is generated.
Reference: https://tools.ietf.org/html/rfc7578
- '''
+ """
has_specified_boundary = boundary is not None
while True:
s = s.upper()
if s in US_RATINGS:
return US_RATINGS[s]
- m = re.match(r'^TV[_-]?(%s)$' % '|'.join(k[3:] for k in TV_PARENTAL_GUIDELINES), s)
+ m = re.match(r'^TV[_-]?({})$'.format('|'.join(k[3:] for k in TV_PARENTAL_GUIDELINES)), s)
if m:
return TV_PARENTAL_GUIDELINES['TV-' + m.group(1)]
return None
return v
elif v in ('undefined', 'void 0'):
return 'null'
- elif v.startswith('/*') or v.startswith('//') or v.startswith('!') or v == ',':
+ elif v.startswith(('/*', '//', '!')) or v == ',':
return ''
if v[0] in STRING_QUOTES:
def create_map(mobj):
return json.dumps(dict(json.loads(js_to_json(mobj.group(1) or '[]', vars=vars))))
+ code = re.sub(r'(?:new\s+)?Array\((.*?)\)', r'[\g<1>]', code)
code = re.sub(r'new Map\((\[.*?\])?\)', create_map, code)
if not strict:
- code = re.sub(r'new Date\((".+")\)', r'\g<1>', code)
+ code = re.sub(rf'new Date\(({STRING_RE})\)', r'\g<1>', code)
code = re.sub(r'new \w+\((.*?)\)', lambda m: json.dumps(m.group(0)), code)
code = re.sub(r'parseInt\([^\d]+(\d+)[^\d]+\)', r'\1', code)
code = re.sub(r'\(function\([^)]*\)\s*\{[^}]*\}\s*\)\s*\(\s*(["\'][^)]*["\'])\s*\)', r'\1', code)
'''
-STR_FORMAT_TYPES = 'diouxXeEfFgGcrs'
+STR_FORMAT_TYPES = 'diouxXeEfFgGcrsa'
def limit_length(s, length):
def args_to_str(args):
# Get a short string representation for a subprocess command
- return ' '.join(compat_shlex_quote(a) for a in args)
+ return shell_quote(args)
def error_to_str(err):
'quicktime': 'mov',
'webm': 'webm',
'vp9': 'vp9',
+ 'video/ogg': 'ogv',
'x-flv': 'flv',
'x-m4v': 'm4v',
'x-matroska': 'mkv',
},
}
- sanitize_codec = functools.partial(try_get, getter=lambda x: x[0].split('.')[0].replace('0', ''))
+ sanitize_codec = functools.partial(
+ try_get, getter=lambda x: x[0].split('.')[0].replace('0', '').lower())
vcodec, acodec = sanitize_codec(vcodecs), sanitize_codec(acodecs)
for ext in preferences or COMPATIBLE_CODECS.keys():
def encode_data_uri(data, mime_type):
- return 'data:%s;base64,%s' % (mime_type, base64.b64encode(data).decode('ascii'))
+ return 'data:{};base64,{}'.format(mime_type, base64.b64encode(data).decode('ascii'))
def age_restricted(content_limit, age_limit):
def get_max_lens(table):
return [max(width(str(v)) for v in col) for col in zip(*table)]
- def filter_using_list(row, filterArray):
- return [col for take, col in itertools.zip_longest(filterArray, row, fillvalue=True) if take]
+ def filter_using_list(row, filter_array):
+ return [col for take, col in itertools.zip_longest(filter_array, row, fillvalue=True) if take]
max_lens = get_max_lens(data) if hide_empty else []
header_row = filter_using_list(header_row, max_lens)
data = [filter_using_list(row, max_lens) for row in data]
- table = [header_row] + data
+ table = [header_row, *data]
max_lens = get_max_lens(table)
extra_gap += 1
if delim:
- table = [header_row, [delim * (ml + extra_gap) for ml in max_lens]] + data
+ table = [header_row, [delim * (ml + extra_gap) for ml in max_lens], *data]
table[1][-1] = table[1][-1][:-extra_gap * len(delim)] # Remove extra_gap from end of delimiter
for row in table:
for pos, text in enumerate(map(str, row)):
row[pos] = text.replace('\t', ' ' * (max_lens[pos] - width(text))) + ' ' * extra_gap
else:
row[pos] = text + ' ' * (max_lens[pos] - width(text) + extra_gap)
- ret = '\n'.join(''.join(row).rstrip() for row in table)
- return ret
+ return '\n'.join(''.join(row).rstrip() for row in table)
def _match_one(filter_part, dct, incomplete):
operator_rex = re.compile(r'''(?x)
(?P<key>[a-z_]+)
- \s*(?P<negation>!\s*)?(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
+ \s*(?P<negation>!\s*)?(?P<op>{})(?P<none_inclusive>\s*\?)?\s*
(?:
(?P<quote>["\'])(?P<quotedstrval>.+?)(?P=quote)|
(?P<strval>.+?)
)
- ''' % '|'.join(map(re.escape, COMPARISON_OPERATORS.keys())))
+ '''.format('|'.join(map(re.escape, COMPARISON_OPERATORS.keys()))))
m = operator_rex.fullmatch(filter_part.strip())
if m:
m = m.groupdict()
op = unnegated_op
comparison_value = m['quotedstrval'] or m['strval'] or m['intval']
if m['quote']:
- comparison_value = comparison_value.replace(r'\%s' % m['quote'], m['quote'])
+ comparison_value = comparison_value.replace(r'\{}'.format(m['quote']), m['quote'])
actual_value = dct.get(m['key'])
numeric_comparison = None
if isinstance(actual_value, (int, float)):
if numeric_comparison is None:
numeric_comparison = parse_duration(comparison_value)
if numeric_comparison is not None and m['op'] in STRING_OPERATORS:
- raise ValueError('Operator %s only supports string values!' % m['op'])
+ raise ValueError('Operator {} only supports string values!'.format(m['op']))
if actual_value is None:
return is_incomplete(m['key']) or m['none_inclusive']
return op(actual_value, comparison_value if numeric_comparison is None else numeric_comparison)
'!': lambda v: (v is False) if isinstance(v, bool) else (v is None),
}
operator_rex = re.compile(r'''(?x)
- (?P<op>%s)\s*(?P<key>[a-z_]+)
- ''' % '|'.join(map(re.escape, UNARY_OPERATORS.keys())))
+ (?P<op>{})\s*(?P<key>[a-z_]+)
+ '''.format('|'.join(map(re.escape, UNARY_OPERATORS.keys()))))
m = operator_rex.fullmatch(filter_part.strip())
if m:
op = UNARY_OPERATORS[m.group('op')]
return True
return op(actual_value)
- raise ValueError('Invalid filter part %r' % filter_part)
+ raise ValueError(f'Invalid filter part {filter_part!r}')
def match_str(filter_str, dct, incomplete=False):
def match_filter_func(filters, breaking_filters=None):
if not filters and not breaking_filters:
return None
+ repr_ = f'{match_filter_func.__module__}.{match_filter_func.__qualname__}({filters}, {breaking_filters})'
+
breaking_filters = match_filter_func(breaking_filters) or (lambda _, __: None)
filters = set(variadic(filters or []))
if interactive:
filters.remove('-')
+ @function_with_repr.set_repr(repr_)
def _match_func(info_dict, incomplete=False):
ret = breaking_filters(info_dict, incomplete)
if ret is not None:
class download_range_func:
- def __init__(self, chapters, ranges):
- self.chapters, self.ranges = chapters, ranges
+ def __init__(self, chapters, ranges, from_info=False):
+ self.chapters, self.ranges, self.from_info = chapters, ranges, from_info
def __call__(self, info_dict, ydl):
- if not self.ranges and not self.chapters:
- yield {}
warning = ('There are no chapters matching the regex' if info_dict.get('chapters')
else 'Cannot match chapters since chapter information is unavailable')
if self.chapters and warning:
ydl.to_screen(f'[info] {info_dict["id"]}: {warning}')
- yield from ({'start_time': start, 'end_time': end} for start, end in self.ranges or [])
+ for start, end in self.ranges or []:
+ yield {
+ 'start_time': self._handle_negative_timestamp(start, info_dict),
+ 'end_time': self._handle_negative_timestamp(end, info_dict),
+ }
+
+ if self.from_info and (info_dict.get('start_time') or info_dict.get('end_time')):
+ yield {
+ 'start_time': info_dict.get('start_time') or 0,
+ 'end_time': info_dict.get('end_time') or float('inf'),
+ }
+ elif not self.ranges and not self.chapters:
+ yield {}
+
+ @staticmethod
+ def _handle_negative_timestamp(time, info):
+ return max(info['duration'] + time, 0) if info.get('duration') and time < 0 else time
def __eq__(self, other):
return (isinstance(other, download_range_func)
def dfxp2srt(dfxp_data):
- '''
+ """
@param dfxp_data A bytes-like object containing DFXP data
@returns A unicode object containing converted SRT data
- '''
+ """
LEGACY_NAMESPACES = (
(b'http://www.w3.org/ns/ttml', [
b'http://www.w3.org/2004/11/ttaf1',
'fontSize',
'fontStyle',
'fontWeight',
- 'textDecoration'
+ 'textDecoration',
]
_x = functools.partial(xpath_with_ns, ns_map={
if self._applied_styles and self._applied_styles[-1].get(k) == v:
continue
if k == 'color':
- font += ' color="%s"' % v
+ font += f' color="{v}"'
elif k == 'fontSize':
- font += ' size="%s"' % v
+ font += f' size="{v}"'
elif k == 'fontFamily':
- font += ' face="%s"' % v
+ font += f' face="{v}"'
elif k == 'fontWeight' and v == 'bold':
self._out += '<b>'
unclosed_elements.append('b')
if tag not in (_x('ttml:br'), 'br'):
unclosed_elements = self._unclosed_elements.pop()
for element in reversed(unclosed_elements):
- self._out += '</%s>' % element
+ self._out += f'</{element}>'
if unclosed_elements and self._applied_styles:
self._applied_styles.pop()
'or': 'ori',
'os': 'oss',
'pa': 'pan',
+ 'pe': 'per',
'pi': 'pli',
'pl': 'pol',
'ps': 'pus',
struct.pack('!L', random.randint(addr_min, addr_max))))
-class PerRequestProxyHandler(urllib.request.ProxyHandler):
- def __init__(self, proxies=None):
- # Set default handlers
- for type in ('http', 'https'):
- setattr(self, '%s_open' % type,
- lambda r, proxy='__noproxy__', type=type, meth=self.proxy_open:
- meth(r, proxy, type))
- urllib.request.ProxyHandler.__init__(self, proxies)
-
- def proxy_open(self, req, proxy, type):
- req_proxy = req.headers.get('Ytdl-request-proxy')
- if req_proxy is not None:
- proxy = req_proxy
- del req.headers['Ytdl-request-proxy']
-
- if proxy == '__noproxy__':
- return None # No Proxy
- if urllib.parse.urlparse(proxy).scheme.lower() in ('socks', 'socks4', 'socks4a', 'socks5'):
- req.add_header('Ytdl-socks-proxy', proxy)
- # yt-dlp's http/https handlers do wrapping the socket with socks
- return None
- return urllib.request.ProxyHandler.proxy_open(
- self, req, proxy, type)
-
-
# Both long_to_bytes and bytes_to_long are adapted from PyCrypto, which is
# released into Public Domain
# https://github.com/dlitz/pycrypto/blob/master/lib/Crypto/Util/number.py#L387
def ohdave_rsa_encrypt(data, exponent, modulus):
- '''
+ """
Implement OHDave's RSA algorithm. See http://www.ohdave.com/rsa/
Input:
Output: hex string of encrypted data
Limitation: supports one block encryption only
- '''
+ """
payload = int(binascii.hexlify(data[::-1]), 16)
encrypted = pow(payload, exponent, modulus)
- return '%x' % encrypted
+ return f'{encrypted:x}'
def pkcs1pad(data, length):
raise ValueError('Input data too long for PKCS#1 padding')
pseudo_random = [random.randint(0, 254) for _ in range(length - len(data) - 3)]
- return [0, 2] + pseudo_random + [0] + data
+ return [0, 2, *pseudo_random, 0, *data]
def _base_n_table(n, table):
raise XAttrMetadataError(e.errno, e.strerror)
return
- # UNIX Method 1. Use xattrs/pyxattrs modules
+ # UNIX Method 1. Use os.setxattr/xattrs/pyxattrs modules
setxattr = None
- if getattr(xattr, '_yt_dlp__identifier', None) == 'pyxattr':
+ if callable(getattr(os, 'setxattr', None)):
+ setxattr = os.setxattr
+ elif getattr(xattr, '_yt_dlp__identifier', None) == 'pyxattr':
# Unicode arguments are not supported in pyxattr until version 0.5.0
# See https://github.com/ytdl-org/youtube-dl/issues/5498
if version_tuple(xattr.__version__) >= (0, 5, 0):
else 'xattr' if check_executable('xattr', ['-h']) else None)
if not exe:
raise XAttrUnavailableError(
- 'Couldn\'t find a tool to set the xattrs. Install either the python "xattr" or "pyxattr" modules or the '
+ 'Couldn\'t find a tool to set the xattrs. Install either the "xattr" or "pyxattr" Python modules or the '
+ ('"xattr" binary' if sys.platform != 'linux' else 'GNU "attr" package (which contains the "setfattr" tool)'))
value = value.decode()
def random_birthday(year_field, month_field, day_field):
- start_date = datetime.date(1950, 1, 1)
- end_date = datetime.date(1995, 12, 31)
+ start_date = dt.date(1950, 1, 1)
+ end_date = dt.date(1995, 12, 31)
offset = random.randint(0, (end_date - start_date).days)
- random_date = start_date + datetime.timedelta(offset)
+ random_date = start_date + dt.timedelta(offset)
return {
year_field: str(random_date.year),
month_field: str(random_date.month),
def clean_podcast_url(url):
- return re.sub(r'''(?x)
+ url = re.sub(r'''(?x)
(?:
(?:
chtbl\.com/track|
media\.blubrry\.com| # https://create.blubrry.com/resources/podcast-media-download-statistics/getting-started/
- play\.podtrac\.com
- )/[^/]+|
+ play\.podtrac\.com|
+ chrt\.fm/track|
+ mgln\.ai/e
+ )(?:/[^/.]+)?|
(?:dts|www)\.podtrac\.com/(?:pts/)?redirect\.[0-9a-z]{3,4}| # http://analytics.podtrac.com/how-to-measure
flex\.acast\.com|
pd(?:
cn\.co| # https://podcorn.com/analytics-prefix/
st\.fm # https://podsights.com/docs/
- )/e
+ )/e|
+ [0-9]\.gum\.fm|
+ pscrb\.fm/rss/p
)/''', '', url)
+ return re.sub(r'^\w+://(\w+://)', r'\1', url)
_HEX_TABLE = '0123456789abcdef'
"""
Returns TZ-aware time in seconds since the epoch (1970-01-01T00:00:00Z)
"""
- return time.time() + datetime.timedelta(**kwargs).total_seconds()
+ return time.time() + dt.timedelta(**kwargs).total_seconds()
# create a JSON Web Signature (jws) with HS256 algorithm
payload_b64 = base64.b64encode(json.dumps(payload_data).encode())
h = hmac.new(key.encode(), header_b64 + b'.' + payload_b64, hashlib.sha256)
signature_b64 = base64.b64encode(h.digest())
- token = header_b64 + b'.' + payload_b64 + b'.' + signature_b64
- return token
+ return header_b64 + b'.' + payload_b64 + b'.' + signature_b64
# can be extended in future to verify the signature and parse header and return the algorithm used if it's not HS256
def jwt_decode_hs256(jwt):
header_b64, payload_b64, signature_b64 = jwt.split('.')
# add trailing ='s that may have been stripped, superfluous ='s are ignored
- payload_data = json.loads(base64.urlsafe_b64decode(f'{payload_b64}==='))
- return payload_data
+ return json.loads(base64.urlsafe_b64decode(f'{payload_b64}==='))
WINDOWS_VT_MODE = False if compat_os_name == 'nt' else None
"""
_keys = ('width', 'height')
max_dimensions = max(
- (tuple(format.get(k) or 0 for k in _keys) for format in formats),
+ (tuple(fmt.get(k) or 0 for k in _keys) for fmt in formats),
default=(0, 0))
if not max_dimensions[0]:
return thumbnails
def read_stdin(what):
- eof = 'Ctrl+Z' if compat_os_name == 'nt' else 'Ctrl+D'
- write_string(f'Reading {what} from STDIN - EOF ({eof}) to end:\n')
+ if what:
+ eof = 'Ctrl+Z' if compat_os_name == 'nt' else 'Ctrl+D'
+ write_string(f'Reading {what} from STDIN - EOF ({eof}) to end:\n')
return sys.stdin
return self.parser.parse_args(self.all_args)
-class WebSocketsWrapper:
- """Wraps websockets module to use in non-async scopes"""
- pool = None
-
- def __init__(self, url, headers=None, connect=True):
- self.loop = asyncio.new_event_loop()
- # XXX: "loop" is deprecated
- self.conn = websockets.connect(
- url, extra_headers=headers, ping_interval=None,
- close_timeout=float('inf'), loop=self.loop, ping_timeout=float('inf'))
- if connect:
- self.__enter__()
- atexit.register(self.__exit__, None, None, None)
-
- def __enter__(self):
- if not self.pool:
- self.pool = self.run_with_loop(self.conn.__aenter__(), self.loop)
- return self
-
- def send(self, *args):
- self.run_with_loop(self.pool.send(*args), self.loop)
-
- def recv(self, *args):
- return self.run_with_loop(self.pool.recv(*args), self.loop)
-
- def __exit__(self, type, value, traceback):
- try:
- return self.run_with_loop(self.conn.__aexit__(type, value, traceback), self.loop)
- finally:
- self.loop.close()
- self._cancel_all_tasks(self.loop)
-
- # taken from https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py with modifications
- # for contributors: If there's any new library using asyncio needs to be run in non-async, move these function out of this class
- @staticmethod
- def run_with_loop(main, loop):
- if not asyncio.iscoroutine(main):
- raise ValueError(f'a coroutine was expected, got {main!r}')
-
- try:
- return loop.run_until_complete(main)
- finally:
- loop.run_until_complete(loop.shutdown_asyncgens())
- if hasattr(loop, 'shutdown_default_executor'):
- loop.run_until_complete(loop.shutdown_default_executor())
-
- @staticmethod
- def _cancel_all_tasks(loop):
- to_cancel = asyncio.all_tasks(loop)
-
- if not to_cancel:
- return
-
- for task in to_cancel:
- task.cancel()
-
- # XXX: "loop" is removed in python 3.10+
- loop.run_until_complete(
- asyncio.gather(*to_cancel, loop=loop, return_exceptions=True))
-
- for task in to_cancel:
- if task.cancelled():
- continue
- if task.exception() is not None:
- loop.call_exception_handler({
- 'message': 'unhandled exception during asyncio.run() shutdown',
- 'exception': task.exception(),
- 'task': task,
- })
-
-
def merge_headers(*dicts):
"""Merge dicts of http headers case insensitively, prioritizing the latter ones"""
return {k.title(): v for k, v in itertools.chain.from_iterable(map(dict.items, dicts))}
def __call__(self, *args, **kwargs):
return self.func(*args, **kwargs)
+ @classmethod
+ def set_repr(cls, repr_):
+ return functools.partial(cls, repr_=repr_)
+
def __repr__(self):
if self.__repr:
return self.__repr
assert left > 3 and right >= 0
if s is None or len(s) <= left + right:
return s
- return f'{s[:left-3]}...{s[-right:] if right else ""}'
+ return f'{s[:left - 3]}...{s[-right:] if right else ""}'
def orderedSet_from_options(options, alias_dict, *, use_regex=False, start=None):
return orderedSet(requested)
+# TODO: Rewrite
class FormatSorter:
regex = r' *((?P<reverse>\+)?(?P<field>[a-zA-Z0-9_]+)((?P<separator>[~:])(?P<limit>.*?))?)? *$'
'source': {'convert': 'float', 'field': 'source_preference', 'default': -1},
'codec': {'type': 'combined', 'field': ('vcodec', 'acodec')},
- 'br': {'type': 'combined', 'field': ('tbr', 'vbr', 'abr'), 'same_limit': True},
- 'size': {'type': 'combined', 'same_limit': True, 'field': ('filesize', 'fs_approx')},
+ 'br': {'type': 'multiple', 'field': ('tbr', 'vbr', 'abr'), 'convert': 'float_none',
+ 'function': lambda it: next(filter(None, it), None)},
+ 'size': {'type': 'multiple', 'field': ('filesize', 'fs_approx'), 'convert': 'bytes',
+ 'function': lambda it: next(filter(None, it), None)},
'ext': {'type': 'combined', 'field': ('vext', 'aext')},
'res': {'type': 'multiple', 'field': ('height', 'width'),
- 'function': lambda it: (lambda l: min(l) if l else 0)(tuple(filter(None, it)))},
+ 'function': lambda it: min(filter(None, it), default=0)},
# Actual field names
'format_id': {'type': 'alias', 'field': 'id'},
self.ydl.deprecated_feature(f'Using arbitrary fields ({field}) for format sorting is '
'deprecated and may be removed in a future version')
self.settings[field] = {}
- propObj = self.settings[field]
- if key not in propObj:
- type = propObj.get('type')
+ prop_obj = self.settings[field]
+ if key not in prop_obj:
+ type_ = prop_obj.get('type')
if key == 'field':
- default = 'preference' if type == 'extractor' else (field,) if type in ('combined', 'multiple') else field
+ default = 'preference' if type_ == 'extractor' else (field,) if type_ in ('combined', 'multiple') else field
elif key == 'convert':
- default = 'order' if type == 'ordered' else 'float_string' if field else 'ignore'
+ default = 'order' if type_ == 'ordered' else 'float_string' if field else 'ignore'
else:
- default = {'type': 'field', 'visible': True, 'order': [], 'not_in_list': (None,)}.get(key, None)
- propObj[key] = default
- return propObj[key]
+ default = {'type': 'field', 'visible': True, 'order': [], 'not_in_list': (None,)}.get(key)
+ prop_obj[key] = default
+ return prop_obj[key]
- def _resolve_field_value(self, field, value, convertNone=False):
+ def _resolve_field_value(self, field, value, convert_none=False):
if value is None:
- if not convertNone:
+ if not convert_none:
return None
else:
value = value.lower()
for item in sort_list:
match = re.match(self.regex, item)
if match is None:
- raise ExtractorError('Invalid format sort string "%s" given by extractor' % item)
+ raise ExtractorError(f'Invalid format sort string "{item}" given by extractor')
field = match.group('field')
if field is None:
continue
def print_verbose_info(self, write_debug):
if self._sort_user:
- write_debug('Sort order given by user: %s' % ', '.join(self._sort_user))
+ write_debug('Sort order given by user: {}'.format(', '.join(self._sort_user)))
if self._sort_extractor:
- write_debug('Sort order given by extractor: %s' % ', '.join(self._sort_extractor))
- write_debug('Formats sorted by: %s' % ', '.join(['%s%s%s' % (
+ write_debug('Sort order given by extractor: {}'.format(', '.join(self._sort_extractor)))
+ write_debug('Formats sorted by: {}'.format(', '.join(['{}{}{}'.format(
'+' if self._get_field_setting(field, 'reverse') else '', field,
- '%s%s(%s)' % ('~' if self._get_field_setting(field, 'closest') else ':',
- self._get_field_setting(field, 'limit_text'),
- self._get_field_setting(field, 'limit'))
+ '{}{}({})'.format('~' if self._get_field_setting(field, 'closest') else ':',
+ self._get_field_setting(field, 'limit_text'),
+ self._get_field_setting(field, 'limit'))
if self._get_field_setting(field, 'limit_text') is not None else '')
- for field in self._order if self._get_field_setting(field, 'visible')]))
+ for field in self._order if self._get_field_setting(field, 'visible')])))
- def _calculate_field_preference_from_value(self, format, field, type, value):
+ def _calculate_field_preference_from_value(self, format_, field, type_, value):
reverse = self._get_field_setting(field, 'reverse')
closest = self._get_field_setting(field, 'closest')
limit = self._get_field_setting(field, 'limit')
- if type == 'extractor':
+ if type_ == 'extractor':
maximum = self._get_field_setting(field, 'max')
if value is None or (maximum is not None and value >= maximum):
value = -1
- elif type == 'boolean':
+ elif type_ == 'boolean':
in_list = self._get_field_setting(field, 'in_list')
not_in_list = self._get_field_setting(field, 'not_in_list')
value = 0 if ((in_list is None or value in in_list) and (not_in_list is None or value not in not_in_list)) else -1
- elif type == 'ordered':
+ elif type_ == 'ordered':
value = self._resolve_field_value(field, value, True)
# try to convert to number
else (0, -value, 0) if limit is None or (reverse and value == limit) or value > limit
else (-1, value, 0))
- def _calculate_field_preference(self, format, field):
- type = self._get_field_setting(field, 'type') # extractor, boolean, ordered, field, multiple
- get_value = lambda f: format.get(self._get_field_setting(f, 'field'))
- if type == 'multiple':
- type = 'field' # Only 'field' is allowed in multiple for now
+ def _calculate_field_preference(self, format_, field):
+ type_ = self._get_field_setting(field, 'type') # extractor, boolean, ordered, field, multiple
+ get_value = lambda f: format_.get(self._get_field_setting(f, 'field'))
+ if type_ == 'multiple':
+ type_ = 'field' # Only 'field' is allowed in multiple for now
actual_fields = self._get_field_setting(field, 'field')
value = self._get_field_setting(field, 'function')(get_value(f) for f in actual_fields)
else:
value = get_value(field)
- return self._calculate_field_preference_from_value(format, field, type, value)
+ return self._calculate_field_preference_from_value(format_, field, type_, value)
def calculate_preference(self, format):
# Determine missing protocol
format['preference'] = -100
# Determine missing bitrates
- if format.get('tbr') is None:
- if format.get('vbr') is not None and format.get('abr') is not None:
- format['tbr'] = format.get('vbr', 0) + format.get('abr', 0)
- else:
- if format.get('vcodec') != 'none' and format.get('vbr') is None:
- format['vbr'] = format.get('tbr') - format.get('abr', 0)
- if format.get('acodec') != 'none' and format.get('abr') is None:
- format['abr'] = format.get('tbr') - format.get('vbr', 0)
+ if format.get('vcodec') == 'none':
+ format['vbr'] = 0
+ if format.get('acodec') == 'none':
+ format['abr'] = 0
+ if not format.get('vbr') and format.get('vcodec') != 'none':
+ format['vbr'] = try_call(lambda: format['tbr'] - format['abr']) or None
+ if not format.get('abr') and format.get('acodec') != 'none':
+ format['abr'] = try_call(lambda: format['tbr'] - format['vbr']) or None
+ if not format.get('tbr'):
+ format['tbr'] = try_call(lambda: format['vbr'] + format['abr']) or None
return tuple(self._calculate_field_preference(format, field) for field in self._order)
+
+
def filesize_from_tbr(tbr, duration):
    """
    Estimate the size of a stream from its bitrate and duration

    @param tbr:      Total bitrate in kbps (1000 bits/sec)
    @param duration: Duration in seconds
    @returns         Filesize in bytes (truncated to an integer), or None if
                     either input is None or the estimate is not a finite number
    """
    if tbr is None or duration is None:
        return None
    # kbps -> bytes/sec: 1000 bits per kilobit, 8 bits per byte
    size = duration * tbr * (1000 / 8)
    # int() raises on NaN/infinity; report such an estimate as unknown instead
    if not math.isfinite(size):
        return None
    return int(size)
+
+
+# XXX: Temporary
class _YDLLogger:
    """
    Logger facade that forwards messages to the wrapped `ydl` object

    Each method maps onto one method of `ydl` (write_debug, to_screen,
    report_warning, report_error, to_stdout, to_stderr). When no `ydl`
    was given (or it is falsy), every method silently does nothing.
    """

    def __init__(self, ydl=None):
        self._ydl = ydl

    def debug(self, message):
        """Pass `message` to ydl.write_debug"""
        ydl = self._ydl
        if not ydl:
            return
        ydl.write_debug(message)

    def info(self, message):
        """Pass `message` to ydl.to_screen"""
        ydl = self._ydl
        if not ydl:
            return
        ydl.to_screen(message)

    def warning(self, message, *, once=False):
        """Pass `message` to ydl.report_warning; `once` is forwarded positionally"""
        ydl = self._ydl
        if not ydl:
            return
        ydl.report_warning(message, once)

    def error(self, message, *, is_error=True):
        """Pass `message` to ydl.report_error, forwarding `is_error` by keyword"""
        ydl = self._ydl
        if not ydl:
            return
        ydl.report_error(message, is_error=is_error)

    def stdout(self, message):
        """Pass `message` to ydl.to_stdout"""
        ydl = self._ydl
        if not ydl:
            return
        ydl.to_stdout(message)

    def stderr(self, message):
        """Pass `message` to ydl.to_stderr"""
        ydl = self._ydl
        if not ydl:
            return
        ydl.to_stderr(message)