import locale
import math
import mimetypes
+import netrc
import operator
import os
import platform
from ..dependencies import brotli, certifi, websockets, xattr
from ..socks import ProxyType, sockssocket
+__name__ = __name__.rsplit('.', 1)[0] # Pretend to be the parent module
+
# This is not clearly defined otherwise
compiled_regex_type = type(re.compile(''))
}
-NO_DEFAULT = object()
-IDENTITY = lambda x: x
+class NO_DEFAULT:
+ pass
+
+
+def IDENTITY(x):
+ return x
+
ENGLISH_MONTH_NAMES = [
'January', 'February', 'March', 'April', 'May', 'June',
'%d/%m/%y',
'%d/%m/%Y %H:%M:%S',
'%d-%m-%Y %H:%M',
+ '%H:%M %d/%m/%Y',
])
DATE_FORMATS_MONTH_FIRST = list(DATE_FORMATS)
)
-def process_communicate_or_kill(p, *args, **kwargs):
- deprecation_warning(f'"{__name__}.process_communicate_or_kill" is deprecated and may be removed '
- f'in a future version. Use "{__name__}.Popen.communicate_or_kill" instead')
- return Popen.communicate_or_kill(p, *args, **kwargs)
+class netrc_from_content(netrc.netrc):
+ def __init__(self, content):
+ self.hosts, self.macros = {}, {}
+ with io.StringIO(content) as stream:
+ self._parse('-', stream, False)
class Popen(subprocess.Popen):
return data
return brotli.decompress(data)
+ @staticmethod
+ def gz(data):
+ gz = gzip.GzipFile(fileobj=io.BytesIO(data), mode='rb')
+ try:
+ return gz.read()
+ except OSError as original_oserror:
+ # There may be junk add the end of the file
+ # See http://stackoverflow.com/q/4928560/35070 for details
+ for i in range(1, 1024):
+ try:
+ gz = gzip.GzipFile(fileobj=io.BytesIO(data[:-i]), mode='rb')
+ return gz.read()
+ except OSError:
+ continue
+ else:
+ raise original_oserror
+
def http_request(self, req):
# According to RFC 3986, URLs can not contain non-ASCII characters, however this is not
# always respected by websites, some tend to give out URLs with non percent-encoded
def http_response(self, req, resp):
old_resp = resp
- # gzip
- if resp.headers.get('Content-encoding', '') == 'gzip':
- content = resp.read()
- gz = gzip.GzipFile(fileobj=io.BytesIO(content), mode='rb')
- try:
- uncompressed = io.BytesIO(gz.read())
- except OSError as original_ioerror:
- # There may be junk add the end of the file
- # See http://stackoverflow.com/q/4928560/35070 for details
- for i in range(1, 1024):
- try:
- gz = gzip.GzipFile(fileobj=io.BytesIO(content[:-i]), mode='rb')
- uncompressed = io.BytesIO(gz.read())
- except OSError:
- continue
- break
- else:
- raise original_ioerror
- resp = urllib.request.addinfourl(uncompressed, old_resp.headers, old_resp.url, old_resp.code)
- resp.msg = old_resp.msg
- # deflate
- if resp.headers.get('Content-encoding', '') == 'deflate':
- gz = io.BytesIO(self.deflate(resp.read()))
- resp = urllib.request.addinfourl(gz, old_resp.headers, old_resp.url, old_resp.code)
- resp.msg = old_resp.msg
- # brotli
- if resp.headers.get('Content-encoding', '') == 'br':
- resp = urllib.request.addinfourl(
- io.BytesIO(self.brotli(resp.read())), old_resp.headers, old_resp.url, old_resp.code)
+
+ # Content-Encoding header lists the encodings in order that they were applied [1].
+ # To decompress, we simply do the reverse.
+ # [1]: https://datatracker.ietf.org/doc/html/rfc9110#name-content-encoding
+ decoded_response = None
+ for encoding in (e.strip() for e in reversed(resp.headers.get('Content-encoding', '').split(','))):
+ if encoding == 'gzip':
+ decoded_response = self.gz(decoded_response or resp.read())
+ elif encoding == 'deflate':
+ decoded_response = self.deflate(decoded_response or resp.read())
+ elif encoding == 'br' and brotli:
+ decoded_response = self.brotli(decoded_response or resp.read())
+
+ if decoded_response is not None:
+ resp = urllib.request.addinfourl(io.BytesIO(decoded_response), old_resp.headers, old_resp.url, old_resp.code)
resp.msg = old_resp.msg
# Percent-encode redirect URL of Location HTTP header to satisfy RFC 3986 (see
# https://github.com/ytdl-org/youtube-dl/issues/6457).
return isinstance(f, (str, bytes, os.PathLike))
-class YoutubeDLCookieJar(http.cookiejar.MozillaCookieJar):
- """
- See [1] for cookie file format.
-
- 1. https://curl.haxx.se/docs/http-cookies.html
- """
- _HTTPONLY_PREFIX = '#HttpOnly_'
- _ENTRY_LEN = 7
- _HEADER = '''# Netscape HTTP Cookie File
-# This file is generated by yt-dlp. Do not edit.
-
-'''
- _CookieFileEntry = collections.namedtuple(
- 'CookieFileEntry',
- ('domain_name', 'include_subdomains', 'path', 'https_only', 'expires_at', 'name', 'value'))
-
- def __init__(self, filename=None, *args, **kwargs):
- super().__init__(None, *args, **kwargs)
- if is_path_like(filename):
- filename = os.fspath(filename)
- self.filename = filename
-
- @staticmethod
- def _true_or_false(cndn):
- return 'TRUE' if cndn else 'FALSE'
-
- @contextlib.contextmanager
- def open(self, file, *, write=False):
- if is_path_like(file):
- with open(file, 'w' if write else 'r', encoding='utf-8') as f:
- yield f
- else:
- if write:
- file.truncate(0)
- yield file
-
- def _really_save(self, f, ignore_discard=False, ignore_expires=False):
- now = time.time()
- for cookie in self:
- if (not ignore_discard and cookie.discard
- or not ignore_expires and cookie.is_expired(now)):
- continue
- name, value = cookie.name, cookie.value
- if value is None:
- # cookies.txt regards 'Set-Cookie: foo' as a cookie
- # with no name, whereas http.cookiejar regards it as a
- # cookie with no value.
- name, value = '', name
- f.write('%s\n' % '\t'.join((
- cookie.domain,
- self._true_or_false(cookie.domain.startswith('.')),
- cookie.path,
- self._true_or_false(cookie.secure),
- str_or_none(cookie.expires, default=''),
- name, value
- )))
-
- def save(self, filename=None, *args, **kwargs):
- """
- Save cookies to a file.
- Code is taken from CPython 3.6
- https://github.com/python/cpython/blob/8d999cbf4adea053be6dbb612b9844635c4dfb8e/Lib/http/cookiejar.py#L2091-L2117 """
-
- if filename is None:
- if self.filename is not None:
- filename = self.filename
- else:
- raise ValueError(http.cookiejar.MISSING_FILENAME_TEXT)
-
- # Store session cookies with `expires` set to 0 instead of an empty string
- for cookie in self:
- if cookie.expires is None:
- cookie.expires = 0
-
- with self.open(filename, write=True) as f:
- f.write(self._HEADER)
- self._really_save(f, *args, **kwargs)
-
- def load(self, filename=None, ignore_discard=False, ignore_expires=False):
- """Load cookies from a file."""
- if filename is None:
- if self.filename is not None:
- filename = self.filename
- else:
- raise ValueError(http.cookiejar.MISSING_FILENAME_TEXT)
-
- def prepare_line(line):
- if line.startswith(self._HTTPONLY_PREFIX):
- line = line[len(self._HTTPONLY_PREFIX):]
- # comments and empty lines are fine
- if line.startswith('#') or not line.strip():
- return line
- cookie_list = line.split('\t')
- if len(cookie_list) != self._ENTRY_LEN:
- raise http.cookiejar.LoadError('invalid length %d' % len(cookie_list))
- cookie = self._CookieFileEntry(*cookie_list)
- if cookie.expires_at and not cookie.expires_at.isdigit():
- raise http.cookiejar.LoadError('invalid expires at %s' % cookie.expires_at)
- return line
-
- cf = io.StringIO()
- with self.open(filename) as f:
- for line in f:
- try:
- cf.write(prepare_line(line))
- except http.cookiejar.LoadError as e:
- if f'{line.strip()} '[0] in '[{"':
- raise http.cookiejar.LoadError(
- 'Cookies file must be Netscape formatted, not JSON. See '
- 'https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp')
- write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
- continue
- cf.seek(0)
- self._really_load(cf, filename, ignore_discard, ignore_expires)
- # Session cookies are denoted by either `expires` field set to
- # an empty string or 0. MozillaCookieJar only recognizes the former
- # (see [1]). So we need force the latter to be recognized as session
- # cookies on our own.
- # Session cookies may be important for cookies-based authentication,
- # e.g. usually, when user does not check 'Remember me' check box while
- # logging in on a site, some important cookies are stored as session
- # cookies so that not recognizing them will result in failed login.
- # 1. https://bugs.python.org/issue17164
- for cookie in self:
- # Treat `expires=0` cookies as session cookies
- if cookie.expires == 0:
- cookie.expires = None
- cookie.discard = True
-
-
class YoutubeDLCookieProcessor(urllib.request.HTTPCookieProcessor):
def __init__(self, cookiejar=None):
urllib.request.HTTPCookieProcessor.__init__(self, cookiejar)
The code is based on HTTPRedirectHandler implementation from CPython [1].
- This redirect handler solves two issues:
- - ensures redirect URL is always unicode under python 2
- - introduces support for experimental HTTP response status code
- 308 Permanent Redirect [2] used by some sites [3]
+ This redirect handler fixes and improves the logic to better align with RFC7261
+ and what browsers tend to do [2][3]
1. https://github.com/python/cpython/blob/master/Lib/urllib/request.py
- 2. https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/308
- 3. https://github.com/ytdl-org/youtube-dl/issues/28768
+ 2. https://datatracker.ietf.org/doc/html/rfc7231
+ 3. https://github.com/python/cpython/issues/91306
"""
http_error_301 = http_error_303 = http_error_307 = http_error_308 = urllib.request.HTTPRedirectHandler.http_error_302
def redirect_request(self, req, fp, code, msg, headers, newurl):
- """Return a Request or None in response to a redirect.
-
- This is called by the http_error_30x methods when a
- redirection response is received. If a redirection should
- take place, return a new Request to allow http_error_30x to
- perform the redirect. Otherwise, raise HTTPError if no-one
- else should try to handle this url. Return None if you can't
- but another Handler might.
- """
- m = req.get_method()
- if (not (code in (301, 302, 303, 307, 308) and m in ("GET", "HEAD")
- or code in (301, 302, 303) and m == "POST")):
+ if code not in (301, 302, 303, 307, 308):
raise urllib.error.HTTPError(req.full_url, code, msg, headers, fp)
- # Strictly (according to RFC 2616), 301 or 302 in response to
- # a POST MUST NOT cause a redirection without confirmation
- # from the user (of urllib.request, in this case). In practice,
- # essentially all clients do redirect in this case, so we do
- # the same.
- # Be conciliant with URIs containing a space. This is mainly
- # redundant with the more complete encoding done in http_error_302(),
- # but it is kept for compatibility with other callers.
- newurl = newurl.replace(' ', '%20')
+ new_method = req.get_method()
+ new_data = req.data
- CONTENT_HEADERS = ("content-length", "content-type")
- # NB: don't use dict comprehension for python 2.6 compatibility
- newheaders = {k: v for k, v in req.headers.items() if k.lower() not in CONTENT_HEADERS}
+ # Technically the Cookie header should be in unredirected_hdrs,
+ # however in practice some may set it in normal headers anyway.
+ # We will remove it here to prevent any leaks.
+ remove_headers = ['Cookie']
# A 303 must either use GET or HEAD for subsequent request
# https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.4
- if code == 303 and m != 'HEAD':
- m = 'GET'
+ if code == 303 and req.get_method() != 'HEAD':
+ new_method = 'GET'
# 301 and 302 redirects are commonly turned into a GET from a POST
# for subsequent requests by browsers, so we'll do the same.
# https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.2
# https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.3
- if code in (301, 302) and m == 'POST':
- m = 'GET'
+ elif code in (301, 302) and req.get_method() == 'POST':
+ new_method = 'GET'
+
+ # only remove payload if method changed (e.g. POST to GET)
+ if new_method != req.get_method():
+ new_data = None
+ remove_headers.extend(['Content-Length', 'Content-Type'])
+
+ new_headers = {k: v for k, v in req.headers.items() if k.title() not in remove_headers}
return urllib.request.Request(
- newurl, headers=newheaders, origin_req_host=req.origin_req_host,
- unverifiable=True, method=m)
+ newurl, headers=new_headers, origin_req_host=req.origin_req_host,
+ unverifiable=True, method=new_method, data=new_data)
def extract_timezone(date_str):
def unified_timestamp(date_str, day_first=True):
- if date_str is None:
+ if not isinstance(date_str, str):
return None
date_str = re.sub(r'\s+', ' ', re.sub(
date = date_from_str(date)
return self.start <= date <= self.end
- def __str__(self):
- return f'{self.start.isoformat()} - {self.end.isoformat()}'
+ def __repr__(self):
+ return f'{__name__}.{type(self).__name__}({self.start.isoformat()!r}, {self.end.isoformat()!r})'
def __eq__(self, other):
return (isinstance(other, DateRange)
return req
-def strftime_or_none(timestamp, date_format, default=None):
+def strftime_or_none(timestamp, date_format='%Y%m%d', default=None):
datetime_object = None
try:
if isinstance(timestamp, (int, float)): # unix timestamp
# Using naive datetime here can break timestamp() in Windows
# Ref: https://github.com/yt-dlp/yt-dlp/issues/5185, https://github.com/python/cpython/issues/94414
- datetime_object = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
+ # Also, datetime.datetime.fromtimestamp breaks for negative timestamps
+ # Ref: https://github.com/yt-dlp/yt-dlp/issues/6706#issuecomment-1496842642
+ datetime_object = (datetime.datetime.fromtimestamp(0, datetime.timezone.utc)
+ + datetime.timedelta(seconds=timestamp))
elif isinstance(timestamp, str): # assume YYYYMMDD
datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
date_format = re.sub( # Support %s on windows
def variadic(x, allowed_types=NO_DEFAULT):
+ if not isinstance(allowed_types, (tuple, type)):
+ deprecation_warning('allowed_types should be a tuple or a type')
+ allowed_types = tuple(allowed_types)
return x if is_iterable_like(x, blocked_types=allowed_types) else (x, )
'''
-STR_FORMAT_TYPES = 'diouxXeEfFgGcrs'
+STR_FORMAT_TYPES = 'diouxXeEfFgGcrsa'
def limit_length(s, length):
},
}
- sanitize_codec = functools.partial(try_get, getter=lambda x: x[0].split('.')[0].replace('0', ''))
+ sanitize_codec = functools.partial(
+ try_get, getter=lambda x: x[0].split('.')[0].replace('0', '').lower())
vcodec, acodec = sanitize_codec(vcodecs), sanitize_codec(acodecs)
for ext in preferences or COMPATIBLE_CODECS.keys():
class download_range_func:
- def __init__(self, chapters, ranges):
- self.chapters, self.ranges = chapters, ranges
+ def __init__(self, chapters, ranges, from_info=False):
+ self.chapters, self.ranges, self.from_info = chapters, ranges, from_info
def __call__(self, info_dict, ydl):
- if not self.ranges and not self.chapters:
- yield {}
warning = ('There are no chapters matching the regex' if info_dict.get('chapters')
else 'Cannot match chapters since chapter information is unavailable')
if self.chapters and warning:
ydl.to_screen(f'[info] {info_dict["id"]}: {warning}')
- yield from ({'start_time': start, 'end_time': end} for start, end in self.ranges or [])
+ for start, end in self.ranges or []:
+ yield {
+ 'start_time': self._handle_negative_timestamp(start, info_dict),
+ 'end_time': self._handle_negative_timestamp(end, info_dict),
+ }
+
+ if self.from_info and (info_dict.get('start_time') or info_dict.get('end_time')):
+ yield {
+ 'start_time': info_dict.get('start_time') or 0,
+ 'end_time': info_dict.get('end_time') or float('inf'),
+ }
+ elif not self.ranges and not self.chapters:
+ yield {}
+
+ @staticmethod
+ def _handle_negative_timestamp(time, info):
+ return max(info['duration'] + time, 0) if info.get('duration') and time < 0 else time
def __eq__(self, other):
return (isinstance(other, download_range_func)
'or': 'ori',
'os': 'oss',
'pa': 'pan',
+ 'pe': 'per',
'pi': 'pli',
'pl': 'pol',
'ps': 'pus',
def clean_podcast_url(url):
- return re.sub(r'''(?x)
+ url = re.sub(r'''(?x)
(?:
(?:
chtbl\.com/track|
media\.blubrry\.com| # https://create.blubrry.com/resources/podcast-media-download-statistics/getting-started/
- play\.podtrac\.com
- )/[^/]+|
+ play\.podtrac\.com|
+ chrt\.fm/track|
+ mgln\.ai/e
+ )(?:/[^/.]+)?|
(?:dts|www)\.podtrac\.com/(?:pts/)?redirect\.[0-9a-z]{3,4}| # http://analytics.podtrac.com/how-to-measure
flex\.acast\.com|
pd(?:
cn\.co| # https://podcorn.com/analytics-prefix/
st\.fm # https://podsights.com/docs/
- )/e
+ )/e|
+ [0-9]\.gum\.fm|
+ pscrb\.fm/rss/p
)/''', '', url)
+ return re.sub(r'^\w+://(\w+://)', r'\1', url)
_HEX_TABLE = '0123456789abcdef'
return orderedSet(requested)
+# TODO: Rewrite
class FormatSorter:
regex = r' *((?P<reverse>\+)?(?P<field>[a-zA-Z0-9_]+)((?P<separator>[~:])(?P<limit>.*?))?)? *$'
'source': {'convert': 'float', 'field': 'source_preference', 'default': -1},
'codec': {'type': 'combined', 'field': ('vcodec', 'acodec')},
- 'br': {'type': 'combined', 'field': ('tbr', 'vbr', 'abr'), 'same_limit': True},
- 'size': {'type': 'combined', 'same_limit': True, 'field': ('filesize', 'fs_approx')},
+ 'br': {'type': 'multiple', 'field': ('tbr', 'vbr', 'abr'), 'convert': 'float_none',
+ 'function': lambda it: next(filter(None, it), None)},
+ 'size': {'type': 'multiple', 'field': ('filesize', 'fs_approx'), 'convert': 'bytes',
+ 'function': lambda it: next(filter(None, it), None)},
'ext': {'type': 'combined', 'field': ('vext', 'aext')},
'res': {'type': 'multiple', 'field': ('height', 'width'),
'function': lambda it: (lambda l: min(l) if l else 0)(tuple(filter(None, it)))},
format['preference'] = -100
# Determine missing bitrates
- if format.get('tbr') is None:
- if format.get('vbr') is not None and format.get('abr') is not None:
- format['tbr'] = format.get('vbr', 0) + format.get('abr', 0)
- else:
- if format.get('vcodec') != 'none' and format.get('vbr') is None:
- format['vbr'] = format.get('tbr') - format.get('abr', 0)
- if format.get('acodec') != 'none' and format.get('abr') is None:
- format['abr'] = format.get('tbr') - format.get('vbr', 0)
+ if format.get('vcodec') == 'none':
+ format['vbr'] = 0
+ if format.get('acodec') == 'none':
+ format['abr'] = 0
+ if not format.get('vbr') and format.get('vcodec') != 'none':
+ format['vbr'] = try_call(lambda: format['tbr'] - format['abr']) or None
+ if not format.get('abr') and format.get('acodec') != 'none':
+ format['abr'] = try_call(lambda: format['tbr'] - format['vbr']) or None
+ if not format.get('tbr'):
+ format['tbr'] = try_call(lambda: format['vbr'] + format['abr']) or None
return tuple(self._calculate_field_preference(format, field) for field in self._order)
+
+
+# XXX: Temporary
+class _YDLLogger:
+ def __init__(self, ydl=None):
+ self._ydl = ydl
+
+ def debug(self, message):
+ if self._ydl:
+ self._ydl.write_debug(message)
+
+ def info(self, message):
+ if self._ydl:
+ self._ydl.to_screen(message)
+
+ def warning(self, message, *, once=False):
+ if self._ydl:
+ self._ydl.report_warning(message, only_once=once)
+
+ def error(self, message, *, is_error=True):
+ if self._ydl:
+ self._ydl.report_error(message, is_error=is_error)
+
+ def stdout(self, message):
+ if self._ydl:
+ self._ydl.to_stdout(message)
+
+ def stderr(self, message):
+ if self._ydl:
+ self._ydl.to_stderr(message)