from __future__ import unicode_literals
+import asyncio
+import atexit
import base64
import binascii
import calendar
compat_HTMLParser,
compat_HTTPError,
compat_basestring,
+ compat_brotli,
compat_chr,
compat_cookiejar,
compat_ctypes_WINFUNCTYPE,
compat_kwargs,
compat_os_name,
compat_parse_qs,
+ compat_shlex_split,
compat_shlex_quote,
compat_str,
compat_struct_pack,
compat_urllib_parse_unquote_plus,
compat_urllib_request,
compat_urlparse,
+ compat_websockets,
compat_xpath,
)
return _USER_AGENT_TPL % random.choice(_CHROME_VERSIONS)
# Encodings we can transparently decode; brotli support depends on the
# optional compat_brotli module being importable
SUPPORTED_ENCODINGS = ['gzip', 'deflate'] + (['br'] if compat_brotli else [])


# Default HTTP headers attached to every outgoing request
std_headers = {
    'User-Agent': random_user_agent(),
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Encoding': ', '.join(SUPPORTED_ENCODINGS),
    'Accept-Language': 'en-us,en;q=0.5',
    'Sec-Fetch-Mode': 'navigate',
}
'%Y/%m/%d %H:%M:%S',
'%Y%m%d%H%M',
'%Y%m%d%H%M%S',
+ '%Y%m%d',
'%Y-%m-%d %H:%M',
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%d %H:%M:%S.%f',
try:
with tf:
- json.dump(obj, tf)
+ json.dump(obj, tf, ensure_ascii=False)
if sys.platform == 'win32':
# Need to remove existing file on Windows, else os.rename raises
# WindowsError or FileExistsError.
return get_element_by_attribute('id', id, html)
def get_element_html_by_id(id, html):
    """Return the html of the tag with the specified ID in the passed HTML document"""
    return get_element_html_by_attribute('id', id, html)
+
+
def get_element_by_class(class_name, html):
    """Return the content of the first tag with the specified class in the passed HTML document"""
    matches = get_elements_by_class(class_name, html)
    return matches[0] if matches else None
def get_element_html_by_class(class_name, html):
    """Return the html of the first tag with the specified class in the passed HTML document"""
    matches = get_elements_html_by_class(class_name, html)
    return matches[0] if matches else None
+
+
def get_element_by_attribute(attribute, value, html, escape_value=True):
    """Return the content of the first tag with the given attribute/value, or None"""
    matches = get_elements_by_attribute(attribute, value, html, escape_value)
    return matches[0] if matches else None
def get_element_html_by_attribute(attribute, value, html, escape_value=True):
    """Return the html of the first tag with the given attribute/value, or None"""
    matches = get_elements_html_by_attribute(attribute, value, html, escape_value)
    return matches[0] if matches else None
+
+
def get_elements_by_class(class_name, html):
"""Return the content of all tags with the specified class in the passed HTML document as a list"""
return get_elements_by_attribute(
html, escape_value=False)
-def get_elements_by_attribute(attribute, value, html, escape_value=True):
def get_elements_html_by_class(class_name, html):
    """Return the html of all tags with the specified class in the passed HTML document as a list"""
    # Match the class name as a whole word anywhere inside the attribute value
    class_re = r'[^\'"]*\b%s\b[^\'"]*' % re.escape(class_name)
    return get_elements_html_by_attribute('class', class_re, html, escape_value=False)
+
+
def get_elements_by_attribute(*args, **kwargs):
    """Return the content of the tag with the specified attribute in the passed HTML document"""
    return [text for text, _ in get_elements_text_and_html_by_attribute(*args, **kwargs)]
+
+
def get_elements_html_by_attribute(*args, **kwargs):
    """Return the html of the tag with the specified attribute in the passed HTML document"""
    return [markup for _, markup in get_elements_text_and_html_by_attribute(*args, **kwargs)]
+
+
def get_elements_text_and_html_by_attribute(attribute, value, html, escape_value=True):
    """
    Return the text (content) and the html (whole) of the tag with the specified
    attribute in the passed HTML document
    """

    # Quotes around the attribute value are made mandatory when the value starts
    # with a character that could terminate an unquoted value.
    # NOTE(review): re.match only inspects the first character of `value` —
    # confirm that is the intended check
    value_quote_optional = '' if re.match(r'''[\s"'`=<>]''', value) else '?'
    value = re.escape(value) if escape_value else value

    # Matches only the opening tag (up to and including the attribute);
    # the full element is then extracted with get_element_text_and_html_by_tag.
    # (?-x:...) disables verbose mode so whitespace in `value` stays significant.
    partial_element_re = r'''(?x)
        <(?P<tag>[a-zA-Z0-9:._-]+)
         (?:\s(?:[^>"']|"[^"]*"|'[^']*')*)?
         \s%(attribute)s\s*=\s*(?P<_q>['"]%(vqo)s)(?-x:%(value)s)(?P=_q)
        ''' % {'attribute': re.escape(attribute), 'value': value, 'vqo': value_quote_optional}

    for m in re.finditer(partial_element_re, html):
        content, whole = get_element_text_and_html_by_tag(m.group('tag'), html[m.start():])
        yield (
            # Strip one pair of enclosing quotes, then unescape HTML entities
            unescapeHTML(re.sub(r'^(?P<q>["\'])(?P<content>.*)(?P=q)$', r'\g<content>', content, flags=re.DOTALL)),
            whole
        )
