import locale
import math
import mimetypes
+import netrc
import operator
import os
import platform
'%d/%m/%y',
'%d/%m/%Y %H:%M:%S',
'%d-%m-%Y %H:%M',
+ '%H:%M %d/%m/%Y',
])
DATE_FORMATS_MONTH_FIRST = list(DATE_FORMATS)
)
-def process_communicate_or_kill(p, *args, **kwargs):
-    deprecation_warning(f'"{__name__}.process_communicate_or_kill" is deprecated and may be removed '
-                        f'in a future version. Use "{__name__}.Popen.communicate_or_kill" instead')
-    return Popen.communicate_or_kill(p, *args, **kwargs)
+# Parse netrc credentials from an in-memory string instead of a file on disk.
+# '-' is passed as a dummy "filename" only for netrc's error messages.
+class netrc_from_content(netrc.netrc):
+    def __init__(self, content):
+        self.hosts, self.macros = {}, {}
+        with io.StringIO(content) as stream:
+            self._parse('-', stream, False)
class Popen(subprocess.Popen):
return data
return brotli.decompress(data)
+    @staticmethod
+    def gz(data):
+        # Decompress gzip-encoded bytes; tolerates trailing garbage by retrying
+        # with progressively truncated input (up to 1023 bytes stripped).
+        gz = gzip.GzipFile(fileobj=io.BytesIO(data), mode='rb')
+        try:
+            return gz.read()
+        except OSError as original_oserror:
+            # There may be junk at the end of the file
+            # See http://stackoverflow.com/q/4928560/35070 for details
+            for i in range(1, 1024):
+                try:
+                    gz = gzip.GzipFile(fileobj=io.BytesIO(data[:-i]), mode='rb')
+                    return gz.read()
+                except OSError:
+                    continue
+            else:
+                # for-else: no truncation succeeded; surface the first error
+                raise original_oserror
+
def http_request(self, req):
# According to RFC 3986, URLs can not contain non-ASCII characters, however this is not
# always respected by websites, some tend to give out URLs with non percent-encoded
    def http_response(self, req, resp):
        old_resp = resp
-        # gzip
-        if resp.headers.get('Content-encoding', '') == 'gzip':
-            content = resp.read()
-            gz = gzip.GzipFile(fileobj=io.BytesIO(content), mode='rb')
-            try:
-                uncompressed = io.BytesIO(gz.read())
-            except OSError as original_ioerror:
-                # There may be junk add the end of the file
-                # See http://stackoverflow.com/q/4928560/35070 for details
-                for i in range(1, 1024):
-                    try:
-                        gz = gzip.GzipFile(fileobj=io.BytesIO(content[:-i]), mode='rb')
-                        uncompressed = io.BytesIO(gz.read())
-                    except OSError:
-                        continue
-                    break
-                else:
-                    raise original_ioerror
-            resp = urllib.request.addinfourl(uncompressed, old_resp.headers, old_resp.url, old_resp.code)
-            resp.msg = old_resp.msg
-        # deflate
-        if resp.headers.get('Content-encoding', '') == 'deflate':
-            gz = io.BytesIO(self.deflate(resp.read()))
-            resp = urllib.request.addinfourl(gz, old_resp.headers, old_resp.url, old_resp.code)
-            resp.msg = old_resp.msg
-        # brotli
-        if resp.headers.get('Content-encoding', '') == 'br':
-            resp = urllib.request.addinfourl(
-                io.BytesIO(self.brotli(resp.read())), old_resp.headers, old_resp.url, old_resp.code)
+
+        # Content-Encoding header lists the encodings in order that they were applied [1].
+        # To decompress, we simply do the reverse.
+        # [1]: https://datatracker.ietf.org/doc/html/rfc9110#name-content-encoding
+        decoded_response = None
+        for encoding in (e.strip() for e in reversed(resp.headers.get('Content-encoding', '').split(','))):
+            # The body is read lazily, at most once ('decoded_response or resp.read()');
+            # 'br' is left compressed when the brotli module is unavailable.
+            if encoding == 'gzip':
+                decoded_response = self.gz(decoded_response or resp.read())
+            elif encoding == 'deflate':
+                decoded_response = self.deflate(decoded_response or resp.read())
+            elif encoding == 'br' and brotli:
+                decoded_response = self.brotli(decoded_response or resp.read())
+
+        # Rewrap the response only when at least one encoding was actually decoded
+        if decoded_response is not None:
+            resp = urllib.request.addinfourl(io.BytesIO(decoded_response), old_resp.headers, old_resp.url, old_resp.code)
        resp.msg = old_resp.msg
