import email.header
import email.utils
import errno
-import gzip
import hashlib
import hmac
import html.entities
import html.parser
-import http.client
-import http.cookiejar
import inspect
import io
import itertools
import locale
import math
import mimetypes
+import netrc
import operator
import os
import platform
import urllib.parse
import urllib.request
import xml.etree.ElementTree
-import zlib
from . import traversal
compat_os_name,
compat_shlex_quote,
)
-from ..dependencies import brotli, certifi, websockets, xattr
-from ..socks import ProxyType, sockssocket
+from ..dependencies import websockets, xattr
+
+__name__ = __name__.rsplit('.', 1)[0] # Pretend to be the parent module
# This is not clearly defined otherwise
compiled_regex_type = type(re.compile(''))
-def random_user_agent():
- _USER_AGENT_TPL = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/%s Safari/537.36'
- _CHROME_VERSIONS = (
- '90.0.4430.212',
- '90.0.4430.24',
- '90.0.4430.70',
- '90.0.4430.72',
- '90.0.4430.85',
- '90.0.4430.93',
- '91.0.4472.101',
- '91.0.4472.106',
- '91.0.4472.114',
- '91.0.4472.124',
- '91.0.4472.164',
- '91.0.4472.19',
- '91.0.4472.77',
- '92.0.4515.107',
- '92.0.4515.115',
- '92.0.4515.131',
- '92.0.4515.159',
- '92.0.4515.43',
- '93.0.4556.0',
- '93.0.4577.15',
- '93.0.4577.63',
- '93.0.4577.82',
- '94.0.4606.41',
- '94.0.4606.54',
- '94.0.4606.61',
- '94.0.4606.71',
- '94.0.4606.81',
- '94.0.4606.85',
- '95.0.4638.17',
- '95.0.4638.50',
- '95.0.4638.54',
- '95.0.4638.69',
- '95.0.4638.74',
- '96.0.4664.18',
- '96.0.4664.45',
- '96.0.4664.55',
- '96.0.4664.93',
- '97.0.4692.20',
- )
- return _USER_AGENT_TPL % random.choice(_CHROME_VERSIONS)
-
-
-SUPPORTED_ENCODINGS = [
- 'gzip', 'deflate'
-]
-if brotli:
- SUPPORTED_ENCODINGS.append('br')
-
-std_headers = {
- 'User-Agent': random_user_agent(),
- 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
- 'Accept-Language': 'en-us,en;q=0.5',
- 'Sec-Fetch-Mode': 'navigate',
-}
-
-
USER_AGENTS = {
'Safari': 'Mozilla/5.0 (X11; Linux x86_64; rv:10.0) AppleWebKit/533.20.25 (KHTML, like Gecko) Version/5.0.4 Safari/533.20.27',
}
-NO_DEFAULT = object()
-IDENTITY = lambda x: x
+class NO_DEFAULT:
+ # Marker meaning "no default was supplied"; used as a default argument value,
+ # e.g. `variadic(x, allowed_types=NO_DEFAULT)`.
+ # Because it is a class, it satisfies the `isinstance(..., (tuple, type))`
+ # check performed by `variadic`.
+ pass
+
+
+def IDENTITY(x):
+ # Identity function: hand back the argument unchanged.
+ return x
+
ENGLISH_MONTH_NAMES = [
'January', 'February', 'March', 'April', 'May', 'June',
'%d/%m/%y',
'%d/%m/%Y %H:%M:%S',
'%d-%m-%Y %H:%M',
+ '%H:%M %d/%m/%Y',
])
DATE_FORMATS_MONTH_FIRST = list(DATE_FORMATS)
)
-def process_communicate_or_kill(p, *args, **kwargs):
- deprecation_warning(f'"{__name__}.process_communicate_or_kill" is deprecated and may be removed '
- f'in a future version. Use "{__name__}.Popen.communicate_or_kill" instead')
- return Popen.communicate_or_kill(p, *args, **kwargs)
+class netrc_from_content(netrc.netrc):
+ # `netrc.netrc` variant populated from an in-memory string instead of a file.
+ def __init__(self, content):
+ # The parent __init__ is not called here; the two containers are set up
+ # directly and the inherited private parser is fed the text stream.
+ self.hosts, self.macros = {}, {}
+ with io.StringIO(content) as stream:
+ # NOTE(review): '-' looks like a placeholder file name and False the
+ # "default netrc" flag of the stdlib `_parse` -- confirm against the
+ # stdlib netrc implementation for the supported Python versions.
+ self._parse('-', stream, False)
class Popen(subprocess.Popen):
return '%s.%03d' % (ret, time.milliseconds) if msec else ret
-def _ssl_load_windows_store_certs(ssl_context, storename):
- # Code adapted from _load_windows_store_certs in https://github.com/python/cpython/blob/main/Lib/ssl.py
- try:
- certs = [cert for cert, encoding, trust in ssl.enum_certificates(storename)
- if encoding == 'x509_asn' and (
- trust is True or ssl.Purpose.SERVER_AUTH.oid in trust)]
- except PermissionError:
- return
- for cert in certs:
- with contextlib.suppress(ssl.SSLError):
- ssl_context.load_verify_locations(cadata=cert)
-
-
def make_HTTPS_handler(params, **kwargs):
- opts_check_certificate = not params.get('nocheckcertificate')
- context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- context.check_hostname = opts_check_certificate
- if params.get('legacyserverconnect'):
- context.options |= 4 # SSL_OP_LEGACY_SERVER_CONNECT
- # Allow use of weaker ciphers in Python 3.10+. See https://bugs.python.org/issue43998
- context.set_ciphers('DEFAULT')
- elif (
- sys.version_info < (3, 10)
- and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
- and not ssl.OPENSSL_VERSION.startswith('LibreSSL')
- ):
- # Backport the default SSL ciphers and minimum TLS version settings from Python 3.10 [1].
- # This is to ensure consistent behavior across Python versions, and help avoid fingerprinting
- # in some situations [2][3].
- # Python 3.10 only supports OpenSSL 1.1.1+ [4]. Because this change is likely
- # untested on older versions, we only apply this to OpenSSL 1.1.1+ to be safe.
- # LibreSSL is excluded until further investigation due to cipher support issues [5][6].
- # 1. https://github.com/python/cpython/commit/e983252b516edb15d4338b0a47631b59ef1e2536
- # 2. https://github.com/yt-dlp/yt-dlp/issues/4627
- # 3. https://github.com/yt-dlp/yt-dlp/pull/5294
- # 4. https://peps.python.org/pep-0644/
- # 5. https://peps.python.org/pep-0644/#libressl-support
- # 6. https://github.com/yt-dlp/yt-dlp/commit/5b9f253fa0aee996cf1ed30185d4b502e00609c4#commitcomment-89054368
- context.set_ciphers('@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM')
- context.minimum_version = ssl.TLSVersion.TLSv1_2
-
- context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
- if opts_check_certificate:
- if certifi and 'no-certifi' not in params.get('compat_opts', []):
- context.load_verify_locations(cafile=certifi.where())
- else:
- try:
- context.load_default_certs()
- # Work around the issue in load_default_certs when there are bad certificates. See:
- # https://github.com/yt-dlp/yt-dlp/issues/1060,
- # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
- except ssl.SSLError:
- # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
- if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
- for storename in ('CA', 'ROOT'):
- _ssl_load_windows_store_certs(context, storename)
- context.set_default_verify_paths()
-
- client_certfile = params.get('client_certificate')
- if client_certfile:
- try:
- context.load_cert_chain(
- client_certfile, keyfile=params.get('client_certificate_key'),
- password=params.get('client_certificate_password'))
- except ssl.SSLError:
- raise YoutubeDLError('Unable to load client certificate')
-
- # Some servers may reject requests if ALPN extension is not sent. See:
- # https://github.com/python/cpython/issues/85140
- # https://github.com/yt-dlp/yt-dlp/issues/3878
- with contextlib.suppress(NotImplementedError):
- context.set_alpn_protocols(['http/1.1'])
-
- return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
+ from ._deprecated import YoutubeDLHTTPSHandler
+ from ..networking._helper import make_ssl_context
+ return YoutubeDLHTTPSHandler(params, context=make_ssl_context(
+ verify=not params.get('nocheckcertificate'),
+ client_certificate=params.get('client_certificate'),
+ client_certificate_key=params.get('client_certificate_key'),
+ client_certificate_password=params.get('client_certificate_password'),
+ legacy_support=params.get('legacyserverconnect'),
+ use_certifi='no-certifi' not in params.get('compat_opts', []),
+ ), **kwargs)
def bug_reports_message(before=';'):
super().__init__(self.msg)
-network_exceptions = [urllib.error.URLError, http.client.HTTPException, socket.error]
-if hasattr(ssl, 'CertificateError'):
- network_exceptions.append(ssl.CertificateError)
-network_exceptions = tuple(network_exceptions)
-
-
class ExtractorError(YoutubeDLError):
"""Error during info extraction."""
""" tb, if given, is the original traceback (so that it can be printed out).
If expected is set, this is a normal error message and most likely not a bug in yt-dlp.
"""
+ from ..networking.exceptions import network_exceptions
if sys.exc_info()[0] in network_exceptions:
expected = True
pass
-def _create_http_connection(ydl_handler, http_class, is_https, *args, **kwargs):
- hc = http_class(*args, **kwargs)
- source_address = ydl_handler._params.get('source_address')
-
- if source_address is not None:
- # This is to workaround _create_connection() from socket where it will try all
- # address data from getaddrinfo() including IPv6. This filters the result from
- # getaddrinfo() based on the source_address value.
- # This is based on the cpython socket.create_connection() function.
- # https://github.com/python/cpython/blob/master/Lib/socket.py#L691
- def _create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None):
- host, port = address
- err = None
- addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)
- af = socket.AF_INET if '.' in source_address[0] else socket.AF_INET6
- ip_addrs = [addr for addr in addrs if addr[0] == af]
- if addrs and not ip_addrs:
- ip_version = 'v4' if af == socket.AF_INET else 'v6'
- raise OSError(
- "No remote IP%s addresses available for connect, can't use '%s' as source address"
- % (ip_version, source_address[0]))
- for res in ip_addrs:
- af, socktype, proto, canonname, sa = res
- sock = None
- try:
- sock = socket.socket(af, socktype, proto)
- if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
- sock.settimeout(timeout)
- sock.bind(source_address)
- sock.connect(sa)
- err = None # Explicitly break reference cycle
- return sock
- except OSError as _:
- err = _
- if sock is not None:
- sock.close()
- if err is not None:
- raise err
- else:
- raise OSError('getaddrinfo returns an empty list')
- if hasattr(hc, '_create_connection'):
- hc._create_connection = _create_connection
- hc.source_address = (source_address, 0)
-
- return hc
-
-
-def handle_youtubedl_headers(headers):
- filtered_headers = headers
-
- if 'Youtubedl-no-compression' in filtered_headers:
- filtered_headers = {k: v for k, v in filtered_headers.items() if k.lower() != 'accept-encoding'}
- del filtered_headers['Youtubedl-no-compression']
-
- return filtered_headers
-
-
-class YoutubeDLHandler(urllib.request.HTTPHandler):
- """Handler for HTTP requests and responses.
-
- This class, when installed with an OpenerDirector, automatically adds
- the standard headers to every HTTP request and handles gzipped and
- deflated responses from web servers. If compression is to be avoided in
- a particular request, the original request in the program code only has
- to include the HTTP header "Youtubedl-no-compression", which will be
- removed before making the real request.
-
- Part of this code was copied from:
-
- http://techknack.net/python-urllib2-handlers/
-
- Andrew Rowls, the author of that code, agreed to release it to the
- public domain.
- """
-
- def __init__(self, params, *args, **kwargs):
- urllib.request.HTTPHandler.__init__(self, *args, **kwargs)
- self._params = params
-
- def http_open(self, req):
- conn_class = http.client.HTTPConnection
-
- socks_proxy = req.headers.get('Ytdl-socks-proxy')
- if socks_proxy:
- conn_class = make_socks_conn_class(conn_class, socks_proxy)
- del req.headers['Ytdl-socks-proxy']
-
- return self.do_open(functools.partial(
- _create_http_connection, self, conn_class, False),
- req)
-
- @staticmethod
- def deflate(data):
- if not data:
- return data
- try:
- return zlib.decompress(data, -zlib.MAX_WBITS)
- except zlib.error:
- return zlib.decompress(data)
-
- @staticmethod
- def brotli(data):
- if not data:
- return data
- return brotli.decompress(data)
-
- def http_request(self, req):
- # According to RFC 3986, URLs can not contain non-ASCII characters, however this is not
- # always respected by websites, some tend to give out URLs with non percent-encoded
- # non-ASCII characters (see telemb.py, ard.py [#3412])
- # urllib chokes on URLs with non-ASCII characters (see http://bugs.python.org/issue3991)
- # To work around aforementioned issue we will replace request's original URL with
- # percent-encoded one
- # Since redirects are also affected (e.g. http://www.southpark.de/alle-episoden/s18e09)
- # the code of this workaround has been moved here from YoutubeDL.urlopen()
- url = req.get_full_url()
- url_escaped = escape_url(url)
-
- # Substitute URL if any change after escaping
- if url != url_escaped:
- req = update_Request(req, url=url_escaped)
-
- for h, v in self._params.get('http_headers', std_headers).items():
- # Capitalize is needed because of Python bug 2275: http://bugs.python.org/issue2275
- # The dict keys are capitalized because of this bug by urllib
- if h.capitalize() not in req.headers:
- req.add_header(h, v)
-
- if 'Accept-encoding' not in req.headers:
- req.add_header('Accept-encoding', ', '.join(SUPPORTED_ENCODINGS))
-
- req.headers = handle_youtubedl_headers(req.headers)
-
- return super().do_request_(req)
-
- def http_response(self, req, resp):
- old_resp = resp
- # gzip
- if resp.headers.get('Content-encoding', '') == 'gzip':
- content = resp.read()
- gz = gzip.GzipFile(fileobj=io.BytesIO(content), mode='rb')
- try:
- uncompressed = io.BytesIO(gz.read())
- except OSError as original_ioerror:
- # There may be junk add the end of the file
- # See http://stackoverflow.com/q/4928560/35070 for details
- for i in range(1, 1024):
- try:
- gz = gzip.GzipFile(fileobj=io.BytesIO(content[:-i]), mode='rb')
- uncompressed = io.BytesIO(gz.read())
- except OSError:
- continue
- break
- else:
- raise original_ioerror
- resp = urllib.request.addinfourl(uncompressed, old_resp.headers, old_resp.url, old_resp.code)
- resp.msg = old_resp.msg
- # deflate
- if resp.headers.get('Content-encoding', '') == 'deflate':
- gz = io.BytesIO(self.deflate(resp.read()))
- resp = urllib.request.addinfourl(gz, old_resp.headers, old_resp.url, old_resp.code)
- resp.msg = old_resp.msg
- # brotli
- if resp.headers.get('Content-encoding', '') == 'br':
- resp = urllib.request.addinfourl(
- io.BytesIO(self.brotli(resp.read())), old_resp.headers, old_resp.url, old_resp.code)
- resp.msg = old_resp.msg
- # Percent-encode redirect URL of Location HTTP header to satisfy RFC 3986 (see
- # https://github.com/ytdl-org/youtube-dl/issues/6457).
- if 300 <= resp.code < 400:
- location = resp.headers.get('Location')
- if location:
- # As of RFC 2616 default charset is iso-8859-1 that is respected by python 3
- location = location.encode('iso-8859-1').decode()
- location_escaped = escape_url(location)
- if location != location_escaped:
- del resp.headers['Location']
- resp.headers['Location'] = location_escaped
- return resp
-
- https_request = http_request
- https_response = http_response
-
-
-def make_socks_conn_class(base_class, socks_proxy):
- assert issubclass(base_class, (
- http.client.HTTPConnection, http.client.HTTPSConnection))
-
- url_components = urllib.parse.urlparse(socks_proxy)
- if url_components.scheme.lower() == 'socks5':
- socks_type = ProxyType.SOCKS5
- elif url_components.scheme.lower() in ('socks', 'socks4'):
- socks_type = ProxyType.SOCKS4
- elif url_components.scheme.lower() == 'socks4a':
- socks_type = ProxyType.SOCKS4A
-
- def unquote_if_non_empty(s):
- if not s:
- return s
- return urllib.parse.unquote_plus(s)
-
- proxy_args = (
- socks_type,
- url_components.hostname, url_components.port or 1080,
- True, # Remote DNS
- unquote_if_non_empty(url_components.username),
- unquote_if_non_empty(url_components.password),
- )
-
- class SocksConnection(base_class):
- def connect(self):
- self.sock = sockssocket()
- self.sock.setproxy(*proxy_args)
- if isinstance(self.timeout, (int, float)):
- self.sock.settimeout(self.timeout)
- self.sock.connect((self.host, self.port))
-
- if isinstance(self, http.client.HTTPSConnection):
- if hasattr(self, '_context'): # Python > 2.6
- self.sock = self._context.wrap_socket(
- self.sock, server_hostname=self.host)
- else:
- self.sock = ssl.wrap_socket(self.sock)
-
- return SocksConnection
-
-
-class YoutubeDLHTTPSHandler(urllib.request.HTTPSHandler):
- def __init__(self, params, https_conn_class=None, *args, **kwargs):
- urllib.request.HTTPSHandler.__init__(self, *args, **kwargs)
- self._https_conn_class = https_conn_class or http.client.HTTPSConnection
- self._params = params
-
- def https_open(self, req):
- kwargs = {}
- conn_class = self._https_conn_class
-
- if hasattr(self, '_context'): # python > 2.6
- kwargs['context'] = self._context
- if hasattr(self, '_check_hostname'): # python 3.x
- kwargs['check_hostname'] = self._check_hostname
-
- socks_proxy = req.headers.get('Ytdl-socks-proxy')
- if socks_proxy:
- conn_class = make_socks_conn_class(conn_class, socks_proxy)
- del req.headers['Ytdl-socks-proxy']
-
- try:
- return self.do_open(
- functools.partial(_create_http_connection, self, conn_class, True), req, **kwargs)
- except urllib.error.URLError as e:
- if (isinstance(e.reason, ssl.SSLError)
- and getattr(e.reason, 'reason', None) == 'SSLV3_ALERT_HANDSHAKE_FAILURE'):
- raise YoutubeDLError('SSLV3_ALERT_HANDSHAKE_FAILURE: Try using --legacy-server-connect')
- raise
-
-
def is_path_like(f):
return isinstance(f, (str, bytes, os.PathLike))
-class YoutubeDLCookieJar(http.cookiejar.MozillaCookieJar):
- """
- See [1] for cookie file format.
-
- 1. https://curl.haxx.se/docs/http-cookies.html
- """
- _HTTPONLY_PREFIX = '#HttpOnly_'
- _ENTRY_LEN = 7
- _HEADER = '''# Netscape HTTP Cookie File
-# This file is generated by yt-dlp. Do not edit.
-
-'''
- _CookieFileEntry = collections.namedtuple(
- 'CookieFileEntry',
- ('domain_name', 'include_subdomains', 'path', 'https_only', 'expires_at', 'name', 'value'))
-
- def __init__(self, filename=None, *args, **kwargs):
- super().__init__(None, *args, **kwargs)
- if is_path_like(filename):
- filename = os.fspath(filename)
- self.filename = filename
-
- @staticmethod
- def _true_or_false(cndn):
- return 'TRUE' if cndn else 'FALSE'
-
- @contextlib.contextmanager
- def open(self, file, *, write=False):
- if is_path_like(file):
- with open(file, 'w' if write else 'r', encoding='utf-8') as f:
- yield f
- else:
- if write:
- file.truncate(0)
- yield file
-
- def _really_save(self, f, ignore_discard=False, ignore_expires=False):
- now = time.time()
- for cookie in self:
- if (not ignore_discard and cookie.discard
- or not ignore_expires and cookie.is_expired(now)):
- continue
- name, value = cookie.name, cookie.value
- if value is None:
- # cookies.txt regards 'Set-Cookie: foo' as a cookie
- # with no name, whereas http.cookiejar regards it as a
- # cookie with no value.
- name, value = '', name
- f.write('%s\n' % '\t'.join((
- cookie.domain,
- self._true_or_false(cookie.domain.startswith('.')),
- cookie.path,
- self._true_or_false(cookie.secure),
- str_or_none(cookie.expires, default=''),
- name, value
- )))
-
- def save(self, filename=None, *args, **kwargs):
- """
- Save cookies to a file.
- Code is taken from CPython 3.6
- https://github.com/python/cpython/blob/8d999cbf4adea053be6dbb612b9844635c4dfb8e/Lib/http/cookiejar.py#L2091-L2117 """
-
- if filename is None:
- if self.filename is not None:
- filename = self.filename
- else:
- raise ValueError(http.cookiejar.MISSING_FILENAME_TEXT)
-
- # Store session cookies with `expires` set to 0 instead of an empty string
- for cookie in self:
- if cookie.expires is None:
- cookie.expires = 0
-
- with self.open(filename, write=True) as f:
- f.write(self._HEADER)
- self._really_save(f, *args, **kwargs)
-
- def load(self, filename=None, ignore_discard=False, ignore_expires=False):
- """Load cookies from a file."""
- if filename is None:
- if self.filename is not None:
- filename = self.filename
- else:
- raise ValueError(http.cookiejar.MISSING_FILENAME_TEXT)
-
- def prepare_line(line):
- if line.startswith(self._HTTPONLY_PREFIX):
- line = line[len(self._HTTPONLY_PREFIX):]
- # comments and empty lines are fine
- if line.startswith('#') or not line.strip():
- return line
- cookie_list = line.split('\t')
- if len(cookie_list) != self._ENTRY_LEN:
- raise http.cookiejar.LoadError('invalid length %d' % len(cookie_list))
- cookie = self._CookieFileEntry(*cookie_list)
- if cookie.expires_at and not cookie.expires_at.isdigit():
- raise http.cookiejar.LoadError('invalid expires at %s' % cookie.expires_at)
- return line
-
- cf = io.StringIO()
- with self.open(filename) as f:
- for line in f:
- try:
- cf.write(prepare_line(line))
- except http.cookiejar.LoadError as e:
- if f'{line.strip()} '[0] in '[{"':
- raise http.cookiejar.LoadError(
- 'Cookies file must be Netscape formatted, not JSON. See '
- 'https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp')
- write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
- continue
- cf.seek(0)
- self._really_load(cf, filename, ignore_discard, ignore_expires)
- # Session cookies are denoted by either `expires` field set to
- # an empty string or 0. MozillaCookieJar only recognizes the former
- # (see [1]). So we need force the latter to be recognized as session
- # cookies on our own.
- # Session cookies may be important for cookies-based authentication,
- # e.g. usually, when user does not check 'Remember me' check box while
- # logging in on a site, some important cookies are stored as session
- # cookies so that not recognizing them will result in failed login.
- # 1. https://bugs.python.org/issue17164
- for cookie in self:
- # Treat `expires=0` cookies as session cookies
- if cookie.expires == 0:
- cookie.expires = None
- cookie.discard = True
-
-
class YoutubeDLCookieProcessor(urllib.request.HTTPCookieProcessor):
def __init__(self, cookiejar=None):
urllib.request.HTTPCookieProcessor.__init__(self, cookiejar)
https_response = http_response
-class YoutubeDLRedirectHandler(urllib.request.HTTPRedirectHandler):
- """YoutubeDL redirect handler
-
- The code is based on HTTPRedirectHandler implementation from CPython [1].
-
- This redirect handler solves two issues:
- - ensures redirect URL is always unicode under python 2
- - introduces support for experimental HTTP response status code
- 308 Permanent Redirect [2] used by some sites [3]
-
- 1. https://github.com/python/cpython/blob/master/Lib/urllib/request.py
- 2. https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/308
- 3. https://github.com/ytdl-org/youtube-dl/issues/28768
- """
-
- http_error_301 = http_error_303 = http_error_307 = http_error_308 = urllib.request.HTTPRedirectHandler.http_error_302
-
- def redirect_request(self, req, fp, code, msg, headers, newurl):
- """Return a Request or None in response to a redirect.
-
- This is called by the http_error_30x methods when a
- redirection response is received. If a redirection should
- take place, return a new Request to allow http_error_30x to
- perform the redirect. Otherwise, raise HTTPError if no-one
- else should try to handle this url. Return None if you can't
- but another Handler might.
- """
- m = req.get_method()
- if (not (code in (301, 302, 303, 307, 308) and m in ("GET", "HEAD")
- or code in (301, 302, 303) and m == "POST")):
- raise urllib.error.HTTPError(req.full_url, code, msg, headers, fp)
- # Strictly (according to RFC 2616), 301 or 302 in response to
- # a POST MUST NOT cause a redirection without confirmation
- # from the user (of urllib.request, in this case). In practice,
- # essentially all clients do redirect in this case, so we do
- # the same.
-
- # Be conciliant with URIs containing a space. This is mainly
- # redundant with the more complete encoding done in http_error_302(),
- # but it is kept for compatibility with other callers.
- newurl = newurl.replace(' ', '%20')
-
- CONTENT_HEADERS = ("content-length", "content-type")
- # NB: don't use dict comprehension for python 2.6 compatibility
- newheaders = {k: v for k, v in req.headers.items() if k.lower() not in CONTENT_HEADERS}
-
- # A 303 must either use GET or HEAD for subsequent request
- # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.4
- if code == 303 and m != 'HEAD':
- m = 'GET'
- # 301 and 302 redirects are commonly turned into a GET from a POST
- # for subsequent requests by browsers, so we'll do the same.
- # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.2
- # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.3
- if code in (301, 302) and m == 'POST':
- m = 'GET'
-
- return urllib.request.Request(
- newurl, headers=newheaders, origin_req_host=req.origin_req_host,
- unverifiable=True, method=m)
-
-
def extract_timezone(date_str):
m = re.search(
r'''(?x)
def unified_timestamp(date_str, day_first=True):
- if date_str is None:
+ if not isinstance(date_str, str):
return None
date_str = re.sub(r'\s+', ' ', re.sub(
date = date_from_str(date)
return self.start <= date <= self.end
- def __str__(self):
- return f'{self.start.isoformat()} - {self.end.isoformat()}'
+ def __repr__(self):
+ # Renders as `<module>.<ClassName>('<start ISO date>', '<end ISO date>')`.
+ # `__name__` is the module-level name, which this module rewrites near its
+ # top to the parent package ("Pretend to be the parent module").
+ return f'{__name__}.{type(self).__name__}({self.start.isoformat()!r}, {self.end.isoformat()!r})'
def __eq__(self, other):
return (isinstance(other, DateRange)
return urllib.parse.urljoin(base, path)
-class HEADRequest(urllib.request.Request):
- def get_method(self):
- return 'HEAD'
-
-
-class PUTRequest(urllib.request.Request):
- def get_method(self):
- return 'PUT'
-
-
def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1):
if get_attr and v is not None:
v = getattr(v, get_attr, None)
return req
-def strftime_or_none(timestamp, date_format, default=None):
+def strftime_or_none(timestamp, date_format='%Y%m%d', default=None):
datetime_object = None
try:
if isinstance(timestamp, (int, float)): # unix timestamp
# Using naive datetime here can break timestamp() in Windows
# Ref: https://github.com/yt-dlp/yt-dlp/issues/5185, https://github.com/python/cpython/issues/94414
- datetime_object = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
+ # Also, datetime.datetime.fromtimestamp breaks for negative timestamps
+ # Ref: https://github.com/yt-dlp/yt-dlp/issues/6706#issuecomment-1496842642
+ datetime_object = (datetime.datetime.fromtimestamp(0, datetime.timezone.utc)
+ + datetime.timedelta(seconds=timestamp))
elif isinstance(timestamp, str): # assume YYYYMMDD
datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
date_format = re.sub( # Support %s on windows
return update_url(url, query_update=query)
-def update_Request(req, url=None, data=None, headers=None, query=None):
- req_headers = req.headers.copy()
- req_headers.update(headers or {})
- req_data = data or req.data
- req_url = update_url_query(url or req.get_full_url(), query)
- req_get_method = req.get_method()
- if req_get_method == 'HEAD':
- req_type = HEADRequest
- elif req_get_method == 'PUT':
- req_type = PUTRequest
- else:
- req_type = urllib.request.Request
- new_req = req_type(
- req_url, data=req_data, headers=req_headers,
- origin_req_host=req.origin_req_host, unverifiable=req.unverifiable)
- if hasattr(req, 'timeout'):
- new_req.timeout = req.timeout
- return new_req
-
-
def _multipart_encode_impl(data, boundary):
content_type = 'multipart/form-data; boundary=%s' % boundary
def variadic(x, allowed_types=NO_DEFAULT):
+ # Returns `x` itself when `is_iterable_like(x, blocked_types=allowed_types)`
+ # holds, otherwise wraps it in a one-element tuple.
+ # Passing `allowed_types` as anything other than a tuple or a type (for
+ # example a list) is deprecated: a deprecation warning is issued and the
+ # value is converted to a tuple before use.
+ if not isinstance(allowed_types, (tuple, type)):
+ deprecation_warning('allowed_types should be a tuple or a type')
+ allowed_types = tuple(allowed_types)
return x if is_iterable_like(x, blocked_types=allowed_types) else (x, )
'''
-STR_FORMAT_TYPES = 'diouxXeEfFgGcrs'
+STR_FORMAT_TYPES = 'diouxXeEfFgGcrsa'
def limit_length(s, length):
},
}
- sanitize_codec = functools.partial(try_get, getter=lambda x: x[0].split('.')[0].replace('0', ''))
+ sanitize_codec = functools.partial(
+ try_get, getter=lambda x: x[0].split('.')[0].replace('0', '').lower())
vcodec, acodec = sanitize_codec(vcodecs), sanitize_codec(acodecs)
for ext in preferences or COMPATIBLE_CODECS.keys():
class download_range_func:
- def __init__(self, chapters, ranges):
- self.chapters, self.ranges = chapters, ranges
+ def __init__(self, chapters, ranges, from_info=False):
+ self.chapters, self.ranges, self.from_info = chapters, ranges, from_info
def __call__(self, info_dict, ydl):
- if not self.ranges and not self.chapters:
- yield {}
warning = ('There are no chapters matching the regex' if info_dict.get('chapters')
else 'Cannot match chapters since chapter information is unavailable')
if self.chapters and warning:
ydl.to_screen(f'[info] {info_dict["id"]}: {warning}')
- yield from ({'start_time': start, 'end_time': end} for start, end in self.ranges or [])
+ for start, end in self.ranges or []:
+ yield {
+ 'start_time': self._handle_negative_timestamp(start, info_dict),
+ 'end_time': self._handle_negative_timestamp(end, info_dict),
+ }
+
+ if self.from_info and (info_dict.get('start_time') or info_dict.get('end_time')):
+ yield {
+ 'start_time': info_dict.get('start_time') or 0,
+ 'end_time': info_dict.get('end_time') or float('inf'),
+ }
+ elif not self.ranges and not self.chapters:
+ yield {}
+
+ @staticmethod
+ def _handle_negative_timestamp(time, info):
+ # A negative `time` is offset from `info['duration']` and clamped at 0:
+ # max(duration + time, 0).
+ # `time` is returned unchanged when it is not negative or when `info` has
+ # no truthy 'duration' (missing, None or 0).
+ return max(info['duration'] + time, 0) if info.get('duration') and time < 0 else time
def __eq__(self, other):
return (isinstance(other, download_range_func)
'or': 'ori',
'os': 'oss',
'pa': 'pan',
+ 'pe': 'per',
'pi': 'pli',
'pl': 'pol',
'ps': 'pus',
struct.pack('!L', random.randint(addr_min, addr_max))))
-class PerRequestProxyHandler(urllib.request.ProxyHandler):
- def __init__(self, proxies=None):
- # Set default handlers
- for type in ('http', 'https'):
- setattr(self, '%s_open' % type,
- lambda r, proxy='__noproxy__', type=type, meth=self.proxy_open:
- meth(r, proxy, type))
- urllib.request.ProxyHandler.__init__(self, proxies)
-
- def proxy_open(self, req, proxy, type):
- req_proxy = req.headers.get('Ytdl-request-proxy')
- if req_proxy is not None:
- proxy = req_proxy
- del req.headers['Ytdl-request-proxy']
-
- if proxy == '__noproxy__':
- return None # No Proxy
- if urllib.parse.urlparse(proxy).scheme.lower() in ('socks', 'socks4', 'socks4a', 'socks5'):
- req.add_header('Ytdl-socks-proxy', proxy)
- # yt-dlp's http/https handlers do wrapping the socket with socks
- return None
- return urllib.request.ProxyHandler.proxy_open(
- self, req, proxy, type)
-
-
# Both long_to_bytes and bytes_to_long are adapted from PyCrypto, which is
# released into Public Domain
# https://github.com/dlitz/pycrypto/blob/master/lib/Crypto/Util/number.py#L387
def clean_podcast_url(url):
- return re.sub(r'''(?x)
+ url = re.sub(r'''(?x)
(?:
(?:
chtbl\.com/track|
media\.blubrry\.com| # https://create.blubrry.com/resources/podcast-media-download-statistics/getting-started/
- play\.podtrac\.com
- )/[^/]+|
+ play\.podtrac\.com|
+ chrt\.fm/track|
+ mgln\.ai/e
+ )(?:/[^/.]+)?|
(?:dts|www)\.podtrac\.com/(?:pts/)?redirect\.[0-9a-z]{3,4}| # http://analytics.podtrac.com/how-to-measure
flex\.acast\.com|
pd(?:
cn\.co| # https://podcorn.com/analytics-prefix/
st\.fm # https://podsights.com/docs/
- )/e
+ )/e|
+ [0-9]\.gum\.fm|
+ pscrb\.fm/rss/p
)/''', '', url)
+ return re.sub(r'^\w+://(\w+://)', r'\1', url)
_HEX_TABLE = '0123456789abcdef'
return orderedSet(requested)
+# TODO: Rewrite
class FormatSorter:
regex = r' *((?P<reverse>\+)?(?P<field>[a-zA-Z0-9_]+)((?P<separator>[~:])(?P<limit>.*?))?)? *$'
'source': {'convert': 'float', 'field': 'source_preference', 'default': -1},
'codec': {'type': 'combined', 'field': ('vcodec', 'acodec')},
- 'br': {'type': 'combined', 'field': ('tbr', 'vbr', 'abr'), 'same_limit': True},
- 'size': {'type': 'combined', 'same_limit': True, 'field': ('filesize', 'fs_approx')},
+ 'br': {'type': 'multiple', 'field': ('tbr', 'vbr', 'abr'), 'convert': 'float_none',
+ 'function': lambda it: next(filter(None, it), None)},
+ 'size': {'type': 'multiple', 'field': ('filesize', 'fs_approx'), 'convert': 'bytes',
+ 'function': lambda it: next(filter(None, it), None)},
'ext': {'type': 'combined', 'field': ('vext', 'aext')},
'res': {'type': 'multiple', 'field': ('height', 'width'),
'function': lambda it: (lambda l: min(l) if l else 0)(tuple(filter(None, it)))},
format['preference'] = -100
# Determine missing bitrates
- if format.get('tbr') is None:
- if format.get('vbr') is not None and format.get('abr') is not None:
- format['tbr'] = format.get('vbr', 0) + format.get('abr', 0)
- else:
- if format.get('vcodec') != 'none' and format.get('vbr') is None:
- format['vbr'] = format.get('tbr') - format.get('abr', 0)
- if format.get('acodec') != 'none' and format.get('abr') is None:
- format['abr'] = format.get('tbr') - format.get('vbr', 0)
+ if format.get('vcodec') == 'none':
+ format['vbr'] = 0
+ if format.get('acodec') == 'none':
+ format['abr'] = 0
+ if not format.get('vbr') and format.get('vcodec') != 'none':
+ format['vbr'] = try_call(lambda: format['tbr'] - format['abr']) or None
+ if not format.get('abr') and format.get('acodec') != 'none':
+ format['abr'] = try_call(lambda: format['tbr'] - format['vbr']) or None
+ if not format.get('tbr'):
+ format['tbr'] = try_call(lambda: format['vbr'] + format['abr']) or None
return tuple(self._calculate_field_preference(format, field) for field in self._order)
+
+
+# XXX: Temporary
+class _YDLLogger:
+ # Adapter exposing logger-style methods (debug/info/warning/error/stdout/stderr)
+ # that forward each message to the matching output method of a `ydl` object.
+ # Every method does nothing when `ydl` is falsy (e.g. the default None).
+ def __init__(self, ydl=None):
+ self._ydl = ydl
+
+ def debug(self, message):
+ # Forwarded to ydl.write_debug
+ if self._ydl:
+ self._ydl.write_debug(message)
+
+ def info(self, message):
+ # Forwarded to ydl.to_screen
+ if self._ydl:
+ self._ydl.to_screen(message)
+
+ def warning(self, message, *, once=False):
+ # Forwarded to ydl.report_warning; `once` is passed on as `only_once`
+ if self._ydl:
+ self._ydl.report_warning(message, only_once=once)
+
+ def error(self, message, *, is_error=True):
+ # Forwarded to ydl.report_error together with the `is_error` flag
+ if self._ydl:
+ self._ydl.report_error(message, is_error=is_error)
+
+ def stdout(self, message):
+ # Forwarded to ydl.to_stdout
+ if self._ydl:
+ self._ydl.to_stdout(message)
+
+ def stderr(self, message):
+ # Forwarded to ydl.to_stderr
+ if self._ydl:
+ self._ydl.to_stderr(message)