+
+
class HTMLBreakOnClosingTagParser(compat_HTMLParser):
    """
    HTML parser which raises HTMLBreakOnClosingTagException upon reaching the
    closing tag for the first opening tag it has encountered, and can be used
    as a context manager
    """

    class HTMLBreakOnClosingTagException(Exception):
        # Control-flow exception: signals that the outermost tag was closed
        pass

    def __init__(self):
        # Stack of currently-open tag names; index 0 is the outermost tag
        self.tagstack = collections.deque()
        compat_HTMLParser.__init__(self)

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.close()

    def close(self):
        # handle_endtag does not return upon raising HTMLBreakOnClosingTagException,
        # so data remains buffered; we no longer have any interest in it, thus
        # override this method to discard it
        pass

    def handle_starttag(self, tag, _):
        self.tagstack.append(tag)

    def handle_endtag(self, tag):
        if not self.tagstack:
            raise compat_HTMLParseError('no tags in the stack')
        # Pop until the matching opening tag is found (implicitly closing any
        # unbalanced inner tags along the way)
        while self.tagstack:
            inner_tag = self.tagstack.pop()
            if inner_tag == tag:
                break
        else:
            raise compat_HTMLParseError(f'matching opening tag for closing {tag} tag not found')
        if not self.tagstack:
            # The outermost tag has just been closed — stop parsing
            raise self.HTMLBreakOnClosingTagException()
+
+
def get_element_text_and_html_by_tag(tag, html):
    """
    For the first element with the specified tag in the passed HTML document
    return its' content (text) and the whole element (html)
    """
    def find_or_raise(haystack, needle, exc):
        # str.index raises ValueError; translate it into the given parse error
        try:
            return haystack.index(needle)
        except ValueError:
            raise exc
    closing_tag = f'</{tag}>'
    whole_start = find_or_raise(
        html, f'<{tag}', compat_HTMLParseError(f'opening {tag} tag not found'))
    content_start = find_or_raise(
        html[whole_start:], '>', compat_HTMLParseError(f'malformed opening {tag} tag'))
    content_start += whole_start + 1
    with HTMLBreakOnClosingTagParser() as parser:
        # Feed just the opening tag first so the parser's tag stack is primed
        parser.feed(html[whole_start:content_start])
        if not parser.tagstack or parser.tagstack[0] != tag:
            raise compat_HTMLParseError(f'parser did not match opening {tag} tag')
        offset = content_start
        # Feed chunks up to each candidate closing tag until the parser signals
        # (via HTMLBreakOnClosingTagException) that the outermost tag closed
        while offset < len(html):
            next_closing_tag_start = find_or_raise(
                html[offset:], closing_tag,
                compat_HTMLParseError(f'closing {tag} tag not found'))
            next_closing_tag_end = next_closing_tag_start + len(closing_tag)
            try:
                parser.feed(html[offset:offset + next_closing_tag_end])
                offset += next_closing_tag_end
            except HTMLBreakOnClosingTagParser.HTMLBreakOnClosingTagException:
                return html[content_start:offset + next_closing_tag_start], \
                    html[whole_start:offset + next_closing_tag_end]
        raise compat_HTMLParseError('unexpected end of html')