# Percent-encode redirect URL of Location HTTP header to satisfy RFC 3986 (see
# https://github.com/ytdl-org/youtube-dl/issues/6457).
return isinstance(f, (str, bytes, os.PathLike))
-class YoutubeDLCookieJar(http.cookiejar.MozillaCookieJar):
- """
- See [1] for cookie file format.
-
- 1. https://curl.haxx.se/docs/http-cookies.html
- """
- _HTTPONLY_PREFIX = '#HttpOnly_'
- _ENTRY_LEN = 7
- _HEADER = '''# Netscape HTTP Cookie File
-# This file is generated by yt-dlp. Do not edit.
-
-'''
- _CookieFileEntry = collections.namedtuple(
- 'CookieFileEntry',
- ('domain_name', 'include_subdomains', 'path', 'https_only', 'expires_at', 'name', 'value'))
-
- def __init__(self, filename=None, *args, **kwargs):
- super().__init__(None, *args, **kwargs)
- if is_path_like(filename):
- filename = os.fspath(filename)
- self.filename = filename
-
- @staticmethod
- def _true_or_false(cndn):
- return 'TRUE' if cndn else 'FALSE'
-
- @contextlib.contextmanager
- def open(self, file, *, write=False):
- if is_path_like(file):
- with open(file, 'w' if write else 'r', encoding='utf-8') as f:
- yield f
- else:
- if write:
- file.truncate(0)
- yield file
-
- def _really_save(self, f, ignore_discard=False, ignore_expires=False):
- now = time.time()
- for cookie in self:
- if (not ignore_discard and cookie.discard
- or not ignore_expires and cookie.is_expired(now)):
- continue
- name, value = cookie.name, cookie.value
- if value is None:
- # cookies.txt regards 'Set-Cookie: foo' as a cookie
- # with no name, whereas http.cookiejar regards it as a
- # cookie with no value.
- name, value = '', name
- f.write('%s\n' % '\t'.join((
- cookie.domain,
- self._true_or_false(cookie.domain.startswith('.')),
- cookie.path,
- self._true_or_false(cookie.secure),
- str_or_none(cookie.expires, default=''),
- name, value
- )))
-
- def save(self, filename=None, *args, **kwargs):
- """
- Save cookies to a file.
- Code is taken from CPython 3.6
- https://github.com/python/cpython/blob/8d999cbf4adea053be6dbb612b9844635c4dfb8e/Lib/http/cookiejar.py#L2091-L2117 """
-
- if filename is None:
- if self.filename is not None:
- filename = self.filename
- else:
- raise ValueError(http.cookiejar.MISSING_FILENAME_TEXT)
-
- # Store session cookies with `expires` set to 0 instead of an empty string
- for cookie in self:
- if cookie.expires is None:
- cookie.expires = 0
-
- with self.open(filename, write=True) as f:
- f.write(self._HEADER)
- self._really_save(f, *args, **kwargs)
-
- def load(self, filename=None, ignore_discard=False, ignore_expires=False):
- """Load cookies from a file."""
- if filename is None:
- if self.filename is not None:
- filename = self.filename
- else:
- raise ValueError(http.cookiejar.MISSING_FILENAME_TEXT)
-
- def prepare_line(line):
- if line.startswith(self._HTTPONLY_PREFIX):
- line = line[len(self._HTTPONLY_PREFIX):]
- # comments and empty lines are fine
- if line.startswith('#') or not line.strip():
- return line
- cookie_list = line.split('\t')
- if len(cookie_list) != self._ENTRY_LEN:
- raise http.cookiejar.LoadError('invalid length %d' % len(cookie_list))
- cookie = self._CookieFileEntry(*cookie_list)
- if cookie.expires_at and not cookie.expires_at.isdigit():
- raise http.cookiejar.LoadError('invalid expires at %s' % cookie.expires_at)
- return line
-
- cf = io.StringIO()
- with self.open(filename) as f:
- for line in f:
- try:
- cf.write(prepare_line(line))
- except http.cookiejar.LoadError as e:
- if f'{line.strip()} '[0] in '[{"':
- raise http.cookiejar.LoadError(
- 'Cookies file must be Netscape formatted, not JSON. See '
- 'https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp')
- write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
- continue
- cf.seek(0)
- self._really_load(cf, filename, ignore_discard, ignore_expires)
- # Session cookies are denoted by either `expires` field set to
- # an empty string or 0. MozillaCookieJar only recognizes the former
- # (see [1]). So we need force the latter to be recognized as session
- # cookies on our own.
- # Session cookies may be important for cookies-based authentication,
- # e.g. usually, when user does not check 'Remember me' check box while
- # logging in on a site, some important cookies are stored as session
- # cookies so that not recognizing them will result in failed login.
- # 1. https://bugs.python.org/issue17164
- for cookie in self:
- # Treat `expires=0` cookies as session cookies
- if cookie.expires == 0:
- cookie.expires = None
- cookie.discard = True
-
-
class YoutubeDLCookieProcessor(urllib.request.HTTPCookieProcessor):
def __init__(self, cookiejar=None):
urllib.request.HTTPCookieProcessor.__init__(self, cookiejar)
def unified_timestamp(date_str, day_first=True):
-    if date_str is None:
+    # Accept only strings; any other type (None, int, ...) yields None instead
+    # of raising in the re.sub below
+    if not isinstance(date_str, str):
        return None
    date_str = re.sub(r'\s+', ' ', re.sub(
return req
-def strftime_or_none(timestamp, date_format, default=None):
+# date_format now defaults to '%Y%m%d' so most callers can omit it
+def strftime_or_none(timestamp, date_format='%Y%m%d', default=None):
    datetime_object = None
    try:
        if isinstance(timestamp, (int, float)):  # unix timestamp
            # Using naive datetime here can break timestamp() in Windows
            # Ref: https://github.com/yt-dlp/yt-dlp/issues/5185, https://github.com/python/cpython/issues/94414
-            datetime_object = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
+            # Also, datetime.datetime.fromtimestamp breaks for negative timestamps
+            # Ref: https://github.com/yt-dlp/yt-dlp/issues/6706#issuecomment-1496842642
+            datetime_object = (datetime.datetime.fromtimestamp(0, datetime.timezone.utc)
+                               + datetime.timedelta(seconds=timestamp))
        elif isinstance(timestamp, str):  # assume YYYYMMDD
            datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
        date_format = re.sub(  # Support %s on windows
'''
-STR_FORMAT_TYPES = 'diouxXeEfFgGcrs'
+# printf-style conversion types accepted in output templates; 'a' (ascii()) added
+STR_FORMAT_TYPES = 'diouxXeEfFgGcrsa'
def limit_length(s, length):
        },
    }
-    sanitize_codec = functools.partial(try_get, getter=lambda x: x[0].split('.')[0].replace('0', ''))
+    # Keep only the codec family for comparison: drop '.profile' suffixes and
+    # zeros, and lowercase (so e.g. 'AV01.x' and 'av1' compare equal)
+    sanitize_codec = functools.partial(
+        try_get, getter=lambda x: x[0].split('.')[0].replace('0', '').lower())
    vcodec, acodec = sanitize_codec(vcodecs), sanitize_codec(acodecs)
    for ext in preferences or COMPATIBLE_CODECS.keys():
class download_range_func:
-    def __init__(self, chapters, ranges):
-        self.chapters, self.ranges = chapters, ranges
+    # from_info: additionally honor start_time/end_time carried in info_dict itself
+    def __init__(self, chapters, ranges, from_info=False):
+        self.chapters, self.ranges, self.from_info = chapters, ranges, from_info
    def __call__(self, info_dict, ydl):
-        if not self.ranges and not self.chapters:
+        if not any((self.ranges, self.chapters, self.from_info)):
            yield {}
        warning = ('There are no chapters matching the regex' if info_dict.get('chapters')
        if self.chapters and warning:
            ydl.to_screen(f'[info] {info_dict["id"]}: {warning}')
-        yield from ({'start_time': start, 'end_time': end} for start, end in self.ranges or [])
+        # Resolve negative offsets (relative to the end) before yielding each range
+        for start, end in self.ranges or []:
+            yield {
+                'start_time': self._handle_negative_timestamp(start, info_dict),
+                'end_time': self._handle_negative_timestamp(end, info_dict),
+            }
+
+        if self.from_info and (info_dict.get('start_time') or info_dict.get('end_time')):
+            yield {
+                'start_time': info_dict.get('start_time'),
+                'end_time': info_dict.get('end_time'),
+            }
+
+    @staticmethod
+    def _handle_negative_timestamp(time, info):
+        # Negative times count back from the end of the video (clamped at 0);
+        # requires a known duration, otherwise the value passes through unchanged
+        return max(info['duration'] + time, 0) if info.get('duration') and time < 0 else time
    def __eq__(self, other):
        return (isinstance(other, download_range_func)
        'or': 'ori',
        'os': 'oss',
        'pa': 'pan',
+        'pe': 'per',  # NOTE(review): nonstandard 2-letter code, presumably Persian — verify against callers
        'pi': 'pli',
        'pl': 'pol',
        'ps': 'pus',
return orderedSet(requested)
+# TODO: Rewrite
class FormatSorter:
    regex = r' *((?P<reverse>\+)?(?P<field>[a-zA-Z0-9_]+)((?P<separator>[~:])(?P<limit>.*?))?)? *$'
        'source': {'convert': 'float', 'field': 'source_preference', 'default': -1},
        'codec': {'type': 'combined', 'field': ('vcodec', 'acodec')},
-        'br': {'type': 'combined', 'field': ('tbr', 'vbr', 'abr'), 'same_limit': True},
-        'size': {'type': 'combined', 'same_limit': True, 'field': ('filesize', 'fs_approx')},
+        # Take the first non-empty field value instead of combining the fields
+        'br': {'type': 'multiple', 'field': ('tbr', 'vbr', 'abr'), 'convert': 'float_none',
+               'function': lambda it: next(filter(None, it), None)},
+        'size': {'type': 'multiple', 'field': ('filesize', 'fs_approx'), 'convert': 'bytes',
+                 'function': lambda it: next(filter(None, it), None)},
        'ext': {'type': 'combined', 'field': ('vext', 'aext')},
        'res': {'type': 'multiple', 'field': ('height', 'width'),
                'function': lambda it: (lambda l: min(l) if l else 0)(tuple(filter(None, it)))},
            format['preference'] = -100
        # Determine missing bitrates
-        if format.get('tbr') is None:
-            if format.get('vbr') is not None and format.get('abr') is not None:
-                format['tbr'] = format.get('vbr', 0) + format.get('abr', 0)
-        else:
-            if format.get('vcodec') != 'none' and format.get('vbr') is None:
-                format['vbr'] = format.get('tbr') - format.get('abr', 0)
-            if format.get('acodec') != 'none' and format.get('abr') is None:
-                format['abr'] = format.get('tbr') - format.get('vbr', 0)
+        # Zero the bitrate of absent streams so the derivations below stay valid
+        if format.get('vcodec') == 'none':
+            format['vbr'] = 0
+        if format.get('acodec') == 'none':
+            format['abr'] = 0
+        # Derive each missing value from the other two (tbr = vbr + abr);
+        # try_call absorbs missing/None operands, yielding None instead of raising
+        if not format.get('vbr') and format.get('vcodec') != 'none':
+            format['vbr'] = try_call(lambda: format['tbr'] - format['abr']) or None
+        if not format.get('abr') and format.get('acodec') != 'none':
+            format['abr'] = try_call(lambda: format['tbr'] - format['vbr']) or None
+        if not format.get('tbr'):
+            format['tbr'] = try_call(lambda: format['vbr'] + format['abr']) or None
        return tuple(self._calculate_field_preference(format, field) for field in self._order)