class HTMLAttributeParser(compat_HTMLParser):
if html is None: # Convenience for sanitizing descriptions etc.
return html
- # Newline vs <br />
- html = html.replace('\n', ' ')
- html = re.sub(r'(?u)\s*<\s*br\s*/?\s*>\s*', '\n', html)
- html = re.sub(r'(?u)<\s*/\s*p\s*>\s*<\s*p[^>]*>', '\n', html)
+ html = re.sub(r'\s+', ' ', html)
+ html = re.sub(r'(?u)\s?<\s?br\s?/?\s?>\s?', '\n', html)
+ html = re.sub(r'(?u)<\s?/\s?p\s?>\s?<\s?p[^>]*>', '\n', html)
# Strip html tags
html = re.sub('<.*?>', '', html)
# Replace html entities
import msvcrt
msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
return (sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout, filename)
- stream = open(encodeFilename(filename), open_mode)
+ stream = locked_file(filename, open_mode, block=False).open()
return (stream, filename)
except (IOError, OSError) as err:
if err.errno in (errno.EACCES,):
raise
else:
# An exception here should be caught in the caller
- stream = open(encodeFilename(alt_filename), open_mode)
+ stream = locked_file(alt_filename, open_mode, block=False).open()
return (stream, alt_filename)
opts_check_certificate = not params.get('nocheckcertificate')
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = opts_check_certificate
+ if params.get('legacyserverconnect'):
+ context.options |= 4 # SSL_OP_LEGACY_SERVER_CONNECT
context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
if opts_check_certificate:
try:
def bug_reports_message(before=';'):
- if ytdl_is_updateable():
- update_cmd = 'type yt-dlp -U to update'
- else:
- update_cmd = 'see https://github.com/yt-dlp/yt-dlp on how to update'
- msg = 'please report this issue on https://github.com/yt-dlp/yt-dlp .'
- msg += ' Make sure you are using the latest version; %s.' % update_cmd
- msg += ' Be sure to call yt-dlp with the --verbose flag and include its complete output.'
+ msg = ('please report this issue on https://github.com/yt-dlp/yt-dlp , '
+ 'filling out the appropriate issue template. '
+ 'Confirm you are on the latest version using yt-dlp -U')
before = before.rstrip()
if not before or before.endswith(('.', '!', '?')):
if sys.exc_info()[0] in network_exceptions:
expected = True
- self.msg = str(msg)
+ self.orig_msg = str(msg)
self.traceback = tb
self.expected = expected
self.cause = cause
super(ExtractorError, self).__init__(''.join((
format_field(ie, template='[%s] '),
format_field(video_id, template='%s: '),
- self.msg,
+ msg,
format_field(cause, template=' (caused by %r)'),
'' if expected else bug_reports_message())))
def format_traceback(self):
- if self.traceback is None:
- return None
- return ''.join(traceback.format_tb(self.traceback))
+ return join_nonempty(
+ self.traceback and ''.join(traceback.format_tb(self.traceback)),
+ self.cause and ''.join(traceback.format_exception(None, self.cause, self.cause.__traceback__)[1:]),
+ delim='\n') or None
class UnsupportedError(ExtractorError):
except zlib.error:
return zlib.decompress(data)
+ @staticmethod
+ def brotli(data):
+ if not data:
+ return data
+ return compat_brotli.decompress(data)
+
def http_request(self, req):
# According to RFC 3986, URLs can not contain non-ASCII characters, however this is not
# always respected by websites, some tend to give out URLs with non percent-encoded
if url != url_escaped:
req = update_Request(req, url=url_escaped)
- for h, v in std_headers.items():
+ for h, v in self._params.get('http_headers', std_headers).items():
# Capitalize is needed because of Python bug 2275: http://bugs.python.org/issue2275
# The dict keys are capitalized because of this bug by urllib
if h.capitalize() not in req.headers:
resp = compat_urllib_request.addinfourl(gz, old_resp.headers, old_resp.url, old_resp.code)
resp.msg = old_resp.msg
del resp.headers['Content-encoding']
+ # brotli
+ if resp.headers.get('Content-encoding', '') == 'br':
+ resp = compat_urllib_request.addinfourl(
+ io.BytesIO(self.brotli(resp.read())), old_resp.headers, old_resp.url, old_resp.code)
+ resp.msg = old_resp.msg
+ del resp.headers['Content-encoding']
# Percent-encode redirect URL of Location HTTP header to satisfy RFC 3986 (see
# https://github.com/ytdl-org/youtube-dl/issues/6457).
if 300 <= resp.code < 400:
def datetime_from_str(date_str, precision='auto', format='%Y%m%d'):
"""
Return a datetime object from a string in the format YYYYMMDD or
- (now|today|date)[+-][0-9](microsecond|second|minute|hour|day|week|month|year)(s)?
+ (now|today|yesterday|date)[+-][0-9](microsecond|second|minute|hour|day|week|month|year)(s)?
format: string date format used to return datetime object from
precision: round the time portion of a datetime object.
if precision == 'auto':
auto_precision = True
precision = 'microsecond'
- today = datetime_round(datetime.datetime.now(), precision)
+ today = datetime_round(datetime.datetime.utcnow(), precision)
if date_str in ('now', 'today'):
return today
if date_str == 'yesterday':
return datetime_round(datetime.datetime.strptime(date_str, format), precision)
-def date_from_str(date_str, format='%Y%m%d'):
+def date_from_str(date_str, format='%Y%m%d', strict=False):
"""
Return a datetime object from a string in the format YYYYMMDD or
- (now|today|date)[+-][0-9](microsecond|second|minute|hour|day|week|month|year)(s)?
+ (now|today|yesterday|date)[+-][0-9](microsecond|second|minute|hour|day|week|month|year)(s)?
+
+ If "strict", only (now|today)[+-][0-9](day|week|month|year)(s)? is allowed
format: string date format used to return datetime object from
"""
+ if strict and not re.fullmatch(r'\d{8}|(now|today)[+-]\d+(day|week|month|year)(s)?', date_str):
+ raise ValueError(f'Invalid date format {date_str}')
return datetime_from_str(date_str, precision='microsecond', format=format).date()
def __init__(self, start=None, end=None):
"""start and end must be strings in the format accepted by date"""
if start is not None:
- self.start = date_from_str(start)
+ self.start = date_from_str(start, strict=True)
else:
self.start = datetime.datetime.min.date()
if end is not None:
- self.end = date_from_str(end)
+ self.end = date_from_str(end, strict=True)
else:
self.end = datetime.datetime.max.date()
if self.start > self.end:
False if it has yet to be written out."""
# Adapted from http://stackoverflow.com/a/3259271/35070
- import ctypes
import ctypes.wintypes
WIN_OUTPUT_IDS = {
whole_low = 0xffffffff
whole_high = 0x7fffffff
- def _lock_file(f, exclusive):
+ def _lock_file(f, exclusive, block):
overlapped = OVERLAPPED()
overlapped.Offset = 0
overlapped.OffsetHigh = 0
overlapped.hEvent = 0
f._lock_file_overlapped_p = ctypes.pointer(overlapped)
- handle = msvcrt.get_osfhandle(f.fileno())
- if not LockFileEx(handle, 0x2 if exclusive else 0x0, 0,
- whole_low, whole_high, f._lock_file_overlapped_p):
- raise OSError('Locking file failed: %r' % ctypes.FormatError())
+
+ if not LockFileEx(msvcrt.get_osfhandle(f.fileno()),
+ (0x2 if exclusive else 0x0) | (0x0 if block else 0x1),
+ 0, whole_low, whole_high, f._lock_file_overlapped_p):
+ raise BlockingIOError('Locking file failed: %r' % ctypes.FormatError())
def _unlock_file(f):
assert f._lock_file_overlapped_p
handle = msvcrt.get_osfhandle(f.fileno())
- if not UnlockFileEx(handle, 0,
- whole_low, whole_high, f._lock_file_overlapped_p):
+ if not UnlockFileEx(handle, 0, whole_low, whole_high, f._lock_file_overlapped_p):
raise OSError('Unlocking file failed: %r' % ctypes.FormatError())
else:
- # Some platforms, such as Jython, is missing fcntl
try:
import fcntl
- def _lock_file(f, exclusive):
- fcntl.flock(f, fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH)
+ def _lock_file(f, exclusive, block):
+ try:
+ fcntl.flock(f,
+ fcntl.LOCK_SH if not exclusive
+ else fcntl.LOCK_EX if block
+ else fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except BlockingIOError:
+ raise
+ except OSError: # AOSP does not have flock()
+ fcntl.lockf(f,
+ fcntl.LOCK_SH if not exclusive
+ else fcntl.LOCK_EX if block
+ else fcntl.LOCK_EX | fcntl.LOCK_NB)
def _unlock_file(f):
- fcntl.flock(f, fcntl.LOCK_UN)
+ try:
+ fcntl.flock(f, fcntl.LOCK_UN)
+ except OSError:
+ fcntl.lockf(f, fcntl.LOCK_UN)
+
except ImportError:
UNSUPPORTED_MSG = 'file locking is not supported on this platform'
- def _lock_file(f, exclusive):
+ def _lock_file(f, exclusive, block):
raise IOError(UNSUPPORTED_MSG)
def _unlock_file(f):
class locked_file(object):
- def __init__(self, filename, mode, encoding=None):
- assert mode in ['r', 'a', 'w']
+ _closed = False
+
+ def __init__(self, filename, mode, block=True, encoding=None):
+ assert mode in ['r', 'rb', 'a', 'ab', 'w', 'wb']
self.f = io.open(filename, mode, encoding=encoding)
self.mode = mode
+ self.block = block
def __enter__(self):
- exclusive = self.mode != 'r'
+ exclusive = 'r' not in self.mode
try:
- _lock_file(self.f, exclusive)
+ _lock_file(self.f, exclusive, self.block)
except IOError:
self.f.close()
raise
def __exit__(self, etype, value, traceback):
try:
- _unlock_file(self.f)
+ if not self._closed:
+ _unlock_file(self.f)
finally:
self.f.close()
+ self._closed = True
def __iter__(self):
return iter(self.f)
def read(self, *args):
return self.f.read(*args)
+ def flush(self):
+ self.f.flush()
+
+ def open(self):
+ return self.__enter__()
+
+ def close(self, *args):
+ self.__exit__(self, *args, value=False, traceback=False)
+
def get_filesystem_encoding():
encoding = sys.getfilesystemencoding()
return url, data
def format_decimal_suffix(num, fmt='%d%s', *, factor=1000):
    """ Formats numbers with decimal suffixes like K, M, etc """
    num, factor = float_or_none(num), float(factor)
    if num is None or num < 0:
        return None
    POSSIBLE_SUFFIXES = ['', *'kMGTPEZY']
    # Clamp the exponent: for 0 < num < 1 math.log is negative (which would
    # otherwise index 'Y' via -1), and for num >= factor**9 it would overflow
    # the suffix table with an IndexError
    exponent = 0 if num == 0 else max(
        0, min(int(math.log(num, factor)), len(POSSIBLE_SUFFIXES) - 1))
    suffix = POSSIBLE_SUFFIXES[exponent]
    if factor == 1024:
        # Binary prefixes: k -> Ki, M -> Mi, etc
        suffix = {'k': 'Ki', '': ''}.get(suffix, f'{suffix}i')
    converted = num / (factor ** exponent)
    return fmt % (converted, suffix)
+
+
def format_bytes(bytes):
    """Return `bytes` as a human-readable binary size (e.g. '4.20MiB'), or 'N/A'"""
    return format_decimal_suffix(bytes, '%.2f%sB', factor=1024) or 'N/A'
def lookup_unit_table(unit_table, s):
if s is None:
return None
- s = s.strip()
+ s = re.sub(r'^[^\d]+\s', '', s).strip()
if re.match(r'^[\d,.]+$', s):
return str_to_int(s)
'M': 1000 ** 2,
'kk': 1000 ** 2,
'KK': 1000 ** 2,
+ 'b': 1000 ** 3,
+ 'B': 1000 ** 3,
}
- return lookup_unit_table(_UNIT_TABLE, s)
+ ret = lookup_unit_table(_UNIT_TABLE, s)
+ if ret is not None:
+ return ret
+
+ mobj = re.match(r'([\d,.]+)(?:$|\s)', s)
+ if mobj:
+ return str_to_int(mobj.group(1))
def parse_resolution(s):
def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1):
- if get_attr:
- if v is not None:
- v = getattr(v, get_attr, None)
- if v == '':
- v = None
- if v is None:
- return default
+ if get_attr and v is not None:
+ v = getattr(v, get_attr, None)
try:
return int(v) * invscale // scale
except (ValueError, TypeError, OverflowError):
return url if re.match(r'^(?:(?:https?|rt(?:m(?:pt?[es]?|fp)|sp[su]?)|mms|ftps?):)?//', url) else None
def request_to_url(req):
    """Return the URL of a urllib Request, or `req` itself if it is already a URL string"""
    if isinstance(req, compat_urllib_request.Request):
        return req.get_full_url()
    else:
        return req
+
+
def strftime_or_none(timestamp, date_format, default=None):
datetime_object = None
try:
return None
days, hours, mins, secs, ms = [None] * 5
- m = re.match(r'(?:(?:(?:(?P<days>[0-9]+):)?(?P<hours>[0-9]+):)?(?P<mins>[0-9]+):)?(?P<secs>[0-9]+)(?P<ms>\.[0-9]+)?Z?$', s)
+ m = re.match(r'''(?x)
+ (?P<before_secs>
+ (?:(?:(?P<days>[0-9]+):)?(?P<hours>[0-9]+):)?(?P<mins>[0-9]+):)?
+ (?P<secs>(?(before_secs)[0-9]{1,2}|[0-9]+))
+ (?P<ms>[.:][0-9]+)?Z?$
+ ''', s)
if m:
- days, hours, mins, secs, ms = m.groups()
+ days, hours, mins, secs, ms = m.group('days', 'hours', 'mins', 'secs', 'ms')
else:
m = re.match(
r'''(?ix)(?:P?
if days:
duration += float(days) * 24 * 60 * 60
if ms:
- duration += float(ms)
+ duration += float(ms.replace(':', '.'))
return duration
def __init__(self, pagefunc, pagesize, use_cache=True):
self._pagefunc = pagefunc
self._pagesize = pagesize
+ self._pagecount = float('inf')
self._use_cache = use_cache
self._cache = {}
def getpage(self, pagenum):
page_results = self._cache.get(pagenum)
if page_results is None:
- page_results = list(self._pagefunc(pagenum))
+ page_results = [] if pagenum > self._pagecount else list(self._pagefunc(pagenum))
if self._use_cache:
self._cache[pagenum] = page_results
return page_results
raise NotImplementedError('This method must be implemented by subclasses')
def __getitem__(self, idx):
- # NOTE: cache must be enabled if this is used
+ assert self._use_cache, 'Indexing PagedList requires cache'
if not isinstance(idx, int) or idx < 0:
raise TypeError('indices must be non-negative integers')
entries = self.getslice(idx, idx + 1)
if (end is not None and firstid <= end <= nextfirstid)
else None)
- page_results = self.getpage(pagenum)
+ try:
+ page_results = self.getpage(pagenum)
+ except Exception:
+ self._pagecount = pagenum - 1
+ raise
if startv != 0 or endv is not None:
page_results = page_results[startv:endv]
yield from page_results
class InAdvancePagedList(PagedList):
def __init__(self, pagefunc, pagecount, pagesize):
- self._pagecount = pagecount
PagedList.__init__(self, pagefunc, pagesize, True)
+ self._pagecount = pagecount
def _getslice(self, start, end):
start_page = start // self._pagesize
- end_page = (
- self._pagecount if end is None else (end // self._pagesize + 1))
+ end_page = self._pagecount if end is None else min(self._pagecount, end // self._pagesize + 1)
skip_elems = start - start_page * self._pagesize
only_more = None if end is None else end - start
for pagenum in range(start_page, end_page):
return '"%s"' % v
+ code = re.sub(r'new Date\((".+")\)', r'\g<1>', code)
+
return re.sub(r'''(?sx)
"(?:[^"\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^"\\]*"|
'(?:[^'\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^'\\]*'|
return q
+POSTPROCESS_WHEN = {'pre_process', 'after_filter', 'before_dl', 'after_move', 'post_process', 'after_video', 'playlist'}
+
+
DEFAULT_OUTTMPL = {
'default': '%(title)s [%(id)s].%(ext)s',
'chapter': '%(title)s - %(section_number)03d %(section_title)s [%(id)s].%(ext)s',
'annotation': 'annotations.xml',
'infojson': 'info.json',
'link': None,
+ 'pl_video': None,
'pl_thumbnail': None,
'pl_description': 'description',
'pl_infojson': 'info.json',
return {}
split_codecs = list(filter(None, map(
str.strip, codecs_str.strip().strip(',').split(','))))
- vcodec, acodec, hdr = None, None, None
+ vcodec, acodec, tcodec, hdr = None, None, None, None
for full_codec in split_codecs:
parts = full_codec.split('.')
codec = parts[0].replace('0', '')
if codec in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2',
'h263', 'h264', 'mp4v', 'hvc1', 'av1', 'theora', 'dvh1', 'dvhe'):
if not vcodec:
- vcodec = '.'.join(parts[:4]) if codec in ('vp9', 'av1') else full_codec
+ vcodec = '.'.join(parts[:4]) if codec in ('vp9', 'av1', 'hvc1') else full_codec
if codec in ('dvh1', 'dvhe'):
hdr = 'DV'
elif codec == 'av1' and len(parts) > 3 and parts[3] == '10':
hdr = 'HDR10'
elif full_codec.replace('0', '').startswith('vp9.2'):
hdr = 'HDR10'
- elif codec in ('mp4a', 'opus', 'vorbis', 'mp3', 'aac', 'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'):
+ elif codec in ('flac', 'mp4a', 'opus', 'vorbis', 'mp3', 'aac', 'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'):
if not acodec:
acodec = full_codec
+ elif codec in ('stpp', 'wvtt',):
+ if not tcodec:
+ tcodec = full_codec
else:
write_string('WARNING: Unknown codec %s\n' % full_codec, sys.stderr)
- if not vcodec and not acodec:
- if len(split_codecs) == 2:
- return {
- 'vcodec': split_codecs[0],
- 'acodec': split_codecs[1],
- }
- else:
+ if vcodec or acodec or tcodec:
return {
'vcodec': vcodec or 'none',
'acodec': acodec or 'none',
'dynamic_range': hdr,
+ **({'tcodec': tcodec} if tcodec is not None else {}),
+ }
+ elif len(split_codecs) == 2:
+ return {
+ 'vcodec': split_codecs[0],
+ 'acodec': split_codecs[1],
}
return {}
return [max(width(str(v)) for v in col) for col in zip(*table)]
def filter_using_list(row, filterArray):
- return [col for (take, col) in zip(filterArray, row) if take]
+ return [col for take, col in itertools.zip_longest(filterArray, row, fillvalue=True) if take]
- if hide_empty:
- max_lens = get_max_lens(data)
- header_row = filter_using_list(header_row, max_lens)
- data = [filter_using_list(row, max_lens) for row in data]
+ max_lens = get_max_lens(data) if hide_empty else []
+ header_row = filter_using_list(header_row, max_lens)
+ data = [filter_using_list(row, max_lens) for row in data]
table = [header_row] + data
max_lens = get_max_lens(table)
extra_gap += 1
if delim:
table = [header_row, [delim * (ml + extra_gap) for ml in max_lens]] + data
- table[1][-1] = table[1][-1][:-extra_gap] # Remove extra_gap from end of delimiter
+ table[1][-1] = table[1][-1][:-extra_gap * len(delim)] # Remove extra_gap from end of delimiter
for row in table:
for pos, text in enumerate(map(str, row)):
if '\t' in text:
def match_filter_func(filter_str):
+ if filter_str is None:
+ return None
+
def _match_func(info_dict, *args, **kwargs):
if match_str(filter_str, info_dict, *args, **kwargs):
return None
def format_field(obj, field=None, template='%s', ignore=(None, ''), default='', func=None):
    # `field` may be a (possibly nested) traversal path; with field=None,
    # traverse_obj returns `obj` itself
    val = traverse_obj(obj, *variadic(field))
    if val in ignore:
        return default
    return template % (func(val) if func else val)
def clean_podcast_url(url):
''' Traverse nested list/dict/tuple
@param path_list A list of paths which are checked one by one.
Each path is a list of keys where each key is a string,
- a function, a tuple of strings or "...".
+ a function, a tuple of strings/None or "...".
When a fuction is given, it takes the key as argument and
returns whether the key matches or not. When a tuple is given,
all the keys given in the tuple are traversed, and
"..." traverses all the keys in the object
+ "None" returns the object without traversal
@param default Default value to return
@param expected_type Only accept final value of this type (Can also be any callable)
@param get_all Return all the values obtained from a path or only the first one
nonlocal depth
path = tuple(variadic(path))
for i, key in enumerate(path):
- if obj is None:
- return None
+ if None in (key, obj):
+ return obj
if isinstance(key, (list, tuple)):
obj = [_traverse_obj(obj, sub_key, _current_depth) for sub_key in key]
key = ...
return default
-# Deprecated
def traverse_dict(dictn, keys, casesense=True):
write_string('DeprecationWarning: yt_dlp.utils.traverse_dict is deprecated '
'and may be removed in a future version. Use yt_dlp.utils.traverse_obj instead')
return traverse_obj(dictn, keys, casesense=casesense, is_user_input=True, traverse_string=True)
-def variadic(x, allowed_types=(str, bytes)):
def get_first(obj, keys, **kwargs):
    """Return the first value found for `keys` across all items of `obj`"""
    return traverse_obj(obj, (..., *variadic(keys)), **kwargs, get_all=False)
+
+
def variadic(x, allowed_types=(str, bytes, dict)):
    """Return x unchanged if it is an iterable (excluding `allowed_types`), else wrap it in a tuple"""
    is_sequence = isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types)
    return x if is_sequence else (x,)
def decode_base(value, digits):
    """Convert the base-N string `value` to an integer, using `digits` as the digit alphabet"""
    # Map each digit character to its positional value up front for O(1) lookups
    lookup = {digit: position for position, digit in enumerate(digits)}
    base = len(digits)
    total = 0
    for symbol in value:
        total = total * base + lookup[symbol]
    return total
+
+
def time_seconds(**kwargs):
    """Return the current epoch timestamp, with `now` evaluated in the fixed-offset
    timezone described by the given timedelta kwargs (e.g. hours=9 for JST)"""
    zone = datetime.timezone(datetime.timedelta(**kwargs))
    return datetime.datetime.now(tz=zone).timestamp()
+
+
# create a JSON Web Signature (jws) with HS256 algorithm
# the resulting format is in JWS Compact Serialization
# implemented following JWT https://www.rfc-editor.org/rfc/rfc7519.html
if from_dict is not None:
values = map(from_dict.get, values)
return delim.join(map(str, filter(None, values)))
+
+
def scale_thumbnails_to_max_format_width(formats, thumbnails, url_width_re):
    """
    Find the largest format dimensions in terms of video width and, for each thumbnail:
    * Modify the URL: Match the width with the provided regex and replace with the former width
    * Update dimensions

    This function is useful with video services that scale the provided thumbnails on demand
    """
    _keys = ('width', 'height')
    # Largest (width, height) pair among the formats; missing values count as 0
    max_dimensions = max(
        [tuple(format.get(k) or 0 for k in _keys) for format in formats],
        default=(0, 0))
    # No format declares a width — nothing meaningful to scale to
    if not max_dimensions[0]:
        return thumbnails
    return [
        merge_dicts(
            {'url': re.sub(url_width_re, str(max_dimensions[0]), thumbnail['url'])},
            dict(zip(_keys, max_dimensions)), thumbnail)
        for thumbnail in thumbnails
    ]
+
+
def parse_http_range(range):
    """ Parse value of "Range" or "Content-Range" HTTP header into tuple. """
    if not range:
        return None, None, None
    match = re.search(r'bytes[ =](\d+)-(\d+)?(?:/(\d+))?', range)
    if not match:
        return None, None, None
    start, end, total = match.groups()
    # `end` and `total` are optional in the header and may be None
    return int(start), int_or_none(end), int_or_none(total)
+
+
class Config:
    """
    One parsed configuration source (command-line args or a config file),
    together with any further config files it pulls in via --config-locations.
    """
    own_args = None  # raw argument list this config was initialized with
    filename = None  # path of the config file, if loaded from a file
    __initialized = False

    def __init__(self, parser, label=None):
        self._parser, self.label = parser, label
        self._loaded_paths, self.configs = set(), []

    def init(self, args=None, filename=None):
        """Load `args`/`filename`; return False if that file was already loaded"""
        assert not self.__initialized
        directory = ''
        if filename:
            location = os.path.realpath(filename)
            directory = os.path.dirname(location)
            # Guard against config files including themselves (directly or transitively)
            if location in self._loaded_paths:
                return False
            self._loaded_paths.add(location)

        self.__initialized = True
        self.own_args, self.filename = args, filename
        for location in self._parser.parse_args(args)[0].config_locations or []:
            # Nested locations are resolved relative to the including config file
            location = os.path.join(directory, expand_path(location))
            if os.path.isdir(location):
                location = os.path.join(location, 'yt-dlp.conf')
            if not os.path.exists(location):
                self._parser.error(f'config location {location} does not exist')
            self.append_config(self.read_file(location), location)
        return True

    def __str__(self):
        label = join_nonempty(
            self.label, 'config', f'"{self.filename}"' if self.filename else '',
            delim=' ')
        # Render this config and all nested configs as an indented tree
        return join_nonempty(
            self.own_args is not None and f'{label[0].upper()}{label[1:]}: {self.hide_login_info(self.own_args)}',
            *(f'\n{c}'.replace('\n', '\n| ')[1:] for c in self.configs),
            delim='\n')

    @staticmethod
    def read_file(filename, default=[]):
        """Read a config file and split it into an argument list (shlex rules, '#' comments)"""
        try:
            optionf = open(filename)
        except IOError:
            return default  # silently skip if file is not present
        try:
            # FIXME: https://github.com/ytdl-org/youtube-dl/commit/dfe5fa49aed02cf36ba9f743b11b0903554b5e56
            contents = optionf.read()
            if sys.version_info < (3,):
                contents = contents.decode(preferredencoding())
            res = compat_shlex_split(contents, comments=True)
        finally:
            optionf.close()
        return res

    @staticmethod
    def hide_login_info(opts):
        """Return a copy of `opts` with the values of credential options replaced by 'PRIVATE'"""
        PRIVATE_OPTS = set(['-p', '--password', '-u', '--username', '--video-password', '--ap-password', '--ap-username'])
        eqre = re.compile('^(?P<key>' + ('|'.join(re.escape(po) for po in PRIVATE_OPTS)) + ')=.+$')

        def _scrub_eq(o):
            # Handles the single-token "--option=value" form
            m = eqre.match(o)
            if m:
                return m.group('key') + '=PRIVATE'
            else:
                return o

        opts = list(map(_scrub_eq, opts))
        # Handles the two-token "--option value" form
        for idx, opt in enumerate(opts):
            if opt in PRIVATE_OPTS and idx + 1 < len(opts):
                opts[idx + 1] = 'PRIVATE'
        return opts

    def append_config(self, *args, label=None):
        """Parse another config source and attach it as a nested config"""
        config = type(self)(self._parser, label)
        # Share the loaded-path set so cycles are detected across the whole tree
        config._loaded_paths = self._loaded_paths
        if config.init(*args):
            self.configs.append(config)

    @property
    def all_args(self):
        # Nested configs first, so the outer (later) config takes precedence
        for config in reversed(self.configs):
            yield from config.all_args
        yield from self.own_args or []

    def parse_args(self):
        """Parse the combined arguments of this config and all nested configs"""
        return self._parser.parse_args(list(self.all_args))
+
+
class WebSocketsWrapper():
    """Wraps websockets module to use in non-async scopes"""

    def __init__(self, url, headers=None):
        # Dedicated event loop so the wrapper can be driven synchronously
        self.loop = asyncio.events.new_event_loop()
        self.conn = compat_websockets.connect(
            url, extra_headers=headers, ping_interval=None,
            close_timeout=float('inf'), loop=self.loop, ping_timeout=float('inf'))
        # Ensure teardown even if the caller never exits the context manager
        atexit.register(self.__exit__, None, None, None)

    def __enter__(self):
        self.pool = self.run_with_loop(self.conn.__aenter__(), self.loop)
        return self

    def send(self, *args):
        self.run_with_loop(self.pool.send(*args), self.loop)

    def recv(self, *args):
        return self.run_with_loop(self.pool.recv(*args), self.loop)

    def __exit__(self, type, value, traceback):
        try:
            return self.run_with_loop(self.conn.__aexit__(type, value, traceback), self.loop)
        finally:
            # Cancel pending tasks *before* closing the loop: _cancel_all_tasks
            # calls loop.run_until_complete, which raises RuntimeError once the
            # loop is closed (previous order closed first and then tried to cancel)
            self._cancel_all_tasks(self.loop)
            self.loop.close()

    # taken from https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py with modifications
    # for contributors: If there's any new library using asyncio needs to be run in non-async, move these function out of this class
    @staticmethod
    def run_with_loop(main, loop):
        if not asyncio.coroutines.iscoroutine(main):
            raise ValueError(f'a coroutine was expected, got {main!r}')

        try:
            return loop.run_until_complete(main)
        finally:
            loop.run_until_complete(loop.shutdown_asyncgens())
            if hasattr(loop, 'shutdown_default_executor'):
                loop.run_until_complete(loop.shutdown_default_executor())

    @staticmethod
    def _cancel_all_tasks(loop):
        to_cancel = asyncio.tasks.all_tasks(loop)

        if not to_cancel:
            return

        for task in to_cancel:
            task.cancel()

        # NOTE(review): the `loop` parameter of asyncio.gather was removed in
        # Python 3.10 — confirm against the supported Python versions
        loop.run_until_complete(
            asyncio.tasks.gather(*to_cancel, loop=loop, return_exceptions=True))

        for task in to_cancel:
            if task.cancelled():
                continue
            if task.exception() is not None:
                loop.call_exception_handler({
                    'message': 'unhandled exception during asyncio.run() shutdown',
                    'exception': task.exception(),
                    'task': task,
                })
+
+
+has_websockets = bool(compat_websockets)
+
+
def merge_headers(*dicts):
    """Merge dicts of http headers case insensitively, prioritizing the latter ones"""
    merged = {}
    for headers in dicts:
        for key, value in headers.items():
            # Title-casing the key makes lookups case-insensitive across dicts
            merged[key.title()] = value
    return merged