]> jfr.im git - yt-dlp.git/blob - yt_dlp/utils/_utils.py
[rh:websockets] Migrate websockets to networking framework (#7720)
[yt-dlp.git] / yt_dlp / utils / _utils.py
1 import base64
2 import binascii
3 import calendar
4 import codecs
5 import collections
6 import collections.abc
7 import contextlib
8 import datetime
9 import email.header
10 import email.utils
11 import errno
12 import hashlib
13 import hmac
14 import html.entities
15 import html.parser
16 import inspect
17 import io
18 import itertools
19 import json
20 import locale
21 import math
22 import mimetypes
23 import netrc
24 import operator
25 import os
26 import platform
27 import random
28 import re
29 import shlex
30 import socket
31 import ssl
32 import struct
33 import subprocess
34 import sys
35 import tempfile
36 import time
37 import traceback
38 import types
39 import unicodedata
40 import urllib.error
41 import urllib.parse
42 import urllib.request
43 import xml.etree.ElementTree
44
45 from . import traversal
46
47 from ..compat import functools # isort: split
48 from ..compat import (
49 compat_etree_fromstring,
50 compat_expanduser,
51 compat_HTMLParseError,
52 compat_os_name,
53 compat_shlex_quote,
54 )
55 from ..dependencies import xattr
56
# Make names defined here appear to belong to the parent package
# (this module is an implementation detail of `yt_dlp.utils`)
__name__ = __name__.rsplit('.', 1)[0]  # Pretend to be the parent module

# This is not clearly defined otherwise
compiled_regex_type = type(re.compile(''))
61
62
class NO_DEFAULT:
    """Sentinel used to detect whether a caller supplied a default value"""
    pass
65
66
def IDENTITY(x):
    """Identity function: returns its argument unchanged"""
    return x
69
70
# English month names, indexed 0-11
ENGLISH_MONTH_NAMES = [
    'January', 'February', 'March', 'April', 'May', 'June',
    'July', 'August', 'September', 'October', 'November', 'December']

# Localized month name lists, keyed by language code
MONTH_NAMES = {
    'en': ENGLISH_MONTH_NAMES,
    'fr': [
        'janvier', 'février', 'mars', 'avril', 'mai', 'juin',
        'juillet', 'août', 'septembre', 'octobre', 'novembre', 'décembre'],
    # these follow the genitive grammatical case (dopełniacz)
    # some websites might be using nominative, which will require another month list
    # https://en.wikibooks.org/wiki/Polish/Noun_cases
    'pl': ['stycznia', 'lutego', 'marca', 'kwietnia', 'maja', 'czerwca',
           'lipca', 'sierpnia', 'września', 'października', 'listopada', 'grudnia'],
}

# From https://github.com/python/cpython/blob/3.11/Lib/email/_parseaddr.py#L36-L42
# Timezone abbreviation -> UTC offset in hours
TIMEZONE_NAMES = {
    'UT': 0, 'UTC': 0, 'GMT': 0, 'Z': 0,
    'AST': -4, 'ADT': -3,  # Atlantic (used in Canada)
    'EST': -5, 'EDT': -4,  # Eastern
    'CST': -6, 'CDT': -5,  # Central
    'MST': -7, 'MDT': -6,  # Mountain
    'PST': -8, 'PDT': -7   # Pacific
}

# needed for sanitizing filenames in restricted mode
# Maps each accented character to its ASCII transliteration
ACCENT_CHARS = dict(zip('ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖŐØŒÙÚÛÜŰÝÞßàáâãäåæçèéêëìíîïðñòóôõöőøœùúûüűýþÿ',
                        itertools.chain('AAAAAA', ['AE'], 'CEEEEIIIIDNOOOOOOO', ['OE'], 'UUUUUY', ['TH', 'ss'],
                                        'aaaaaa', ['ae'], 'ceeeeiiiionooooooo', ['oe'], 'uuuuuy', ['th'], 'y')))

# strftime/strptime patterns attempted when parsing free-form dates
DATE_FORMATS = (
    '%d %B %Y',
    '%d %b %Y',
    '%B %d %Y',
    '%B %dst %Y',
    '%B %dnd %Y',
    '%B %drd %Y',
    '%B %dth %Y',
    '%b %d %Y',
    '%b %dst %Y',
    '%b %dnd %Y',
    '%b %drd %Y',
    '%b %dth %Y',
    '%b %dst %Y %I:%M',
    '%b %dnd %Y %I:%M',
    '%b %drd %Y %I:%M',
    '%b %dth %Y %I:%M',
    '%Y %m %d',
    '%Y-%m-%d',
    '%Y.%m.%d.',
    '%Y/%m/%d',
    '%Y/%m/%d %H:%M',
    '%Y/%m/%d %H:%M:%S',
    '%Y%m%d%H%M',
    '%Y%m%d%H%M%S',
    '%Y%m%d',
    '%Y-%m-%d %H:%M',
    '%Y-%m-%d %H:%M:%S',
    '%Y-%m-%d %H:%M:%S.%f',
    '%Y-%m-%d %H:%M:%S:%f',
    '%d.%m.%Y %H:%M',
    '%d.%m.%Y %H.%M',
    '%Y-%m-%dT%H:%M:%SZ',
    '%Y-%m-%dT%H:%M:%S.%fZ',
    '%Y-%m-%dT%H:%M:%S.%f0Z',
    '%Y-%m-%dT%H:%M:%S',
    '%Y-%m-%dT%H:%M:%S.%f',
    '%Y-%m-%dT%H:%M',
    '%b %d %Y at %H:%M',
    '%b %d %Y at %H:%M:%S',
    '%B %d %Y at %H:%M',
    '%B %d %Y at %H:%M:%S',
    '%H:%M %d-%b-%Y',
)

# Additional patterns for day-first (e.g. European) date notation
DATE_FORMATS_DAY_FIRST = list(DATE_FORMATS)
DATE_FORMATS_DAY_FIRST.extend([
    '%d-%m-%Y',
    '%d.%m.%Y',
    '%d.%m.%y',
    '%d/%m/%Y',
    '%d/%m/%y',
    '%d/%m/%Y %H:%M:%S',
    '%d-%m-%Y %H:%M',
    '%H:%M %d/%m/%Y',
])

# Additional patterns for month-first (e.g. US) date notation
DATE_FORMATS_MONTH_FIRST = list(DATE_FORMATS)
DATE_FORMATS_MONTH_FIRST.extend([
    '%m-%d-%Y',
    '%m.%d.%Y',
    '%m/%d/%Y',
    '%m/%d/%y',
    '%m/%d/%Y %H:%M:%S',
])

# Matches the argument list of the P.A.C.K.E.R. JS packer's eval payload
PACKED_CODES_RE = r"}\('(.+)',(\d+),(\d+),'([^']+)'\.split\('\|'\)"
# Captures the payload of <script type="application/ld+json"> tags
JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>\s*(?P<json_ld>{.+?}|\[.+?\])\s*</script>'

# A decimal number with an optional fractional part
NUMBER_RE = r'\d+(?:\.\d+)?'
172
173
@functools.cache
def preferredencoding():
    """Get preferred encoding.

    Returns the best encoding scheme for the system, based on
    locale.getpreferredencoding() and some further tweaks.
    """
    try:
        encoding = locale.getpreferredencoding()
        # Reject encodings the interpreter cannot actually encode with
        'TEST'.encode(encoding)
        return encoding
    except Exception:
        return 'UTF-8'
188
189
def write_json_file(obj, fn):
    """ Encode obj as JSON and write it to fn, atomically if possible """

    tmp = tempfile.NamedTemporaryFile(
        prefix=f'{os.path.basename(fn)}.', dir=os.path.dirname(fn),
        suffix='.tmp', delete=False, mode='w', encoding='utf-8')

    try:
        with tmp:
            json.dump(obj, tmp, ensure_ascii=False)
        if sys.platform == 'win32':
            # Need to remove existing file on Windows, else os.rename raises
            # WindowsError or FileExistsError.
            with contextlib.suppress(OSError):
                os.unlink(fn)
        with contextlib.suppress(OSError):
            # Apply the default file permissions, honouring the current umask
            mask = os.umask(0)
            os.umask(mask)
            os.chmod(tmp.name, 0o666 & ~mask)
        os.rename(tmp.name, fn)
    except Exception:
        # Best-effort cleanup of the temporary file before re-raising
        with contextlib.suppress(OSError):
            os.remove(tmp.name)
        raise
214
215
def find_xpath_attr(node, xpath, key, val=None):
    """ Find the xpath xpath[@key=val] """
    assert re.match(r'^[a-zA-Z_-]+$', key)
    if val is None:
        predicate = '[@%s]' % key
    else:
        predicate = f"[@{key}='{val}']"
    return node.find(xpath + predicate)
221
# Historical note: on Python 2.6 the xml.etree.ElementTree.Element methods
# did not support the namespace parameter, hence this helper
224
225
def xpath_with_ns(path, ns_map):
    """Expand `ns:tag` components of an XPath using the given namespace map"""
    expanded = []
    for component in (c.split(':') for c in path.split('/')):
        if len(component) == 1:
            expanded.append(component[0])
        else:
            ns, tag = component
            expanded.append('{%s}%s' % (ns_map[ns], tag))
    return '/'.join(expanded)
236
237
def xpath_element(node, xpath, name=None, fatal=False, default=NO_DEFAULT):
    """Find the first element matching one xpath (or any of a list of xpaths),
    returning `default` or raising ExtractorError per `fatal` when absent"""
    if isinstance(xpath, str):
        elem = node.find(xpath)
    else:
        for xp in xpath:
            elem = node.find(xp)
            if elem is not None:
                break

    if elem is not None:
        return elem
    if default is not NO_DEFAULT:
        return default
    if fatal:
        name = xpath if name is None else name
        raise ExtractorError('Could not find XML element %s' % name)
    return None
259
260
def xpath_text(node, xpath, name=None, fatal=False, default=NO_DEFAULT):
    """Like xpath_element, but return the matched element's text"""
    elem = xpath_element(node, xpath, name, fatal=fatal, default=default)
    if elem is None or elem == default:
        return elem
    if elem.text is not None:
        return elem.text
    if default is not NO_DEFAULT:
        return default
    if fatal:
        name = xpath if name is None else name
        raise ExtractorError('Could not find XML element\'s text %s' % name)
    return None
274
275
def xpath_attr(node, xpath, key, name=None, fatal=False, default=NO_DEFAULT):
    """Return attribute `key` of the first element matching xpath[@key],
    honouring `default`/`fatal` like xpath_element"""
    elem = find_xpath_attr(node, xpath, key)
    if elem is not None:
        return elem.attrib[key]
    if default is not NO_DEFAULT:
        return default
    if fatal:
        name = f'{xpath}[@{key}]' if name is None else name
        raise ExtractorError('Could not find XML attribute %s' % name)
    return None
287
288
def get_element_by_id(id, html, **kwargs):
    """Return the content of the first tag with the specified ID in the passed HTML document"""
    return get_element_by_attribute('id', id, html, **kwargs)
292
293
def get_element_html_by_id(id, html, **kwargs):
    """Return the html (whole element) of the first tag with the specified ID in the passed HTML document"""
    return get_element_html_by_attribute('id', id, html, **kwargs)
297
298
def get_element_by_class(class_name, html):
    """Content of the first tag carrying the given class in the HTML document, or None"""
    matches = get_elements_by_class(class_name, html)
    if not matches:
        return None
    return matches[0]
303
304
def get_element_html_by_class(class_name, html):
    """Whole html of the first tag carrying the given class in the HTML document, or None"""
    matches = get_elements_html_by_class(class_name, html)
    if not matches:
        return None
    return matches[0]
309
310
def get_element_by_attribute(attribute, value, html, **kwargs):
    """Content of the first tag whose `attribute` matches `value`, or None"""
    matches = get_elements_by_attribute(attribute, value, html, **kwargs)
    if not matches:
        return None
    return matches[0]
314
315
def get_element_html_by_attribute(attribute, value, html, **kwargs):
    """Whole html of the first tag whose `attribute` matches `value`, or None"""
    matches = get_elements_html_by_attribute(attribute, value, html, **kwargs)
    if not matches:
        return None
    return matches[0]
319
320
def get_elements_by_class(class_name, html, **kargs):
    """Return the content of all tags with the specified class in the passed HTML document as a list"""
    # Match the class name as a whole word inside the (possibly multi-valued) attribute
    class_value_re = r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name)
    return get_elements_by_attribute('class', class_value_re, html, escape_value=False)
326
327
def get_elements_html_by_class(class_name, html):
    """Return the html of all tags with the specified class in the passed HTML document as a list"""
    # Match the class name as a whole word inside the (possibly multi-valued) attribute
    class_value_re = r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name)
    return get_elements_html_by_attribute('class', class_value_re, html, escape_value=False)
333
334
def get_elements_by_attribute(*args, **kwargs):
    """Return the content of all tags with the specified attribute in the passed HTML document (list)"""
    return [content for content, _ in get_elements_text_and_html_by_attribute(*args, **kwargs)]
338
339
def get_elements_html_by_attribute(*args, **kwargs):
    """Return the html of all tags with the specified attribute in the passed HTML document (list)"""
    return [whole for _, whole in get_elements_text_and_html_by_attribute(*args, **kwargs)]
343
344
def get_elements_text_and_html_by_attribute(attribute, value, html, *, tag=r'[\w:.-]+', escape_value=True):
    """
    Yield (text, whole_html) pairs for each tag with the specified
    attribute/value in the passed HTML document
    """
    if not value:
        return

    # Only make the quotes optional when the value could legally appear
    # unquoted in HTML (i.e. contains none of these characters)
    quote = '' if re.match(r'''[\s"'`=<>]''', value) else '?'

    value = re.escape(value) if escape_value else value

    # Matches up to and including the attribute; the full element is then
    # extracted by get_element_text_and_html_by_tag below
    partial_element_re = rf'''(?x)
        <(?P<tag>{tag})
         (?:\s(?:[^>"']|"[^"]*"|'[^']*')*)?
         \s{re.escape(attribute)}\s*=\s*(?P<_q>['"]{quote})(?-x:{value})(?P=_q)
        '''

    for m in re.finditer(partial_element_re, html):
        content, whole = get_element_text_and_html_by_tag(m.group('tag'), html[m.start():])

        yield (
            # Strip one level of surrounding quotes from the content, then decode entities
            unescapeHTML(re.sub(r'^(?P<q>["\'])(?P<content>.*)(?P=q)$', r'\g<content>', content, flags=re.DOTALL)),
            whole
        )
370
371
class HTMLBreakOnClosingTagParser(html.parser.HTMLParser):
    """
    HTML parser which raises HTMLBreakOnClosingTagException upon reaching the
    closing tag for the first opening tag it has encountered, and can be used
    as a context manager
    """

    class HTMLBreakOnClosingTagException(Exception):
        pass

    def __init__(self):
        self.tagstack = collections.deque()
        super().__init__()

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.close()

    def close(self):
        # handle_endtag does not return after raising HTMLBreakOnClosingTagException,
        # leaving data buffered in the parser; it is of no further interest,
        # so this override simply discards it
        pass

    def handle_starttag(self, tag, _):
        self.tagstack.append(tag)

    def handle_endtag(self, tag):
        if not self.tagstack:
            raise compat_HTMLParseError('no tags in the stack')
        # Pop (possibly unclosed) inner tags until the matching opener is found
        matched = False
        while self.tagstack:
            if self.tagstack.pop() == tag:
                matched = True
                break
        if not matched:
            raise compat_HTMLParseError(f'matching opening tag for closing {tag} tag not found')
        if not self.tagstack:
            raise self.HTMLBreakOnClosingTagException()
412
413
# XXX: This should be far less strict
def get_element_text_and_html_by_tag(tag, html):
    """
    For the first element with the specified tag in the passed HTML document
    return its content (text) and the whole element (html)
    """
    def find_or_raise(haystack, needle, exc):
        # str.index that raises the given parse error instead of ValueError
        try:
            return haystack.index(needle)
        except ValueError:
            raise exc
    closing_tag = f'</{tag}>'
    whole_start = find_or_raise(
        html, f'<{tag}', compat_HTMLParseError(f'opening {tag} tag not found'))
    content_start = find_or_raise(
        html[whole_start:], '>', compat_HTMLParseError(f'malformed opening {tag} tag'))
    content_start += whole_start + 1
    with HTMLBreakOnClosingTagParser() as parser:
        # Feed only the opening tag first, so the parser's stack starts with it
        parser.feed(html[whole_start:content_start])
        if not parser.tagstack or parser.tagstack[0] != tag:
            raise compat_HTMLParseError(f'parser did not match opening {tag} tag')
        offset = content_start
        while offset < len(html):
            # Advance one candidate closing tag at a time; the parser raises
            # HTMLBreakOnClosingTagException once the real closing tag is reached
            next_closing_tag_start = find_or_raise(
                html[offset:], closing_tag,
                compat_HTMLParseError(f'closing {tag} tag not found'))
            next_closing_tag_end = next_closing_tag_start + len(closing_tag)
            try:
                parser.feed(html[offset:offset + next_closing_tag_end])
                offset += next_closing_tag_end
            except HTMLBreakOnClosingTagParser.HTMLBreakOnClosingTagException:
                return html[content_start:offset + next_closing_tag_start], \
                    html[whole_start:offset + next_closing_tag_end]
        raise compat_HTMLParseError('unexpected end of html')
448
449
class HTMLAttributeParser(html.parser.HTMLParser):
    """Trivial HTML parser to gather the attributes for a single element"""

    def __init__(self):
        self.attrs = {}
        super().__init__()

    def handle_starttag(self, tag, attrs):
        # Record the first element's attributes, then abort parsing
        self.attrs = dict(attrs)
        raise compat_HTMLParseError('done')
460
461
class HTMLListAttrsParser(html.parser.HTMLParser):
    """HTML parser to gather the attributes for the elements of a list"""

    def __init__(self):
        super().__init__()
        self.items = []
        self._level = 0

    def handle_starttag(self, tag, attrs):
        # Collect only top-level <li> elements; depth tracking makes sure
        # nested <li> children are skipped
        if self._level == 0 and tag == 'li':
            self.items.append(dict(attrs))
        self._level += 1

    def handle_endtag(self, tag):
        self._level -= 1
477
478
def extract_attributes(html_element):
    """Given a string for an HTML element such as
    <el
         a="foo" B="bar" c="&98;az" d=boz
         empty= noval entity="&amp;"
         sq='"' dq="'"
    >
    Decode and return a dictionary of attributes.
    {
        'a': 'foo', 'b': 'bar', c: 'baz', d: 'boz',
        'empty': '', 'noval': None, 'entity': '&',
        'sq': '"', 'dq': '\''
    }.
    """
    parser = HTMLAttributeParser()
    try:
        parser.feed(html_element)
        parser.close()
    except compat_HTMLParseError:
        # The parser deliberately aborts after the first start tag
        pass
    return parser.attrs
498
499
def parse_list(webpage):
    """Parse a series of top-level HTML <li> elements and return a list of
    their attribute dictionaries"""
    list_parser = HTMLListAttrsParser()
    list_parser.feed(webpage)
    list_parser.close()
    return list_parser.items
507
508
def clean_html(html):
    """Clean an HTML snippet into a readable string"""

    if html is None:  # Convenience for sanitizing descriptions etc.
        return html

    # Collapse whitespace, turn <br> and paragraph breaks into newlines,
    # then strip all remaining tags
    for pattern, repl in (
            (r'\s+', ' '),
            (r'(?u)\s?<\s?br\s?/?\s?>\s?', '\n'),
            (r'(?u)<\s?/\s?p\s?>\s?<\s?p[^>]*>', '\n'),
            ('<.*?>', ''),
    ):
        html = re.sub(pattern, repl, html)
    # Replace html entities
    return unescapeHTML(html).strip()
523
524
class LenientJSONDecoder(json.JSONDecoder):
    """JSON decoder that can pre-transform the input, ignore trailing data
    and attempt to close a number of unterminated objects/arrays"""
    # TODO: Write tests
    def __init__(self, *args, transform_source=None, ignore_extra=False, close_objects=0, **kwargs):
        # transform_source: callable applied to the string before decoding
        # ignore_extra: if True, ignore any data after the first JSON value
        # close_objects: number of unterminated objects/arrays to try closing
        self.transform_source, self.ignore_extra = transform_source, ignore_extra
        self._close_attempts = 2 * close_objects
        super().__init__(*args, **kwargs)

    @staticmethod
    def _close_object(err):
        """Try to repair the document up to the error position;
        returns the patched string, or None if no repair applies"""
        doc = err.doc[:err.pos]
        # We need to add comma first to get the correct error message
        if err.msg.startswith('Expecting \',\''):
            return doc + ','
        elif not doc.endswith(','):
            return

        if err.msg.startswith('Expecting property name'):
            # Trailing comma inside an object: drop it and close the dict
            return doc[:-1] + '}'
        elif err.msg.startswith('Expecting value'):
            # Trailing comma inside an array: drop it and close the list
            return doc[:-1] + ']'

    def decode(self, s):
        if self.transform_source:
            s = self.transform_source(s)
        for attempt in range(self._close_attempts + 1):
            try:
                if self.ignore_extra:
                    return self.raw_decode(s.lstrip())[0]
                return super().decode(s)
            except json.JSONDecodeError as e:
                if e.pos is None:
                    raise
                elif attempt < self._close_attempts:
                    s = self._close_object(e)
                    if s is not None:
                        continue
                # Re-raise with surrounding context included in the message
                raise type(e)(f'{e.msg} in {s[e.pos-10:e.pos+10]!r}', s, e.pos)
        assert False, 'Too many attempts to decode JSON'
563
564
def sanitize_open(filename, open_mode):
    """Try to open the given filename, and slightly tweak it if this fails.

    Attempts to open the given filename. If this fails, it tries to change
    the filename slightly, step by step, until it's either able to open it
    or it fails and raises a final exception, like the standard open()
    function.

    It returns the tuple (stream, definitive_file_name).
    """
    if filename == '-':
        if sys.platform == 'win32':
            import msvcrt

            # stdout may be any IO stream, e.g. when using contextlib.redirect_stdout
            with contextlib.suppress(io.UnsupportedOperation):
                msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
        return (sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout, filename)

    # At most one retry, with a sanitized filename the second time around
    for attempt in range(2):
        try:
            try:
                if sys.platform == 'win32':
                    # FIXME: An exclusive lock also locks the file from being read.
                    # Since windows locks are mandatory, don't lock the file on windows (for now).
                    # Ref: https://github.com/yt-dlp/yt-dlp/issues/3124
                    raise LockingUnsupportedError()
                stream = locked_file(filename, open_mode, block=False).__enter__()
            except OSError:
                # Locking failed or unsupported; fall back to a plain open
                stream = open(filename, open_mode)
            return stream, filename
        except OSError as err:
            if attempt or err.errno in (errno.EACCES,):
                raise
            old_filename, filename = filename, sanitize_path(filename)
            if old_filename == filename:
                raise
602
603
def timeconvert(timestr):
    """Convert RFC 2822 defined time string into system timestamp"""
    parsed = email.utils.parsedate_tz(timestr)
    if parsed is None:
        return None
    return email.utils.mktime_tz(parsed)
611
612
def sanitize_filename(s, restricted=False, is_id=NO_DEFAULT):
    """Sanitizes a string so it could be used as part of a filename.
    @param restricted   Use a stricter subset of allowed characters
    @param is_id        Whether this is an ID that should be kept unchanged if possible.
                        If unset, yt-dlp's new sanitization rules are in effect
    """
    if s == '':
        return ''

    def replace_insane(char):
        # NUL bytes mark substitute characters, so repeated/leading/trailing
        # substitutions can be collapsed or stripped afterwards
        if restricted and char in ACCENT_CHARS:
            return ACCENT_CHARS[char]
        elif not restricted and char == '\n':
            return '\0 '
        elif is_id is NO_DEFAULT and not restricted and char in '"*:<>?|/\\':
            # Replace with their full-width unicode counterparts
            return {'/': '\u29F8', '\\': '\u29f9'}.get(char, chr(ord(char) + 0xfee0))
        elif char == '?' or ord(char) < 32 or ord(char) == 127:
            return ''
        elif char == '"':
            return '' if restricted else '\''
        elif char == ':':
            return '\0_\0-' if restricted else '\0 \0-'
        elif char in '\\/|*<>':
            return '\0_'
        if restricted and (char in '!&\'()[]{}$;`^,#' or char.isspace() or ord(char) > 127):
            return '\0_'
        return char

    # Replace look-alike Unicode glyphs
    if restricted and (is_id is NO_DEFAULT or not is_id):
        s = unicodedata.normalize('NFKC', s)
    s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s)  # Handle timestamps
    result = ''.join(map(replace_insane, s))
    if is_id is NO_DEFAULT:
        result = re.sub(r'(\0.)(?:(?=\1)..)+', r'\1', result)  # Remove repeated substitute chars
        STRIP_RE = r'(?:\0.|[ _-])*'
        result = re.sub(f'^\0.{STRIP_RE}|{STRIP_RE}\0.$', '', result)  # Remove substitute chars from start/end
    result = result.replace('\0', '') or '_'

    if not is_id:
        while '__' in result:
            result = result.replace('__', '_')
        result = result.strip('_')
        # Common case of "Foreign band name - English song title"
        if restricted and result.startswith('-_'):
            result = result[2:]
        if result.startswith('-'):
            result = '_' + result[len('-'):]
        result = result.lstrip('.')
        if not result:
            result = '_'
    return result
666
667
def sanitize_path(s, force=False):
    """Sanitizes and normalizes path on Windows"""
    # XXX: this handles drive relative paths (c:sth) incorrectly
    if sys.platform == 'win32':
        force = False
        drive_or_unc, _ = os.path.splitdrive(s)
    elif force:
        drive_or_unc = ''
    else:
        # Nothing to sanitize on POSIX unless explicitly forced
        return s

    norm_path = os.path.normpath(remove_start(s, drive_or_unc)).split(os.path.sep)
    if drive_or_unc:
        norm_path.pop(0)
    # Replace characters invalid in Windows path components (and trailing
    # spaces/dots) with '#', keeping '.'/'..' components intact
    sanitized_path = [
        path_part if path_part in ['.', '..'] else re.sub(r'(?:[/<>:"\|\\?\*]|[\s.]$)', '#', path_part)
        for path_part in norm_path]
    if drive_or_unc:
        sanitized_path.insert(0, drive_or_unc + os.path.sep)
    elif force and s and s[0] == os.path.sep:
        # Preserve the leading separator of absolute POSIX paths
        sanitized_path.insert(0, os.path.sep)
    # TODO: Fix behavioral differences <3.12
    # The workaround using `normpath` only superficially passes tests
    # Ref: https://github.com/python/cpython/pull/100351
    return os.path.normpath(os.path.join(*sanitized_path))
693
694
def sanitize_url(url, *, scheme='http'):
    """Prepend a scheme to protocol-relative URLs and fix common URL typos"""
    if url is None:
        return None
    if url.startswith('//'):
        # Prepend protocol-less URLs with `http:` scheme in order to mitigate
        # the number of unwanted failures due to missing protocol
        return f'{scheme}:{url}'
    # Fix some common typos seen so far
    COMMON_TYPOS = (
        # https://github.com/ytdl-org/youtube-dl/issues/15649
        (r'^httpss://', r'https://'),
        # https://bx1.be/lives/direct-tv/
        (r'^rmtp([es]?)://', r'rtmp\1://'),
    )
    for mistake, fixup in COMMON_TYPOS:
        fixed, count = re.subn(mistake, fixup, url)
        if count:
            return fixed
    return url
713
714
def extract_basic_auth(url):
    """Split credentials out of a URL; returns (clean_url, Authorization header value or None)"""
    parts = urllib.parse.urlsplit(url)
    if parts.username is None:
        return url, None
    # Rebuild the netloc without the userinfo component
    netloc = parts.hostname if parts.port is None else '%s:%d' % (parts.hostname, parts.port)
    clean_url = urllib.parse.urlunsplit(parts._replace(netloc=netloc))
    credentials = '%s:%s' % (parts.username, parts.password or '')
    auth_payload = base64.b64encode(credentials.encode())
    return clean_url, f'Basic {auth_payload.decode()}'
725
726
def expand_path(s):
    """Expand shell variables and ~"""
    expanded_user = compat_expanduser(s)
    return os.path.expandvars(expanded_user)
730
731
def orderedSet(iterable, *, lazy=False):
    """Remove all duplicates from the input iterable, preserving order"""
    def generate():
        seen = []  # a list, not a set: items may be unhashable
        for item in iterable:
            if item not in seen:
                seen.append(item)
                yield item

    if lazy:
        return generate()
    return list(generate())
742
743
744 def _htmlentity_transform(entity_with_semicolon):
745 """Transforms an HTML entity to a character."""
746 entity = entity_with_semicolon[:-1]
747
748 # Known non-numeric HTML entity
749 if entity in html.entities.name2codepoint:
750 return chr(html.entities.name2codepoint[entity])
751
752 # TODO: HTML5 allows entities without a semicolon.
753 # E.g. '&Eacuteric' should be decoded as 'Éric'.
754 if entity_with_semicolon in html.entities.html5:
755 return html.entities.html5[entity_with_semicolon]
756
757 mobj = re.match(r'#(x[0-9a-fA-F]+|[0-9]+)', entity)
758 if mobj is not None:
759 numstr = mobj.group(1)
760 if numstr.startswith('x'):
761 base = 16
762 numstr = '0%s' % numstr
763 else:
764 base = 10
765 # See https://github.com/ytdl-org/youtube-dl/issues/7518
766 with contextlib.suppress(ValueError):
767 return chr(int(numstr, base))
768
769 # Unknown entity in name, return its literal representation
770 return '&%s;' % entity
771
772
def unescapeHTML(s):
    """Replace HTML entities in s by their decoded characters"""
    if s is None:
        return None
    assert isinstance(s, str)

    def _replace(match):
        return _htmlentity_transform(match.group(1))

    return re.sub(r'&([^&;]+;)', _replace, s)
780
781
def escapeHTML(text):
    """Escape &, <, >, double and single quotes for safe HTML embedding"""
    replacements = (
        ('&', '&amp;'),  # must be first, so later entities are not re-escaped
        ('<', '&lt;'),
        ('>', '&gt;'),
        ('"', '&quot;'),
        ("'", '&#39;'),
    )
    for old, new in replacements:
        text = text.replace(old, new)
    return text
791
792
class netrc_from_content(netrc.netrc):
    """netrc.netrc subclass that parses credentials from a string
    instead of reading a file from disk"""

    def __init__(self, content):
        self.hosts = {}
        self.macros = {}
        with io.StringIO(content) as stream:
            self._parse('-', stream, False)
798
799
class Popen(subprocess.Popen):
    """subprocess.Popen with yt-dlp-specific tweaks: hidden console window on
    Windows, PyInstaller library-path restoration, and an explicit cmd.exe
    wrapper for shell commands on Windows"""
    if sys.platform == 'win32':
        # Prevent a console window from popping up for spawned processes
        _startupinfo = subprocess.STARTUPINFO()
        _startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
    else:
        _startupinfo = None

    @staticmethod
    def _fix_pyinstaller_ld_path(env):
        """Restore LD_LIBRARY_PATH when using PyInstaller
           Ref: https://github.com/pyinstaller/pyinstaller/blob/develop/doc/runtime-information.rst#ld_library_path--libpath-considerations
                https://github.com/yt-dlp/yt-dlp/issues/4573
        """
        if not hasattr(sys, '_MEIPASS'):
            # Not running from a PyInstaller bundle
            return

        def _fix(key):
            orig = env.get(f'{key}_ORIG')
            if orig is None:
                env.pop(key, None)
            else:
                env[key] = orig

        _fix('LD_LIBRARY_PATH')  # Linux
        _fix('DYLD_LIBRARY_PATH')  # macOS

    def __init__(self, args, *remaining, env=None, text=False, shell=False, **kwargs):
        if env is None:
            env = os.environ.copy()
        self._fix_pyinstaller_ld_path(env)

        # Text mode is implied by any of the text-related options
        self.__text_mode = kwargs.get('encoding') or kwargs.get('errors') or text or kwargs.get('universal_newlines')
        if text is True:
            kwargs['universal_newlines'] = True  # For 3.6 compatibility
            kwargs.setdefault('encoding', 'utf-8')
            kwargs.setdefault('errors', 'replace')

        if shell and compat_os_name == 'nt' and kwargs.get('executable') is None:
            # Invoke cmd.exe explicitly so the quoting and flags are controlled
            if not isinstance(args, str):
                args = ' '.join(compat_shlex_quote(a) for a in args)
            shell = False
            args = f'{self.__comspec()} /Q /S /D /V:OFF /C "{args}"'

        super().__init__(args, *remaining, env=env, shell=shell, **kwargs, startupinfo=self._startupinfo)

    def __comspec(self):
        # Resolve the cmd.exe path from %ComSpec%, falling back to %SystemRoot%
        comspec = os.environ.get('ComSpec') or os.path.join(
            os.environ.get('SystemRoot', ''), 'System32', 'cmd.exe')
        if os.path.isabs(comspec):
            return comspec
        raise FileNotFoundError('shell not found: neither %ComSpec% nor %SystemRoot% is set')

    def communicate_or_kill(self, *args, **kwargs):
        """communicate(), killing the process if it is interrupted"""
        try:
            return self.communicate(*args, **kwargs)
        except BaseException:  # Including KeyboardInterrupt
            self.kill(timeout=None)
            raise

    def kill(self, *, timeout=0):
        super().kill()
        if timeout != 0:
            # Also wait for the process to actually terminate
            self.wait(timeout=timeout)

    @classmethod
    def run(cls, *args, timeout=None, **kwargs):
        """Run the command to completion; returns (stdout, stderr, returncode)"""
        with cls(*args, **kwargs) as proc:
            default = '' if proc.__text_mode else b''
            stdout, stderr = proc.communicate_or_kill(timeout=timeout)
            return stdout or default, stderr or default, proc.returncode
870
871
def encodeArgument(s):
    """Return s as str, decoding byte strings as ASCII"""
    # Legacy code that uses byte strings
    # Uncomment the following line after fixing all post processors
    # assert isinstance(s, str), 'Internal error: %r should be of type %r, is %r' % (s, str, type(s))
    if isinstance(s, str):
        return s
    return s.decode('ascii')
877
878
_timetuple = collections.namedtuple('Time', ('hours', 'minutes', 'seconds', 'milliseconds'))


def timetuple_from_msec(msec):
    """Split a millisecond count into an (hours, minutes, seconds, milliseconds) tuple"""
    seconds, milliseconds = divmod(msec, 1000)
    minutes, seconds = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return _timetuple(hours, minutes, seconds, milliseconds)
887
888
def formatSeconds(secs, delim=':', msec=False):
    """Format a duration in seconds as [[H<delim>]MM<delim>]SS, optionally with .mmm"""
    time = timetuple_from_msec(secs * 1000)
    if time.hours:
        ret = '%d%s%02d%s%02d' % (time.hours, delim, time.minutes, delim, time.seconds)
    elif time.minutes:
        ret = '%d%s%02d' % (time.minutes, delim, time.seconds)
    else:
        ret = '%d' % time.seconds
    if not msec:
        return ret
    return '%s.%03d' % (ret, time.milliseconds)
898
899
def bug_reports_message(before=';'):
    """Return the standard bug-report blurb, punctuated to follow `before`"""
    from ..update import REPOSITORY

    msg = (f'please report this issue on  https://github.com/{REPOSITORY}/issues?q= , '
           'filling out the appropriate issue template. Confirm you are on the latest version using  yt-dlp -U')

    before = before.rstrip()
    # Capitalize when starting a new sentence
    if not before or before.endswith(('.', '!', '?')):
        msg = msg[0].title() + msg[1:]

    if not before:
        return msg
    return before + ' ' + msg
911
912
class YoutubeDLError(Exception):
    """Base exception for YoutubeDL errors."""
    msg = None

    def __init__(self, msg=None):
        # Fall back to the class-level message, then to the class name
        if msg is None:
            msg = self.msg if self.msg is not None else type(self).__name__
        self.msg = msg
        super().__init__(self.msg)
923
924
class ExtractorError(YoutubeDLError):
    """Error during info extraction."""

    def __init__(self, msg, tb=None, expected=False, cause=None, video_id=None, ie=None):
        """ tb, if given, is the original traceback (so that it can be printed out).
        If expected is set, this is a normal error message and most likely not a bug in yt-dlp.
        """
        from ..networking.exceptions import network_exceptions
        # Network problems are always "expected" (not a yt-dlp bug)
        if sys.exc_info()[0] in network_exceptions:
            expected = True

        self.orig_msg = str(msg)
        self.traceback = tb
        self.expected = expected
        self.cause = cause
        self.video_id = video_id
        self.ie = ie
        self.exc_info = sys.exc_info()  # preserve original exception
        if isinstance(self.exc_info[1], ExtractorError):
            # Chain through to the innermost original exc_info
            self.exc_info = self.exc_info[1].exc_info
        super().__init__(self.__msg)

    @property
    def __msg(self):
        # Full message: "[ie] video_id: msg (caused by ...)", plus the bug
        # report blurb unless the error is expected
        return ''.join((
            format_field(self.ie, None, '[%s] '),
            format_field(self.video_id, None, '%s: '),
            self.orig_msg,
            format_field(self.cause, None, ' (caused by %r)'),
            '' if self.expected else bug_reports_message()))

    def format_traceback(self):
        """Format the stored traceback and the cause's traceback, if any"""
        return join_nonempty(
            self.traceback and ''.join(traceback.format_tb(self.traceback)),
            self.cause and ''.join(traceback.format_exception(None, self.cause, self.cause.__traceback__)[1:]),
            delim='\n') or None

    def __setattr__(self, name, value):
        super().__setattr__(name, value)
        if getattr(self, 'msg', None) and name not in ('msg', 'args'):
            # Keep msg/args in sync whenever a message-relevant attribute changes
            self.msg = self.__msg or type(self).__name__
            self.args = (self.msg, )  # Cannot be property
967
968
class UnsupportedError(ExtractorError):
    """Raised when no extractor supports the given URL"""

    def __init__(self, url):
        message = 'Unsupported URL: %s' % url
        super().__init__(message, expected=True)
        self.url = url
974
975
class RegexNotFoundError(ExtractorError):
    """Raised when a mandatory regular expression search found no match"""
    pass
979
980
class GeoRestrictedError(ExtractorError):
    """Geographic restriction Error exception.

    This exception may be thrown when a video is not available from your
    geographic location due to geographic restrictions imposed by a website.
    """

    def __init__(self, msg, countries=None, **kwargs):
        # Geo restrictions are never a yt-dlp bug
        kwargs['expected'] = True
        super().__init__(msg, **kwargs)
        self.countries = countries
992
993
class UserNotLive(ExtractorError):
    """Error when a channel/user is not live"""

    def __init__(self, msg=None, **kwargs):
        kwargs['expected'] = True
        default_msg = 'The channel is not currently live'
        super().__init__(msg or default_msg, **kwargs)
1000
1001
class DownloadError(YoutubeDLError):
    """Download Error exception.

    This exception may be thrown by FileDownloader objects if they are not
    configured to continue on errors. They will contain the appropriate
    error message.
    """

    def __init__(self, msg, exc_info=None):
        """ exc_info, if given, is the original exception that caused the trouble (as returned by sys.exc_info()). """
        self.exc_info = exc_info
        super().__init__(msg)
1014
1015
class EntryNotInPlaylist(YoutubeDLError):
    """Raised by YoutubeDL when a requested entry cannot be found in the
    playlist info_dict."""
    msg = 'Entry not found in info'
1023
1024
class SameFileError(YoutubeDLError):
    """Same File exception.

    Raised by FileDownloader objects when multiple files would have to be
    downloaded to the same file on disk.
    """
    msg = 'Fixed output name but more than one file to download'

    def __init__(self, filename=None):
        # Fix: the colliding filename was being dropped from the message —
        # the f-string contained a literal '(unknown)' instead of {filename},
        # defeating the purpose of the `filename is not None` guard.
        if filename is not None:
            self.msg += f': {filename}'
        super().__init__(self.msg)
1037
1038
class PostProcessingError(YoutubeDLError):
    """Post Processing exception.

    Raised by a PostProcessor's .run() method to signal that the
    postprocessing task failed.
    """
1045
1046
class DownloadCancelled(YoutubeDLError):
    """Base for exceptions signalling that the download queue should be interrupted."""
    msg = 'The download was cancelled'
1050
1051
class ExistingVideoReached(DownloadCancelled):
    """Stop: encountered a video already in the archive (--break-on-existing)."""
    msg = 'Encountered a video that is already in the archive, stopping due to --break-on-existing'
1055
1056
class RejectedVideoReached(DownloadCancelled):
    """Stop: encountered a video that failed the match filter (--break-match-filter)."""
    msg = 'Encountered a video that did not match filter, stopping due to --break-match-filter'
1060
1061
class MaxDownloadsReached(DownloadCancelled):
    """Stop: the --max-downloads limit has been reached."""
    msg = 'Maximum number of downloads reached, stopping due to --max-downloads'
1065
1066
class ReExtractInfo(YoutubeDLError):
    """Signals that the video info needs to be re-extracted."""

    def __init__(self, msg, expected=False):
        super().__init__(msg)
        # True when the condition is an anticipated state rather than a bug
        self.expected = expected
1073
1074
class ThrottledDownload(ReExtractInfo):
    """Download speed fell below --throttled-rate; triggers re-extraction."""
    msg = 'The download speed is below throttle limit'

    def __init__(self):
        super().__init__(self.msg, expected=False)
1081
1082
class UnavailableVideoError(YoutubeDLError):
    """Unavailable Format exception.

    Raised when a video is requested in a format that is not available
    for that video.
    """
    msg = 'Unable to download video'

    def __init__(self, err=None):
        if err is not None:
            # Shadows the class attribute with the detailed instance message
            self.msg = f'{self.msg}: {err}'
        super().__init__(self.msg)
1095
1096
class ContentTooShortError(YoutubeDLError):
    """Content Too Short exception.

    Raised by FileDownloader objects when a downloaded file is smaller
    than what the server announced, indicating the connection was
    probably interrupted.
    """

    def __init__(self, downloaded, expected):
        super().__init__(f'Downloaded {downloaded} bytes, expected {expected} bytes')
        self.downloaded, self.expected = downloaded, expected  # both in bytes
1110
1111
class XAttrMetadataError(YoutubeDLError):
    """Raised when writing xattr metadata fails; .reason carries a coarse cause."""

    def __init__(self, code=None, msg='Unknown error'):
        super().__init__(msg)
        self.code, self.msg = code, msg

        # Classify the raw errno/message into an actionable reason
        if (self.code in (errno.ENOSPC, errno.EDQUOT)
                or 'No space left' in self.msg or 'Disk quota exceeded' in self.msg):
            self.reason = 'NO_SPACE'
        elif self.code == errno.E2BIG or 'Argument list too long' in self.msg:
            self.reason = 'VALUE_TOO_LONG'
        else:
            self.reason = 'NOT_SUPPORTED'
1126
1127
class XAttrUnavailableError(YoutubeDLError):
    """Raised when extended-attribute (xattr) support is unavailable."""
1130
1131
def is_path_like(f):
    """Return True if *f* is usable as a filesystem path (str, bytes or os.PathLike)."""
    return isinstance(f, str) or isinstance(f, bytes) or isinstance(f, os.PathLike)
1134
1135
def extract_timezone(date_str):
    """Split a trailing timezone off *date_str*.

    Returns (timezone, date_str): *timezone* is a datetime.timedelta UTC
    offset and *date_str* has the recognized timezone portion removed.
    An unparsable/absent timezone yields timedelta(0).
    """
    m = re.search(
        r'''(?x)
            ^.{8,}?                                              # >=8 char non-TZ prefix, if present
            (?P<tz>Z|                                            # just the UTC Z, or
                (?:(?<=.\b\d{4}|\b\d{2}:\d\d)|                   # preceded by 4 digits or hh:mm or
                   (?<!.\b[a-zA-Z]{3}|[a-zA-Z]{4}|..\b\d\d))     # not preceded by 3 alpha word or >= 4 alpha or 2 digits
                [ ]?                                             # optional space
            (?P<sign>\+|-)                                       # +/-
            (?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2})           # hh[:]mm
            $)
        ''', date_str)
    if not m:
        # Fall back to a named timezone abbreviation after a time-of-day
        m = re.search(r'\d{1,2}:\d{1,2}(?:\.\d+)?(?P<tz>\s*[A-Z]+)$', date_str)
        timezone = TIMEZONE_NAMES.get(m and m.group('tz').strip())
        if timezone is not None:
            # Only strip the suffix when the abbreviation is recognized
            date_str = date_str[:-len(m.group('tz'))]
        timezone = datetime.timedelta(hours=timezone or 0)
    else:
        date_str = date_str[:-len(m.group('tz'))]
        if not m.group('sign'):
            # 'Z' matched: UTC
            timezone = datetime.timedelta()
        else:
            sign = 1 if m.group('sign') == '+' else -1
            timezone = datetime.timedelta(
                hours=sign * int(m.group('hours')),
                minutes=sign * int(m.group('minutes')))
    return timezone, date_str
1164
1165
def parse_iso8601(date_str, delimiter='T', timezone=None):
    """ Return a UNIX timestamp from the given date """
    if date_str is None:
        return None

    # strptime's %S cannot take fractional seconds; drop them
    date_str = re.sub(r'\.[0-9]+', '', date_str)

    if timezone is None:
        timezone, date_str = extract_timezone(date_str)

    with contextlib.suppress(ValueError):
        parsed = datetime.datetime.strptime(date_str, f'%Y-%m-%d{delimiter}%H:%M:%S') - timezone
        return calendar.timegm(parsed.timetuple())
1181
1182
def date_formats(day_first=True):
    """Return the strptime format list to try, honouring day/month order."""
    if day_first:
        return DATE_FORMATS_DAY_FIRST
    return DATE_FORMATS_MONTH_FIRST
1185
1186
def unified_strdate(date_str, day_first=True):
    """Return a string with the date in the format YYYYMMDD, or None."""
    if date_str is None:
        return None

    result = None
    date_str = date_str.replace(',', ' ')  # commas are just noise here
    # Remove AM/PM + timezone
    date_str = re.sub(r'(?i)\s*(?:AM|PM)(?:\s+[A-Z]+)?', '', date_str)
    _, date_str = extract_timezone(date_str)

    # No break: the last format that parses wins, mirroring the format-list order
    for fmt in date_formats(day_first):
        with contextlib.suppress(ValueError):
            result = datetime.datetime.strptime(date_str, fmt).strftime('%Y%m%d')
    if result is None:
        # Fall back to RFC 2822 parsing
        timetuple = email.utils.parsedate_tz(date_str)
        if timetuple:
            with contextlib.suppress(ValueError):
                result = datetime.datetime(*timetuple[:6]).strftime('%Y%m%d')
    return str(result) if result is not None else None
1209
1210
def unified_timestamp(date_str, day_first=True):
    """Return a UNIX timestamp for a free-form date string, or None.

    @param day_first  whether ambiguous dates are treated as day-first
    """
    if not isinstance(date_str, str):
        return None

    # Drop commas/pipes and day-of-week names, then collapse whitespace
    date_str = re.sub(r'\s+', ' ', re.sub(
        r'(?i)[,|]|(mon|tues?|wed(nes)?|thu(rs)?|fri|sat(ur)?)(day)?', '', date_str))

    # Remember a PM marker before it is stripped below
    pm_delta = 12 if re.search(r'(?i)PM', date_str) else 0
    timezone, date_str = extract_timezone(date_str)

    # Remove AM/PM + timezone
    date_str = re.sub(r'(?i)\s*(?:AM|PM)(?:\s+[A-Z]+)?', '', date_str)

    # Remove unrecognized timezones from ISO 8601 alike timestamps
    m = re.search(r'\d{1,2}:\d{1,2}(?:\.\d+)?(?P<tz>\s*[A-Z]+)$', date_str)
    if m:
        date_str = date_str[:-len(m.group('tz'))]

    # Python only supports microseconds, so remove nanoseconds
    m = re.search(r'^([0-9]{4,}-[0-9]{1,2}-[0-9]{1,2}T[0-9]{1,2}:[0-9]{1,2}:[0-9]{1,2}\.[0-9]{6})[0-9]+$', date_str)
    if m:
        date_str = m.group(1)

    for expression in date_formats(day_first):
        with contextlib.suppress(ValueError):
            dt = datetime.datetime.strptime(date_str, expression) - timezone + datetime.timedelta(hours=pm_delta)
            return calendar.timegm(dt.timetuple())

    # Fall back to RFC 2822 parsing
    timetuple = email.utils.parsedate_tz(date_str)
    if timetuple:
        return calendar.timegm(timetuple) + pm_delta * 3600 - timezone.total_seconds()
1242
1243
def determine_ext(url, default_ext='unknown_video'):
    """Guess the file extension from a URL, falling back to *default_ext*."""
    if url is None or '.' not in url:
        return default_ext

    candidate = url.partition('?')[0].rpartition('.')[2]
    if re.match(r'^[A-Za-z0-9]+$', candidate):
        return candidate

    # Handle URLs like http://example.com/foo/bar.mp4/?download
    trimmed = candidate.rstrip('/')
    if trimmed in KNOWN_EXTENSIONS:
        return trimmed
    return default_ext
1255
1256
def subtitles_filename(filename, sub_lang, sub_format, expected_real_ext=None):
    """Build the subtitle filename: <base>.<lang>.<format>."""
    return replace_extension(filename, f'{sub_lang}.{sub_format}', expected_real_ext)
1259
1260
def datetime_from_str(date_str, precision='auto', format='%Y%m%d'):
    R"""
    Return a datetime object from a string.
    Supported format:
        (now|today|yesterday|DATE)([+-]\d+(microsecond|second|minute|hour|day|week|month|year)s?)?

    @param format       strftime format of DATE
    @param precision    Round the datetime object: auto|microsecond|second|minute|hour|day
                        auto: round to the unit provided in date_str (if applicable).
    """
    auto_precision = False
    if precision == 'auto':
        auto_precision = True
        precision = 'microsecond'
    today = datetime_round(datetime.datetime.now(datetime.timezone.utc), precision)
    if date_str in ('now', 'today'):
        return today
    if date_str == 'yesterday':
        return today - datetime.timedelta(days=1)
    match = re.match(
        r'(?P<start>.+)(?P<sign>[+-])(?P<time>\d+)(?P<unit>microsecond|second|minute|hour|day|week|month|year)s?',
        date_str)
    if match is not None:
        # Resolve the base date recursively, then apply the signed offset
        start_time = datetime_from_str(match.group('start'), precision, format)
        time = int(match.group('time')) * (-1 if match.group('sign') == '-' else 1)
        unit = match.group('unit')
        if unit == 'month' or unit == 'year':
            # Months/years have irregular length: use calendar arithmetic,
            # then round at day granularity below
            new_date = datetime_add_months(start_time, time * 12 if unit == 'year' else time)
            unit = 'day'
        else:
            if unit == 'week':
                unit = 'day'
                time *= 7
            delta = datetime.timedelta(**{unit + 's': time})
            new_date = start_time + delta
        if auto_precision:
            return datetime_round(new_date, unit)
        return new_date

    return datetime_round(datetime.datetime.strptime(date_str, format), precision)
1301
1302
def date_from_str(date_str, format='%Y%m%d', strict=False):
    R"""
    Return a date object from a string using datetime_from_str

    @param strict  Restrict allowed patterns to "YYYYMMDD" and
                   (now|today|yesterday)(-\d+(day|week|month|year)s?)?
    """
    if strict:
        allowed = r'\d{8}|(now|today|yesterday)(-\d+(day|week|month|year)s?)?'
        if not re.fullmatch(allowed, date_str):
            raise ValueError(f'Invalid date format "{date_str}"')
    return datetime_from_str(date_str, precision='microsecond', format=format).date()
1313
1314
def datetime_add_months(dt, months):
    """Shift *dt* by *months* (may be negative), clamping the day to month length."""
    zero_based = dt.month - 1 + months
    year = dt.year + zero_based // 12
    month = zero_based % 12 + 1
    last_day = calendar.monthrange(year, month)[1]
    return dt.replace(year, month, min(dt.day, last_day))
1322
1323
def datetime_round(dt, precision='day'):
    """Round *dt* to the nearest *precision* unit.

    Returns an aware UTC datetime (except for 'microsecond', which is a no-op).
    """
    if precision == 'microsecond':
        return dt

    seconds_per = {
        'day': 86400,
        'hour': 3600,
        'minute': 60,
        'second': 1,
    }[precision]
    stamp = calendar.timegm(dt.timetuple())
    rounded = ((stamp + seconds_per / 2) // seconds_per) * seconds_per
    return datetime.datetime.fromtimestamp(rounded, datetime.timezone.utc)
1340
1341
def hyphenate_date(date_str):
    """Convert 'YYYYMMDD' to 'YYYY-MM-DD'; any other input is returned unchanged."""
    m = re.match(r'^(\d\d\d\d)(\d\d)(\d\d)$', date_str)
    return '-'.join(m.groups()) if m else date_str
1350
1351
class DateRange:
    """Inclusive interval between two dates."""

    def __init__(self, start=None, end=None):
        """start and end must be strings in the format accepted by date"""
        self.start = date_from_str(start, strict=True) if start is not None else datetime.datetime.min.date()
        self.end = date_from_str(end, strict=True) if end is not None else datetime.datetime.max.date()
        if self.start > self.end:
            raise ValueError('Date range: "%s" , the start date must be before the end date' % self)

    @classmethod
    def day(cls, day):
        """Returns a range that only contains the given day"""
        return cls(day, day)

    def __contains__(self, date):
        """Check if the date is in the range"""
        if not isinstance(date, datetime.date):
            date = date_from_str(date)
        return self.start <= date <= self.end

    def __repr__(self):
        return f'{__name__}.{type(self).__name__}({self.start.isoformat()!r}, {self.end.isoformat()!r})'

    def __eq__(self, other):
        return isinstance(other, DateRange) and (self.start, self.end) == (other.start, other.end)
1385
1386
@functools.cache
def system_identifier():
    """One-line summary of the Python/OS/OpenSSL environment (for bug reports)."""
    impl = platform.python_implementation()
    if impl == 'PyPy' and hasattr(sys, 'pypy_version_info'):
        impl += ' version %d.%d.%d' % sys.pypy_version_info[:3]
    libc_ver = []
    with contextlib.suppress(OSError):  # We may not have access to the executable
        libc_ver = platform.libc_ver()

    return 'Python %s (%s %s %s) - %s (%s%s)' % (
        platform.python_version(),
        impl,
        platform.machine(),
        platform.architecture()[0],
        platform.platform(),
        ssl.OPENSSL_VERSION,
        format_field(join_nonempty(*libc_ver, delim=' '), None, ', %s'),
    )
1405
1406
@functools.cache
def get_windows_version():
    """Windows version as a tuple; empty tuple when not running on Windows."""
    if compat_os_name != 'nt':
        return ()
    return version_tuple(platform.win32_ver()[1])
1414
1415
def write_string(s, out=None, encoding=None):
    """Robustly write *s* to *out* (default: sys.stderr), if any."""
    assert isinstance(s, str)
    out = out or sys.stderr
    # `sys.stderr` might be `None` (Ref: https://github.com/pyinstaller/pyinstaller/pull/7217)
    if not out:
        return

    if compat_os_name == 'nt' and supports_terminal_sequences(out):
        # prepend a space to newline runs (Windows terminal rendering workaround)
        s = re.sub(r'([\r\n]+)', r' \1', s)

    enc, target = None, out
    if 'b' in getattr(out, 'mode', ''):
        # binary stream: we have to encode ourselves
        enc = encoding or preferredencoding()
    elif hasattr(out, 'buffer'):
        # text stream: write pre-encoded bytes to the underlying buffer
        target = out.buffer
        enc = encoding or getattr(out, 'encoding', None) or preferredencoding()

    target.write(s.encode(enc, 'ignore') if enc else s)
    out.flush()
1435
1436
1437 # TODO: Use global logger
def deprecation_warning(msg, *, printer=None, stacklevel=0, **kwargs):
    """Emit *msg* as a deprecation warning.

    In CLI mode each distinct message is printed only once (via *printer*
    or write_string); in library mode a DeprecationWarning is issued so
    the embedding application can filter it.
    """
    from .. import _IN_CLI
    if _IN_CLI:
        if msg in deprecation_warning._cache:
            return  # already warned about this exact message
        deprecation_warning._cache.add(msg)
        if printer:
            return printer(f'{msg}{bug_reports_message()}', **kwargs)
        return write_string(f'ERROR: {msg}{bug_reports_message()}\n', **kwargs)
    else:
        import warnings
        # +3 skips this helper's frames so the warning points at the caller
        warnings.warn(DeprecationWarning(msg), stacklevel=stacklevel + 3)


# Per-process set of messages already printed in CLI mode
deprecation_warning._cache = set()
1453
1454
def bytes_to_intlist(bs):
    """Return the values of *bs* as a list of ints (accepts bytes-likes and str)."""
    if not bs:
        return []
    if isinstance(bs[0], int):  # bytes/bytearray already index to ints
        return list(bs)
    return [ord(ch) for ch in bs]  # str input: use code points
1462
1463
def intlist_to_bytes(xs):
    """Pack a sequence of ints (0-255) back into a bytes object."""
    return struct.pack('%dB' % len(xs), *xs) if xs else b''
1468
1469
class LockingUnsupportedError(OSError):
    """Raised when no file-locking primitive is available on this platform."""
    msg = 'File locking is not supported'

    def __init__(self):
        super().__init__(self.msg)
1475
1476
# Cross-platform file locking: defines _lock_file/_unlock_file for the
# current platform (Win32 LockFileEx, POSIX fcntl, or a raising stub).
if sys.platform == 'win32':
    import ctypes
    import ctypes.wintypes
    import msvcrt

    class OVERLAPPED(ctypes.Structure):
        _fields_ = [
            ('Internal', ctypes.wintypes.LPVOID),
            ('InternalHigh', ctypes.wintypes.LPVOID),
            ('Offset', ctypes.wintypes.DWORD),
            ('OffsetHigh', ctypes.wintypes.DWORD),
            ('hEvent', ctypes.wintypes.HANDLE),
        ]

    kernel32 = ctypes.WinDLL('kernel32')
    LockFileEx = kernel32.LockFileEx
    LockFileEx.argtypes = [
        ctypes.wintypes.HANDLE,     # hFile
        ctypes.wintypes.DWORD,      # dwFlags
        ctypes.wintypes.DWORD,      # dwReserved
        ctypes.wintypes.DWORD,      # nNumberOfBytesToLockLow
        ctypes.wintypes.DWORD,      # nNumberOfBytesToLockHigh
        ctypes.POINTER(OVERLAPPED)  # Overlapped
    ]
    LockFileEx.restype = ctypes.wintypes.BOOL
    UnlockFileEx = kernel32.UnlockFileEx
    UnlockFileEx.argtypes = [
        ctypes.wintypes.HANDLE,     # hFile
        ctypes.wintypes.DWORD,      # dwReserved
        ctypes.wintypes.DWORD,      # nNumberOfBytesToLockLow
        ctypes.wintypes.DWORD,      # nNumberOfBytesToLockHigh
        ctypes.POINTER(OVERLAPPED)  # Overlapped
    ]
    UnlockFileEx.restype = ctypes.wintypes.BOOL
    # Lock the maximum possible byte range (low/high DWORD halves)
    whole_low = 0xffffffff
    whole_high = 0x7fffffff

    def _lock_file(f, exclusive, block):
        overlapped = OVERLAPPED()
        overlapped.Offset = 0
        overlapped.OffsetHigh = 0
        overlapped.hEvent = 0
        # Keep the OVERLAPPED alive on the file object for the later unlock
        f._lock_file_overlapped_p = ctypes.pointer(overlapped)

        # dwFlags: 0x2 = LOCKFILE_EXCLUSIVE_LOCK, 0x1 = LOCKFILE_FAIL_IMMEDIATELY
        if not LockFileEx(msvcrt.get_osfhandle(f.fileno()),
                          (0x2 if exclusive else 0x0) | (0x0 if block else 0x1),
                          0, whole_low, whole_high, f._lock_file_overlapped_p):
            # NB: No argument form of "ctypes.FormatError" does not work on PyPy
            raise BlockingIOError(f'Locking file failed: {ctypes.FormatError(ctypes.GetLastError())!r}')

    def _unlock_file(f):
        assert f._lock_file_overlapped_p
        handle = msvcrt.get_osfhandle(f.fileno())
        if not UnlockFileEx(handle, 0, whole_low, whole_high, f._lock_file_overlapped_p):
            raise OSError('Unlocking file failed: %r' % ctypes.FormatError())

else:
    try:
        import fcntl

        def _lock_file(f, exclusive, block):
            flags = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
            if not block:
                flags |= fcntl.LOCK_NB
            try:
                fcntl.flock(f, flags)
            except BlockingIOError:
                raise
            except OSError:  # AOSP does not have flock()
                fcntl.lockf(f, flags)

        def _unlock_file(f):
            # Try each unlock mechanism in turn; first one that works wins
            with contextlib.suppress(OSError):
                return fcntl.flock(f, fcntl.LOCK_UN)
            with contextlib.suppress(OSError):
                return fcntl.lockf(f, fcntl.LOCK_UN)  # AOSP does not have flock()
            return fcntl.flock(f, fcntl.LOCK_UN | fcntl.LOCK_NB)  # virtiofs needs LOCK_NB on unlocking

    except ImportError:

        def _lock_file(f, exclusive, block):
            raise LockingUnsupportedError()

        def _unlock_file(f):
            raise LockingUnsupportedError()
1563
1564
class locked_file:
    """A file wrapper that holds an OS-level lock for its open lifetime.

    Supports 'r', 'rb', 'a', 'ab', 'w' and 'wb' modes.  The lock is acquired
    in __enter__/open() and released (and the file closed) in __exit__/close().
    """
    locked = False  # True while the OS lock is held

    def __init__(self, filename, mode, block=True, encoding=None):
        if mode not in {'r', 'rb', 'a', 'ab', 'w', 'wb'}:
            raise NotImplementedError(mode)
        self.mode, self.block = mode, block

        writable = any(f in mode for f in 'wax+')
        readable = any(f in mode for f in 'r+')
        flags = functools.reduce(operator.ior, (
            getattr(os, 'O_CLOEXEC', 0),  # UNIX only
            getattr(os, 'O_BINARY', 0),  # Windows only
            getattr(os, 'O_NOINHERIT', 0),  # Windows only
            os.O_CREAT if writable else 0,  # O_TRUNC only after locking
            os.O_APPEND if 'a' in mode else 0,
            os.O_EXCL if 'x' in mode else 0,
            os.O_RDONLY if not writable else os.O_RDWR if readable else os.O_WRONLY,
        ))

        self.f = os.fdopen(os.open(filename, flags, 0o666), mode, encoding=encoding)

    def __enter__(self):
        exclusive = 'r' not in self.mode
        try:
            _lock_file(self.f, exclusive, self.block)
            self.locked = True
        except OSError:
            self.f.close()
            raise
        if 'w' in self.mode:
            # Truncate only after the lock is held (see O_TRUNC note above)
            try:
                self.f.truncate()
            except OSError as e:
                if e.errno not in (
                    errno.ESPIPE,  # Illegal seek - expected for FIFO
                    errno.EINVAL,  # Invalid argument - expected for /dev/null
                ):
                    raise
        return self

    def unlock(self):
        """Release the lock (keeps the file open); no-op when not locked."""
        if not self.locked:
            return
        try:
            _unlock_file(self.f)
        finally:
            self.locked = False

    def __exit__(self, *_):
        try:
            self.unlock()
        finally:
            self.f.close()

    # Allow use without the `with` statement
    open = __enter__
    close = __exit__

    def __getattr__(self, attr):
        # Delegate read/write/seek/... to the underlying file object
        return getattr(self.f, attr)

    def __iter__(self):
        return iter(self.f)
1628
1629
@functools.cache
def get_filesystem_encoding():
    """The filesystem encoding, defaulting to utf-8 when Python reports none."""
    encoding = sys.getfilesystemencoding()
    return 'utf-8' if encoding is None else encoding
1634
1635
def shell_quote(args):
    """Quote each argument for the shell and join them with spaces."""
    encoding = get_filesystem_encoding()

    def _as_str(a):
        # We may get a filename encoded with 'encodeFilename'
        return a.decode(encoding) if isinstance(a, bytes) else a

    return ' '.join(compat_shlex_quote(_as_str(a)) for a in args)
1645
1646
def smuggle_url(url, data):
    """ Pass additional data in a URL for internal use. """
    url, idata = unsmuggle_url(url, {})
    data.update(idata)  # data already smuggled in the URL takes precedence
    fragment = urllib.parse.urlencode(
        {'__youtubedl_smuggle': json.dumps(data)})
    return f'{url}#{fragment}'
1655
1656
def unsmuggle_url(smug_url, default=None):
    """Split a smuggled URL into (clean_url, data); data is *default* when absent."""
    if '#__youtubedl_smuggle' not in smug_url:
        return smug_url, default
    url, _, fragment = smug_url.rpartition('#')
    payload = urllib.parse.parse_qs(fragment)['__youtubedl_smuggle'][0]
    return url, json.loads(payload)
1664
1665
def format_decimal_suffix(num, fmt='%d%s', *, factor=1000):
    """ Formats numbers with decimal sufixes like K, M, etc """
    num, factor = float_or_none(num), float(factor)
    if num is None or num < 0:
        return None
    POSSIBLE_SUFFIXES = 'kMGTPEZY'
    exponent = 0 if num == 0 else min(int(math.log(num, factor)), len(POSSIBLE_SUFFIXES))
    suffix = ['', *POSSIBLE_SUFFIXES][exponent]
    if factor == 1024:
        # binary prefixes: k -> Ki, M -> Mi, ...
        suffix = {'k': 'Ki', '': ''}.get(suffix, f'{suffix}i')
    return fmt % (num / factor ** exponent, suffix)
1678
1679
def format_bytes(bytes):
    """Human-readable byte count using binary prefixes, or 'N/A'."""
    return format_decimal_suffix(bytes, '%.2f%sB', factor=1024) or 'N/A'
1682
1683
def lookup_unit_table(unit_table, s, strict=False):
    """Parse '<number> <unit>' using *unit_table* multipliers; None on no match."""
    # Lenient mode also accepts ',' as a decimal separator
    num_re = NUMBER_RE if strict else NUMBER_RE.replace(R'\.', '[,.]')
    units_re = '|'.join(map(re.escape, unit_table))
    matcher = re.fullmatch if strict else re.match
    m = matcher(rf'(?P<num>{num_re})\s*(?P<unit>{units_re})\b', s)
    if not m:
        return None

    value = float(m.group('num').replace(',', '.'))
    return round(value * unit_table[m.group('unit')])
1695
1696
def parse_bytes(s):
    """Parse a string indicating a byte quantity into an integer"""
    table = {unit: 1024 ** exp for exp, unit in enumerate(['', *'KMGTPEZY'])}
    return lookup_unit_table(table, s.upper(), strict=True)
1702
1703
def parse_filesize(s):
    """Parse a human-readable file size ('5.5MiB', '1,000 kB', ...) into bytes, or None."""
    if s is None:
        return None

    # The lower-case forms are of course incorrect and unofficial,
    # but we support those too
    _UNIT_TABLE = {
        'B': 1,
        'b': 1,
        'bytes': 1,
        'KiB': 1024,
        'KB': 1000,
        'kB': 1024,
        'Kb': 1000,
        'kb': 1000,
        'kilobytes': 1000,
        'kibibytes': 1024,
        'MiB': 1024 ** 2,
        'MB': 1000 ** 2,
        'mB': 1024 ** 2,
        'Mb': 1000 ** 2,
        'mb': 1000 ** 2,
        'megabytes': 1000 ** 2,
        'mebibytes': 1024 ** 2,
        'GiB': 1024 ** 3,
        'GB': 1000 ** 3,
        'gB': 1024 ** 3,
        'Gb': 1000 ** 3,
        'gb': 1000 ** 3,
        'gigabytes': 1000 ** 3,
        'gibibytes': 1024 ** 3,
        'TiB': 1024 ** 4,
        'TB': 1000 ** 4,
        'tB': 1024 ** 4,
        'Tb': 1000 ** 4,
        'tb': 1000 ** 4,
        'terabytes': 1000 ** 4,
        'tebibytes': 1024 ** 4,
        'PiB': 1024 ** 5,
        'PB': 1000 ** 5,
        'pB': 1024 ** 5,
        'Pb': 1000 ** 5,
        'pb': 1000 ** 5,
        'petabytes': 1000 ** 5,
        'pebibytes': 1024 ** 5,
        'EiB': 1024 ** 6,
        'EB': 1000 ** 6,
        'eB': 1024 ** 6,
        'Eb': 1000 ** 6,
        'eb': 1000 ** 6,
        'exabytes': 1000 ** 6,
        'exbibytes': 1024 ** 6,
        'ZiB': 1024 ** 7,
        'ZB': 1000 ** 7,
        'zB': 1024 ** 7,
        'Zb': 1000 ** 7,
        'zb': 1000 ** 7,
        'zettabytes': 1000 ** 7,
        'zebibytes': 1024 ** 7,
        'YiB': 1024 ** 8,
        'YB': 1000 ** 8,
        'yB': 1024 ** 8,
        'Yb': 1000 ** 8,
        'yb': 1000 ** 8,
        'yottabytes': 1000 ** 8,
        'yobibytes': 1024 ** 8,
    }

    return lookup_unit_table(_UNIT_TABLE, s)
1773
1774
def parse_count(s):
    """Parse a (possibly abbreviated) count like '1.2M subscribers' into an int, or None."""
    if s is None:
        return None

    s = re.sub(r'^[^\d]+\s', '', s).strip()  # drop a leading non-numeric word

    if re.match(r'^[\d,.]+$', s):
        return str_to_int(s)

    multipliers = {
        'k': 1000,
        'K': 1000,
        'm': 1000 ** 2,
        'M': 1000 ** 2,
        'kk': 1000 ** 2,
        'KK': 1000 ** 2,
        'b': 1000 ** 3,
        'B': 1000 ** 3,
    }
    result = lookup_unit_table(multipliers, s)
    if result is not None:
        return result

    # Last resort: a bare number followed by whitespace or end of string
    mobj = re.match(r'([\d,.]+)(?:$|\s)', s)
    return str_to_int(mobj.group(1)) if mobj else None
1802
1803
def parse_resolution(s, *, lenient=False):
    """Extract {'width', 'height'} from strings like '1920x1080', '720p' or '4k'."""
    if s is None:
        return {}

    wxh = (r'(?P<w>\d+)\s*[xX×,]\s*(?P<h>\d+)' if lenient
           else r'(?<![a-zA-Z0-9])(?P<w>\d+)\s*[xX×,]\s*(?P<h>\d+)(?![a-zA-Z0-9])')
    m = re.search(wxh, s)
    if m:
        return {
            'width': int(m.group('w')),
            'height': int(m.group('h')),
        }

    m = re.search(r'(?<![a-zA-Z0-9])(\d+)[pPiI](?![a-zA-Z0-9])', s)
    if m:
        return {'height': int(m.group(1))}

    m = re.search(r'\b([48])[kK]\b', s)
    if m:
        # 4k -> 2160, 8k -> 4320
        return {'height': int(m.group(1)) * 540}

    return {}
1827
1828
def parse_bitrate(s):
    """Extract an integer kbps value from a string, or None."""
    if not isinstance(s, str):
        return None
    m = re.search(r'\b(\d+)\s*kbps', s)
    return int(m.group(1)) if m else None
1835
1836
def month_by_name(name, lang='en'):
    """ Return the number of a month by (locale-independently) English name """
    month_names = MONTH_NAMES.get(lang, MONTH_NAMES['en'])
    if name not in month_names:
        return None
    return month_names.index(name) + 1
1846
1847
def month_by_abbreviation(abbrev):
    """ Return the number of a month by (locale-independently) English
        abbreviations """
    abbreviations = [name[:3] for name in ENGLISH_MONTH_NAMES]
    try:
        return abbreviations.index(abbrev) + 1
    except ValueError:
        return None
1856
1857
def fix_xml_ampersands(xml_str):
    """Escape bare '&' as '&amp;', leaving existing XML entities untouched."""
    return re.sub(
        r'&(?!amp;|lt;|gt;|apos;|quot;|#x[0-9a-fA-F]{,4};|#[0-9]{,4};)', '&amp;', xml_str)
1864
1865
def setproctitle(title):
    """Best-effort: set the process name via glibc prctl(PR_SET_NAME).

    Silently does nothing when ctypes or libc.so.6 is unavailable.
    """
    assert isinstance(title, str)

    # Workaround for https://github.com/yt-dlp/yt-dlp/issues/4541
    try:
        import ctypes
    except ImportError:
        return

    try:
        libc = ctypes.cdll.LoadLibrary('libc.so.6')
    except OSError:
        return
    except TypeError:
        # LoadLibrary in Windows Python 2.7.13 only expects
        # a bytestring, but since unicode_literals turns
        # every string into a unicode string, it fails.
        return
    title_bytes = title.encode()
    buf = ctypes.create_string_buffer(len(title_bytes))
    buf.value = title_bytes
    try:
        libc.prctl(15, buf, 0, 0, 0)  # 15 == PR_SET_NAME (see prctl(2))
    except AttributeError:
        return  # Strange libc, just skip this
1891
1892
def remove_start(s, start):
    """Strip *start* from the beginning of *s*; None passes through unchanged."""
    if s is None or not s.startswith(start):
        return s
    return s[len(start):]
1895
1896
def remove_end(s, end):
    """Strip *end* from the end of *s*; None passes through unchanged.

    Fix: an empty *end* previously truncated *s* to '' because the slice
    was written as ``s[:-len(end)]`` and ``s[:-0]`` is the empty string;
    ``s[:len(s) - len(end)]`` handles the zero-length suffix correctly.
    """
    if s is None or not s.endswith(end):
        return s
    return s[:len(s) - len(end)]
1899
1900
def remove_quotes(s):
    """Strip one matching pair of surrounding single or double quotes."""
    if s is None or len(s) < 2:
        return s
    if s[0] == s[-1] and s[0] in ('"', "'"):
        return s[1:-1]
    return s
1908
1909
def get_domain(url):
    """
    This implementation is inconsistent, but is kept for compatibility.
    Use this only for "webpage_url_domain"
    """
    netloc = urllib.parse.urlparse(url).netloc
    return remove_start(netloc, 'www.') or None
1916
1917
def url_basename(url):
    """Last path component of *url* (query/fragment handled by urlparse)."""
    return urllib.parse.urlparse(url).path.strip('/').split('/')[-1]
1921
1922
def base_url(url):
    """Everything up to and including the last '/' before any query/fragment."""
    m = re.match(r'https?://[^?#]+/', url)
    return m.group()
1925
1926
def urljoin(base, path):
    """Join *path* onto *base*, tolerating bytes input; None when not joinable."""
    if isinstance(path, bytes):
        path = path.decode()
    if not path or not isinstance(path, str):
        return None
    if re.match(r'^(?:[a-zA-Z][a-zA-Z0-9+-.]*:)?//', path):
        return path  # already absolute
    if isinstance(base, bytes):
        base = base.decode()
    if not isinstance(base, str) or not re.match(r'^(?:https?:)?//', base):
        return None
    return urllib.parse.urljoin(base, path)
1940
1941
def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1):
    """int(v) * invscale // scale, or *default* when conversion fails."""
    if get_attr and v is not None:
        v = getattr(v, get_attr, None)
    try:
        return int(v) * invscale // scale
    except (ValueError, TypeError, OverflowError):
        return default
1949
1950
def str_or_none(v, default=None):
    """str(v), or *default* when v is None."""
    return str(v) if v is not None else default
1953
1954
def str_to_int(int_str):
    """ A more relaxed version of int_or_none """
    if isinstance(int_str, int):
        return int_str
    if isinstance(int_str, str):
        # Drop thousands separators and '+' before conversion
        int_str = re.sub(r'[,\.\+]', '', int_str)
    # Non-str/non-int values still go through int_or_none (e.g. floats)
    return int_or_none(int_str)
1962
1963
def float_or_none(v, scale=1, invscale=1, default=None):
    """float(v) * invscale / scale, or *default* when v is None or unconvertible."""
    if v is None:
        return default
    try:
        return float(v) * invscale / scale
    except (ValueError, TypeError):
        return default
1971
1972
def bool_or_none(v, default=None):
    """Pass through real booleans; anything else (including 0/1) becomes *default*."""
    if isinstance(v, bool):
        return v
    return default
1975
1976
def strip_or_none(v, default=None):
    """v.strip() for strings; *default* for anything else."""
    if isinstance(v, str):
        return v.strip()
    return default
1979
1980
def url_or_none(url):
    """Return the stripped URL if it uses a supported scheme, else None."""
    if not url or not isinstance(url, str):
        return None
    url = url.strip()
    if re.match(r'^(?:(?:https?|rt(?:m(?:pt?[es]?|fp)|sp[su]?)|mms|ftps?):)?//', url):
        return url
    return None
1986
1987
def strftime_or_none(timestamp, date_format='%Y%m%d', default=None):
    """Format a unix timestamp (int/float) or 'YYYYMMDD' string; *default* on failure."""
    try:
        if isinstance(timestamp, (int, float)):  # unix timestamp
            # Using naive datetime here can break timestamp() in Windows
            # Ref: https://github.com/yt-dlp/yt-dlp/issues/5185, https://github.com/python/cpython/issues/94414
            # Also, datetime.datetime.fromtimestamp breaks for negative timestamps
            # Ref: https://github.com/yt-dlp/yt-dlp/issues/6706#issuecomment-1496842642
            dt = (datetime.datetime.fromtimestamp(0, datetime.timezone.utc)
                  + datetime.timedelta(seconds=timestamp))
        elif isinstance(timestamp, str):  # assume YYYYMMDD
            dt = datetime.datetime.strptime(timestamp, '%Y%m%d')
        else:
            dt = None  # unsupported type: the attribute access below bails out
        # Expand %s manually, since strftime lacks it on Windows
        date_format = re.sub(
            r'(?<!%)(%%)*%s', rf'\g<1>{int(dt.timestamp())}', date_format)
        return dt.strftime(date_format)
    except (ValueError, TypeError, AttributeError):
        return default
2005
2006
def parse_duration(s):
    """Parse a duration string ('1:23:45', '3 min 5 s', 'PT10M', '1.5 hours')
    into seconds (float), or None when unrecognized."""
    if not isinstance(s, str):
        return None
    s = s.strip()
    if not s:
        return None

    days, hours, mins, secs, ms = [None] * 5
    # First try colon-separated [[dd:]hh:]mm:ss[.ms] notation
    m = re.match(r'''(?x)
            (?P<before_secs>
                (?:(?:(?P<days>[0-9]+):)?(?P<hours>[0-9]+):)?(?P<mins>[0-9]+):)?
            (?P<secs>(?(before_secs)[0-9]{1,2}|[0-9]+))
            (?P<ms>[.:][0-9]+)?Z?$
        ''', s)
    if m:
        days, hours, mins, secs, ms = m.group('days', 'hours', 'mins', 'secs', 'ms')
    else:
        # Then unit-suffixed notation, loosely covering ISO 8601 durations too.
        # NOTE: year/month/week amounts before the days are matched by
        # non-capturing groups and therefore intentionally ignored.
        m = re.match(
            r'''(?ix)(?:P?
                (?:
                    [0-9]+\s*y(?:ears?)?,?\s*
                )?
                (?:
                    [0-9]+\s*m(?:onths?)?,?\s*
                )?
                (?:
                    [0-9]+\s*w(?:eeks?)?,?\s*
                )?
                (?:
                    (?P<days>[0-9]+)\s*d(?:ays?)?,?\s*
                )?
                T)?
                (?:
                    (?P<hours>[0-9]+)\s*h(?:(?:ou)?rs?)?,?\s*
                )?
                (?:
                    (?P<mins>[0-9]+)\s*m(?:in(?:ute)?s?)?,?\s*
                )?
                (?:
                    (?P<secs>[0-9]+)(?P<ms>\.[0-9]+)?\s*s(?:ec(?:ond)?s?)?\s*
                )?Z?$''', s)
        if m:
            days, hours, mins, secs, ms = m.groups()
        else:
            # Finally plain fractional forms like '1.5 hours' / '90 min'
            m = re.match(r'(?i)(?:(?P<hours>[0-9.]+)\s*(?:hours?)|(?P<mins>[0-9.]+)\s*(?:mins?\.?|minutes?)\s*)Z?$', s)
            if m:
                hours, mins = m.groups()
            else:
                return None

    if ms:
        ms = ms.replace(':', '.')  # ':' may separate the fraction in the first form
    return sum(float(part or 0) * mult for part, mult in (
        (days, 86400), (hours, 3600), (mins, 60), (secs, 1), (ms, 1)))
2061
2062
def prepend_extension(filename, ext, expected_real_ext=None):
    """Insert `ext` before the real extension of `filename`
    (e.g. 'a.mp4' -> 'a.temp.mp4').

    If `expected_real_ext` is given and the actual extension differs,
    `ext` is appended after the full filename instead.
    """
    name, real_ext = os.path.splitext(filename)
    if not expected_real_ext or real_ext[1:] == expected_real_ext:
        return f'{name}.{ext}{real_ext}'
    # Extension mismatch: keep the original filename intact and append `ext`
    # (previously the filename was dropped and replaced by a placeholder)
    return f'{filename}.{ext}'
2069
2070
def replace_extension(filename, ext, expected_real_ext=None):
    """Replace the extension of `filename` with `ext`.

    If `expected_real_ext` is given and the actual extension differs,
    the new extension is appended to the whole filename instead.
    """
    stem, real_ext = os.path.splitext(filename)
    if expected_real_ext and real_ext[1:] != expected_real_ext:
        stem = filename
    return f'{stem}.{ext}'
2076
2077
def check_executable(exe, args=[]):
    """ Checks if the given binary is installed somewhere in PATH, and returns its name.
    args can be a list of arguments for a short output (like -version) """
    try:
        # Any OSError (not found, not executable, ...) means the binary is unusable
        Popen.run([exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError:
        return False
    return exe
2086
2087
def _get_exe_version_output(exe, args):
    """Run `exe` with `args` and return its combined output;
    None if it exited nonzero, False if it could not be run at all"""
    try:
        # STDIN should be redirected too. On UNIX-like systems, ffmpeg triggers
        # SIGTTOU if yt-dlp is run in the background.
        # See https://github.com/ytdl-org/youtube-dl/issues/955#issuecomment-209789656
        stdout, _, returncode = Popen.run(
            [encodeArgument(exe)] + args, text=True, stdin=subprocess.PIPE,
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    except OSError:
        return False
    return None if returncode else stdout
2100
2101
def detect_exe_version(output, version_re=None, unrecognized='present'):
    """Search `output` for a version string using `version_re`;
    return `unrecognized` when nothing matches"""
    assert isinstance(output, str)
    if version_re is None:
        version_re = r'version\s+([-0-9._a-zA-Z]+)'
    mobj = re.search(version_re, output)
    return mobj.group(1) if mobj else unrecognized
2111
2112
def get_exe_version(exe, args=['--version'],
                    version_re=None, unrecognized=('present', 'broken')):
    """ Returns the version of the specified executable,
    or False if the executable is not present """
    unrecognized = variadic(unrecognized)
    assert len(unrecognized) in (1, 2)
    output = _get_exe_version_output(exe, args)
    if output is None:
        # The executable ran but exited with an error
        return unrecognized[-1]
    return output and detect_exe_version(output, version_re, unrecognized[0])
2123
2124
def frange(start=0, stop=None, step=1):
    """Float-aware analogue of range()"""
    if stop is None:
        start, stop = 0, start
    if not step:
        # Zero step yields nothing (instead of looping forever)
        return
    direction = 1 if step > 0 else -1
    value = start
    while direction * value < direction * stop:
        yield value
        value += step
2133
2134
class LazyList(collections.abc.Sequence):
    """Lazy immutable list from an iterable
    Note that slices of a LazyList are lists and not LazyList"""

    class IndexError(IndexError):
        # Distinguishes out-of-range access on a LazyList from a plain IndexError
        pass

    def __init__(self, iterable, *, reverse=False, _cache=None):
        # `_cache` is shared between copies/reversals so each item is computed only once
        self._iterable = iter(iterable)
        self._cache = [] if _cache is None else _cache
        self._reversed = reverse

    def __iter__(self):
        """Yield items in order, consuming (and caching) the iterable as needed"""
        if self._reversed:
            # We need to consume the entire iterable to iterate in reverse
            yield from self.exhaust()
            return
        yield from self._cache
        for item in self._iterable:
            self._cache.append(item)
            yield item

    def _exhaust(self):
        # Pull all remaining items into the cache
        self._cache.extend(self._iterable)
        self._iterable = []  # Discard the emptied iterable to make it pickle-able
        return self._cache

    def exhaust(self):
        """Evaluate the entire iterable"""
        return self._exhaust()[::-1 if self._reversed else 1]

    @staticmethod
    def _reverse_index(x):
        # Map an index to its mirrored position; None stays None for open slices
        return None if x is None else ~x

    def __getitem__(self, idx):
        """Index or slice access; may partially or fully consume the iterable"""
        if isinstance(idx, slice):
            if self._reversed:
                idx = slice(self._reverse_index(idx.start), self._reverse_index(idx.stop), -(idx.step or 1))
            start, stop, step = idx.start, idx.stop, idx.step or 1
        elif isinstance(idx, int):
            if self._reversed:
                idx = self._reverse_index(idx)
            start, stop, step = idx, idx, 0
        else:
            raise TypeError('indices must be integers or slices')
        if ((start or 0) < 0 or (stop or 0) < 0
                or (start is None and step < 0)
                or (stop is None and step > 0)):
            # We need to consume the entire iterable to be able to slice from the end
            # Obviously, never use this with infinite iterables
            self._exhaust()
            try:
                return self._cache[idx]
            except IndexError as e:
                raise self.IndexError(e) from e
        # Only consume as many additional items as the request needs
        n = max(start or 0, stop or 0) - len(self._cache) + 1
        if n > 0:
            self._cache.extend(itertools.islice(self._iterable, n))
        try:
            return self._cache[idx]
        except IndexError as e:
            raise self.IndexError(e) from e

    def __bool__(self):
        try:
            # Fetching a single boundary element is enough to know non-emptiness
            self[-1] if self._reversed else self[0]
        except self.IndexError:
            return False
        return True

    def __len__(self):
        # Requires full evaluation of the iterable
        self._exhaust()
        return len(self._cache)

    def __reversed__(self):
        # The reversed view shares this instance's cache and iterable
        return type(self)(self._iterable, reverse=not self._reversed, _cache=self._cache)

    def __copy__(self):
        return type(self)(self._iterable, reverse=self._reversed, _cache=self._cache)

    def __repr__(self):
        # repr and str should mimic a list. So we exhaust the iterable
        return repr(self.exhaust())

    def __str__(self):
        return repr(self.exhaust())
2222
2223
class PagedList:
    """Base class for sequences fetched lazily, one page at a time"""

    class IndexError(IndexError):
        # Raised when an index lies beyond the available entries
        pass

    def __len__(self):
        # This is only useful for tests
        return len(self.getslice())

    def __init__(self, pagefunc, pagesize, use_cache=True):
        self._pagefunc = pagefunc
        self._pagesize = pagesize
        self._pagecount = float('inf')
        self._use_cache = use_cache
        self._cache = {}

    def getpage(self, pagenum):
        """Return (and optionally cache) the list of entries on page `pagenum`"""
        results = self._cache.get(pagenum)
        if results is None:
            # Pages beyond the known page count are empty by definition
            results = [] if pagenum > self._pagecount else list(self._pagefunc(pagenum))
            if self._use_cache:
                self._cache[pagenum] = results
        return results

    def getslice(self, start=0, end=None):
        return list(self._getslice(start, end))

    def _getslice(self, start, end):
        raise NotImplementedError('This method must be implemented by subclasses')

    def __getitem__(self, idx):
        assert self._use_cache, 'Indexing PagedList requires cache'
        if not isinstance(idx, int) or idx < 0:
            raise TypeError('indices must be non-negative integers')
        entries = self.getslice(idx, idx + 1)
        if not entries:
            raise self.IndexError()
        return entries[0]
2262
2263
class OnDemandPagedList(PagedList):
    """Download pages until a page with less than maximum results"""

    def _getslice(self, start, end):
        # Walk pages starting from the one that contains `start`
        for pagenum in itertools.count(start // self._pagesize):
            firstid = pagenum * self._pagesize
            nextfirstid = pagenum * self._pagesize + self._pagesize
            if start >= nextfirstid:
                continue

            # Offsets of the requested range within the current page
            startv = (
                start % self._pagesize
                if firstid <= start < nextfirstid
                else 0)
            endv = (
                ((end - 1) % self._pagesize) + 1
                if (end is not None and firstid <= end <= nextfirstid)
                else None)

            try:
                page_results = self.getpage(pagenum)
            except Exception:
                # Record that pages from this one onwards are unavailable
                self._pagecount = pagenum - 1
                raise
            if startv != 0 or endv is not None:
                page_results = page_results[startv:endv]
            yield from page_results

            # A little optimization - if current page is not "full", ie. does
            # not contain page_size videos then we can assume that this page
            # is the last one - there are no more ids on further pages -
            # i.e. no need to query again.
            if len(page_results) + startv < self._pagesize:
                break

            # If we got the whole page, but the next page is not interesting,
            # break out early as well
            if end == nextfirstid:
                break
2303
2304
class InAdvancePagedList(PagedList):
    """PagedList with total number of pages known in advance"""

    def __init__(self, pagefunc, pagecount, pagesize):
        PagedList.__init__(self, pagefunc, pagesize, True)
        self._pagecount = pagecount

    def _getslice(self, start, end):
        start_page = start // self._pagesize
        # Exclusive upper bound of pages that can contain requested entries
        end_page = self._pagecount if end is None else min(self._pagecount, end // self._pagesize + 1)
        skip_elems = start - start_page * self._pagesize  # offset into the first page
        only_more = None if end is None else end - start  # entries still wanted
        for pagenum in range(start_page, end_page):
            page_results = self.getpage(pagenum)
            if skip_elems:
                page_results = page_results[skip_elems:]
                skip_elems = None
            if only_more is not None:
                if len(page_results) < only_more:
                    only_more -= len(page_results)
                else:
                    # This page contains the last requested entry
                    yield from page_results[:only_more]
                    break
            yield from page_results
2329
2330
class PlaylistEntries:
    """Resolves the entries of a playlist infodict according to the user's
    playlist selection options (--playlist-items etc.)"""
    MissingEntry = object()  # sentinel for requested-but-unavailable entries
    is_exhausted = False  # True once the playlist's full length is known

    def __init__(self, ydl, info_dict):
        self.ydl = ydl

        # _entries must be assigned now since infodict can change during iteration
        entries = info_dict.get('entries')
        if entries is None:
            raise EntryNotInPlaylist('There are no entries')
        elif isinstance(entries, list):
            self.is_exhausted = True

        requested_entries = info_dict.get('requested_entries')
        self.is_incomplete = requested_entries is not None
        if self.is_incomplete:
            assert self.is_exhausted
            # Re-map the pre-filtered entries back onto their 1-based playlist indices
            self._entries = [self.MissingEntry] * max(requested_entries or [0])
            for i, entry in zip(requested_entries, entries):
                self._entries[i - 1] = entry
        elif isinstance(entries, (list, PagedList, LazyList)):
            self._entries = entries
        else:
            self._entries = LazyList(entries)

    # Matches item specs like "7", "2:10", "1:10:2", "-5:", "::-1"
    PLAYLIST_ITEMS_RE = re.compile(r'''(?x)
        (?P<start>[+-]?\d+)?
        (?P<range>[:-]
            (?P<end>[+-]?\d+|inf(?:inite)?)?
            (?::(?P<step>[+-]?\d+))?
        )?''')

    @classmethod
    def parse_playlist_items(cls, string):
        """Yield an int or slice for each comma-separated segment of a
        --playlist-items specification; raises ValueError on invalid input"""
        for segment in string.split(','):
            if not segment:
                raise ValueError('There is two or more consecutive commas')
            mobj = cls.PLAYLIST_ITEMS_RE.fullmatch(segment)
            if not mobj:
                raise ValueError(f'{segment!r} is not a valid specification')
            start, end, step, has_range = mobj.group('start', 'end', 'step', 'range')
            if int_or_none(step) == 0:
                raise ValueError(f'Step in {segment!r} cannot be zero')
            # `end` may be "inf(inite)", hence float_or_none
            yield slice(int_or_none(start), float_or_none(end), int_or_none(step)) if has_range else int(start)

    def get_requested_items(self):
        """Yield (1-based index, entry) pairs selected by the user's options"""
        playlist_items = self.ydl.params.get('playlist_items')
        playlist_start = self.ydl.params.get('playliststart', 1)
        playlist_end = self.ydl.params.get('playlistend')
        # For backwards compatibility, interpret -1 as whole list
        if playlist_end in (-1, None):
            playlist_end = ''
        if not playlist_items:
            # playliststart/playlistend are just a degenerate range spec
            playlist_items = f'{playlist_start}:{playlist_end}'
        elif playlist_start != 1 or playlist_end:
            self.ydl.report_warning('Ignoring playliststart and playlistend because playlistitems was given', only_once=True)

        for index in self.parse_playlist_items(playlist_items):
            for i, entry in self[index]:
                yield i, entry
                if not entry:
                    continue
                try:
                    # The item may have just been added to archive. Don't break due to it
                    if not self.ydl.params.get('lazy_playlist'):
                        # TODO: Add auto-generated fields
                        self.ydl._match_entry(entry, incomplete=True, silent=True)
                except (ExistingVideoReached, RejectedVideoReached):
                    return

    def get_full_count(self):
        """Total number of playlist entries if cheaply knowable, else None"""
        if self.is_exhausted and not self.is_incomplete:
            return len(self)
        elif isinstance(self._entries, InAdvancePagedList):
            if self._entries._pagesize == 1:
                return self._entries._pagecount

    @functools.cached_property
    def _getter(self):
        # Build a function that fetches one entry by 0-based index,
        # raising self.IndexError when past the end
        if isinstance(self._entries, list):
            def get_entry(i):
                try:
                    entry = self._entries[i]
                except IndexError:
                    entry = self.MissingEntry
                    if not self.is_incomplete:
                        raise self.IndexError()
                if entry is self.MissingEntry:
                    raise EntryNotInPlaylist(f'Entry {i + 1} cannot be found')
                return entry
        else:
            def get_entry(i):
                try:
                    # Route extraction through ydl's error handling wrapper
                    return type(self.ydl)._handle_extraction_exceptions(lambda _, i: self._entries[i])(self.ydl, i)
                except (LazyList.IndexError, PagedList.IndexError):
                    raise self.IndexError()
        return get_entry

    def __getitem__(self, idx):
        """Yield (1-based index, entry) pairs for an int or slice of 1-based indices"""
        if isinstance(idx, int):
            idx = slice(idx, idx)

        # NB: PlaylistEntries[1:10] => (0, 1, ... 9)
        step = 1 if idx.step is None else idx.step
        if idx.start is None:
            start = 0 if step > 0 else len(self) - 1
        else:
            start = idx.start - 1 if idx.start >= 0 else len(self) + idx.start

        # NB: Do not call len(self) when idx == [:]
        if idx.stop is None:
            stop = 0 if step < 0 else float('inf')
        else:
            stop = idx.stop - 1 if idx.stop >= 0 else len(self) + idx.stop
            stop += [-1, 1][step > 0]

        for i in frange(start, stop, step):
            if i < 0:
                continue
            try:
                entry = self._getter(i)
            except self.IndexError:
                self.is_exhausted = True
                # When stepping forward, going past the end is final
                if step > 0:
                    break
                continue
            yield i + 1, entry

    def __len__(self):
        return len(tuple(self[:]))

    class IndexError(IndexError):
        pass
2465
2466
def uppercase_escape(s):
    """Replace \\UXXXXXXXX escape sequences in `s` with the characters they denote"""
    decode = codecs.getdecoder('unicode_escape')
    return re.sub(
        r'\\U[0-9a-fA-F]{8}',
        lambda mobj: decode(mobj.group(0))[0], s)
2473
2474
def lowercase_escape(s):
    """Replace \\uXXXX escape sequences in `s` with the characters they denote"""
    decode = codecs.getdecoder('unicode_escape')
    return re.sub(
        r'\\u[0-9a-fA-F]{4}',
        lambda mobj: decode(mobj.group(0))[0], s)
2481
2482
def parse_qs(url, **kwargs):
    """Extract the query string of `url` as a dict mapping keys to value lists"""
    query = urllib.parse.urlparse(url).query
    return urllib.parse.parse_qs(query, **kwargs)
2485
2486
def read_batch_urls(batch_fd):
    """Read URLs from a (closeable) iterable of lines, e.g. a batch file.

    Skips blank lines, comments (#, ;, ]) and strips BOMs.
    @param batch_fd  file-like object yielding str or bytes lines; it is closed
    @returns         list of cleaned-up URLs
    """
    def fixup(url):
        if not isinstance(url, str):
            url = url.decode('utf-8', 'replace')
        # Both the raw UTF-8 BOM bytes (mojibake) and the decoded BOM character
        BOM_UTF8 = ('\xef\xbb\xbf', '\ufeff')
        for bom in BOM_UTF8:
            if url.startswith(bom):
                url = url[len(bom):]
        url = url.lstrip()
        if not url or url.startswith(('#', ';', ']')):
            return False
        # "#" cannot be stripped out since it is part of the URI
        # However, it can be safely stripped out if following a whitespace
        # NB: positional maxsplit is deprecated since Python 3.13
        return re.split(r'\s#', url, maxsplit=1)[0].rstrip()

    with contextlib.closing(batch_fd) as fd:
        return [url for url in map(fixup, fd) if url]
2504
2505
def urlencode_postdata(*args, **kargs):
    """URL-encode POST data and return it as ASCII bytes"""
    encoded = urllib.parse.urlencode(*args, **kargs)
    return encoded.encode('ascii')
2508
2509
def update_url(url, *, query_update=None, **kwargs):
    """Replace URL components specified by kwargs
    @param url str or parse url tuple
    @param query_update update query
    @returns str
    """
    if isinstance(url, str):
        if not kwargs and not query_update:
            # Nothing to change; skip the parse/unparse round-trip
            return url
        url = urllib.parse.urlparse(url)
    if query_update:
        assert 'query' not in kwargs, 'query_update and query cannot be specified at the same time'
        merged = {**urllib.parse.parse_qs(url.query), **query_update}
        kwargs['query'] = urllib.parse.urlencode(merged, True)
    return urllib.parse.urlunparse(url._replace(**kwargs))
2528
2529
def update_url_query(url, query):
    """Add/replace query parameters of `url` (convenience wrapper around update_url)"""
    return update_url(url, query_update=query)
2532
2533
2534 def _multipart_encode_impl(data, boundary):
2535 content_type = 'multipart/form-data; boundary=%s' % boundary
2536
2537 out = b''
2538 for k, v in data.items():
2539 out += b'--' + boundary.encode('ascii') + b'\r\n'
2540 if isinstance(k, str):
2541 k = k.encode()
2542 if isinstance(v, str):
2543 v = v.encode()
2544 # RFC 2047 requires non-ASCII field names to be encoded, while RFC 7578
2545 # suggests sending UTF-8 directly. Firefox sends UTF-8, too
2546 content = b'Content-Disposition: form-data; name="' + k + b'"\r\n\r\n' + v + b'\r\n'
2547 if boundary.encode('ascii') in content:
2548 raise ValueError('Boundary overlaps with data')
2549 out += content
2550
2551 out += b'--' + boundary.encode('ascii') + b'--\r\n'
2552
2553 return out, content_type
2554
2555
def multipart_encode(data, boundary=None):
    '''
    Encode a dict to RFC 7578-compliant form-data

    data:
        A dict where keys and values can be either Unicode or bytes-like
        objects.
    boundary:
        If specified a Unicode object, it's used as the boundary. Otherwise
        a random boundary is generated.

    Reference: https://tools.ietf.org/html/rfc7578
    '''
    explicit_boundary = boundary is not None

    while True:
        if boundary is None:
            # Pick a random boundary; retried below if it collides with the data
            boundary = '---------------' + str(random.randrange(0x0fffffff, 0xffffffff))
        try:
            return _multipart_encode_impl(data, boundary)
        except ValueError:
            if explicit_boundary:
                # A user-supplied boundary cannot be regenerated
                raise
            boundary = None
2584
2585
def is_iterable_like(x, allowed_types=collections.abc.Iterable, blocked_types=NO_DEFAULT):
    """Whether `x` is an instance of `allowed_types`, excluding str/bytes/mappings by default"""
    if blocked_types is NO_DEFAULT:
        blocked_types = (str, bytes, collections.abc.Mapping)
    allowed = isinstance(x, allowed_types)
    return allowed and not isinstance(x, blocked_types)
2590
2591
def variadic(x, allowed_types=NO_DEFAULT):
    """Return `x` unchanged if it is list-like, else wrap it in a 1-tuple"""
    if not isinstance(allowed_types, (tuple, type)):
        deprecation_warning('allowed_types should be a tuple or a type')
        allowed_types = tuple(allowed_types)
    if is_iterable_like(x, blocked_types=allowed_types):
        return x
    return (x, )
2597
2598
def try_call(*funcs, expected_type=None, args=[], kwargs={}):
    """Call each of `funcs` with the given arguments and return the first
    result that raises no common exception (and matches `expected_type`, if given)"""
    for func in funcs:
        try:
            result = func(*args, **kwargs)
        except (AttributeError, KeyError, TypeError, IndexError, ValueError, ZeroDivisionError):
            continue
        if expected_type is None or isinstance(result, expected_type):
            return result
2608
2609
def try_get(src, getter, expected_type=None):
    """Apply one or more getter callables to `src`, returning the first successful result"""
    return try_call(*variadic(getter), args=(src,), expected_type=expected_type)
2612
2613
def filter_dict(dct, cndn=lambda _, v: v is not None):
    """Return a copy of `dct` keeping only items for which `cndn(key, value)` is true"""
    return {key: value for key, value in dct.items() if cndn(key, value)}
2616
2617
def merge_dicts(*dicts):
    """Merge dicts left to right: earlier non-None values win, except that an
    empty-string value may be overwritten by a later string value"""
    merged = {}
    for current in dicts:
        for key, value in current.items():
            take_new = (value is not None and key not in merged
                        or isinstance(value, str) and merged[key] == '')
            if take_new:
                merged[key] = value
    return merged
2626
2627
def encode_compat_str(string, encoding=preferredencoding(), errors='strict'):
    """Return `string` unchanged if it is already str, else decode it as bytes"""
    if isinstance(string, str):
        return string
    return str(string, encoding, errors)
2630
2631
# MPAA-style US movie ratings, mapped to an approximate minimum viewer age
US_RATINGS = {
    'G': 0,
    'PG': 10,
    'PG-13': 13,
    'R': 16,
    'NC': 18,
}


# US TV Parental Guidelines ratings, mapped to an approximate minimum viewer age
TV_PARENTAL_GUIDELINES = {
    'TV-Y': 0,
    'TV-Y7': 7,
    'TV-G': 0,
    'TV-PG': 0,
    'TV-14': 14,
    'TV-MA': 17,
}
2649
2650
def parse_age_limit(s):
    """Parse an age limit (int, 'NN+', US movie rating or TV guideline) into an int age, or None"""
    # isinstance(False, int) is True. So type() must be used instead
    if type(s) is int:  # noqa: E721
        return s if 0 <= s <= 21 else None
    if not isinstance(s, str):
        return None
    mobj = re.match(r'^(?P<age>\d{1,2})\+?$', s)
    if mobj:
        return int(mobj.group('age'))
    s = s.upper()
    if s in US_RATINGS:
        return US_RATINGS[s]
    mobj = re.match(r'^TV[_-]?(%s)$' % '|'.join(k[3:] for k in TV_PARENTAL_GUIDELINES), s)
    if mobj:
        return TV_PARENTAL_GUIDELINES['TV-' + mobj.group(1)]
    return None
2667
2668
def strip_jsonp(code):
    """Remove a JSONP callback wrapper, leaving only the JSON payload"""
    jsonp_pattern = r'''(?sx)^
            (?:window\.)?(?P<func_name>[a-zA-Z0-9_.$]*)
            (?:\s*&&\s*(?P=func_name))?
            \s*\(\s*(?P<callback_data>.*)\);?
            \s*?(?://[^\n]*)*$'''
    return re.sub(jsonp_pattern, r'\g<callback_data>', code)
2677
2678
def js_to_json(code, vars={}, *, strict=False):
    """Convert a JavaScript object/value literal into valid JSON text.

    @param code    JavaScript source fragment
    @param vars    dict of variable names to values to substitute
    @param strict  raise ValueError on unknown tokens instead of stringifying them
    """
    # vars is a dict of var, val pairs to substitute
    STRING_QUOTES = '\'"`'
    STRING_RE = '|'.join(rf'{q}(?:\\.|[^\\{q}])*{q}' for q in STRING_QUOTES)
    COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
    SKIP_RE = fr'\s*(?:{COMMENT_RE})?\s*'
    # Hex and octal integer literals (optionally used as object keys)
    INTEGER_TABLE = (
        (fr'(?s)^(0[xX][0-9a-fA-F]+){SKIP_RE}:?$', 16),
        (fr'(?s)^(0+[0-7]+){SKIP_RE}:?$', 8),
    )

    def process_escape(match):
        # Keep escapes JSON understands; rewrite or drop the rest
        JSON_PASSTHROUGH_ESCAPES = R'"\bfnrtu'
        escape = match.group(1) or match.group(2)

        return (Rf'\{escape}' if escape in JSON_PASSTHROUGH_ESCAPES
                else R'\u00' if escape == 'x'
                else '' if escape == '\n'
                else escape)

    def template_substitute(match):
        # Evaluate a ${...} interpolation inside a template string
        evaluated = js_to_json(match.group(1), vars, strict=strict)
        if evaluated[0] == '"':
            return json.loads(evaluated)
        return evaluated

    def fix_kv(m):
        # Rewrite a single matched token into its JSON equivalent
        v = m.group(0)
        if v in ('true', 'false', 'null'):
            return v
        elif v in ('undefined', 'void 0'):
            return 'null'
        elif v.startswith('/*') or v.startswith('//') or v.startswith('!') or v == ',':
            return ''

        if v[0] in STRING_QUOTES:
            # Template strings need ${...} substitution before escaping
            v = re.sub(r'(?s)\${([^}]+)}', template_substitute, v[1:-1]) if v[0] == '`' else v[1:-1]
            escaped = re.sub(r'(?s)(")|\\(.)', process_escape, v)
            return f'"{escaped}"'

        for regex, base in INTEGER_TABLE:
            im = re.match(regex, v)
            if im:
                i = int(im.group(1), base)
                # A trailing ':' means the integer is an object key
                return f'"{i}":' if v.endswith(':') else str(i)

        if v in vars:
            try:
                if not strict:
                    json.loads(vars[v])
            except json.JSONDecodeError:
                return json.dumps(vars[v])
            else:
                return vars[v]

        if not strict:
            # Unknown bareword: treat it as a string
            return f'"{v}"'

        raise ValueError(f'Unknown value: {v}')

    def create_map(mobj):
        # new Map([[k, v], ...]) -> JSON object
        return json.dumps(dict(json.loads(js_to_json(mobj.group(1) or '[]', vars=vars))))

    code = re.sub(r'(?:new\s+)?Array\((.*?)\)', r'[\g<1>]', code)
    code = re.sub(r'new Map\((\[.*?\])?\)', create_map, code)
    if not strict:
        # Best-effort rewrites of common constructor/function-call patterns
        code = re.sub(rf'new Date\(({STRING_RE})\)', r'\g<1>', code)
        code = re.sub(r'new \w+\((.*?)\)', lambda m: json.dumps(m.group(0)), code)
        code = re.sub(r'parseInt\([^\d]+(\d+)[^\d]+\)', r'\1', code)
        code = re.sub(r'\(function\([^)]*\)\s*\{[^}]*\}\s*\)\s*\(\s*(["\'][^)]*["\'])\s*\)', r'\1', code)

    return re.sub(rf'''(?sx)
        {STRING_RE}|
        {COMMENT_RE}|,(?={SKIP_RE}[\]}}])|
        void\s0|(?:(?<![0-9])[eE]|[a-df-zA-DF-Z_$])[.a-zA-Z_$0-9]*|
        \b(?:0[xX][0-9a-fA-F]+|0+[0-7]+)(?:{SKIP_RE}:)?|
        [0-9]+(?={SKIP_RE}:)|
        !+
        ''', fix_kv, code)
2758
2759
def qualities(quality_ids):
    """ Get a numeric quality value out of a list of possible values """
    def lookup(qid):
        # Position in the list is the quality rank; unknown ids rank lowest
        try:
            return quality_ids.index(qid)
        except ValueError:
            return -1
    return lookup
2768
2769
# Stages at which postprocessors may be scheduled to run (see --use-postprocessor)
POSTPROCESS_WHEN = ('pre_process', 'after_filter', 'video', 'before_dl', 'post_process', 'after_move', 'after_video', 'playlist')


# Default output filename templates, keyed by template type
DEFAULT_OUTTMPL = {
    'default': '%(title)s [%(id)s].%(ext)s',
    'chapter': '%(title)s - %(section_number)03d %(section_title)s [%(id)s].%(ext)s',
}
# Known output template types; the value is the default filename infix
# (None means the file keeps the same name as the media file)
OUTTMPL_TYPES = {
    'chapter': None,
    'subtitle': None,
    'thumbnail': None,
    'description': 'description',
    'annotation': 'annotations.xml',
    'infojson': 'info.json',
    'link': None,
    'pl_video': None,
    'pl_thumbnail': None,
    'pl_description': 'description',
    'pl_infojson': 'info.json',
}

# As of [1] format syntax is:
# %[mapping_key][conversion_flags][minimum_width][.precision][length_modifier]type
# 1. https://docs.python.org/2/library/stdtypes.html#string-formatting
# Template for a regex matching one %-style replacement field; {0} is the key
# pattern and {1} the conversion-type pattern
STR_FORMAT_RE_TMPL = r'''(?x)
    (?<!%)(?P<prefix>(?:%%)*)
    %
    (?P<has_key>\((?P<key>{0})\))?
    (?P<format>
        (?P<conversion>[#0\-+ ]+)?
        (?P<min_width>\d+)?
        (?P<precision>\.\d+)?
        (?P<len_mod>[hlL])?  # unused in python
        {1}  # conversion type
    )
    '''


# Conversion types accepted by %-style string formatting
STR_FORMAT_TYPES = 'diouxXeEfFgGcrsa'
2809
2810
def limit_length(s, length):
    """ Add ellipses to overly long strings """
    if s is None:
        return None
    ELLIPSES = '...'
    if len(s) <= length:
        return s
    return s[:length - len(ELLIPSES)] + ELLIPSES
2819
2820
def version_tuple(v):
    """Split a version string like '2023.07-1' into a tuple of ints"""
    return tuple(map(int, re.split(r'[-.]', v)))
2823
2824
def is_outdated_version(version, limit, assume_new=True):
    """Whether `version` is strictly older than `limit`;
    unparseable/missing versions yield `not assume_new`"""
    fallback = not assume_new
    if not version:
        return fallback
    try:
        return version_tuple(version) < version_tuple(limit)
    except ValueError:
        return fallback
2832
2833
def ytdl_is_updateable():
    """ Returns if yt-dlp can be updated with -U """
    # Imported lazily to avoid a circular import at module load time
    from ..update import is_non_updateable
    return not is_non_updateable()
2840
2841
def args_to_str(args):
    """Get a short, shell-quoted string representation of a subprocess command"""
    return ' '.join(map(compat_shlex_quote, args))
2845
2846
def error_to_str(err):
    """Format an exception as 'TypeName: message'"""
    return '%s: %s' % (type(err).__name__, err)
2849
2850
def mimetype2ext(mt, default=NO_DEFAULT):
    """Map a MIME type (e.g. 'video/mp4; charset=...') to a file extension.

    Non-string input and unknown types fall back to `default` if given;
    otherwise unknown types yield the subtype with '+' replaced by '.'.
    """
    if not isinstance(mt, str):
        if default is not NO_DEFAULT:
            return default
        return None

    MAP = {
        # video
        '3gpp': '3gp',
        'mp2t': 'ts',
        'mp4': 'mp4',
        'mpeg': 'mpeg',
        'mpegurl': 'm3u8',
        'quicktime': 'mov',
        'webm': 'webm',
        'vp9': 'vp9',
        'video/ogg': 'ogv',
        'x-flv': 'flv',
        'x-m4v': 'm4v',
        'x-matroska': 'mkv',
        'x-mng': 'mng',
        'x-mp4-fragmented': 'mp4',
        'x-ms-asf': 'asf',
        'x-ms-wmv': 'wmv',
        'x-msvideo': 'avi',

        # application (streaming playlists)
        'dash+xml': 'mpd',
        'f4m+xml': 'f4m',
        'hds+xml': 'f4m',
        'vnd.apple.mpegurl': 'm3u8',
        'vnd.ms-sstr+xml': 'ism',
        'x-mpegurl': 'm3u8',

        # audio
        'audio/mp4': 'm4a',
        # Per RFC 3003, audio/mpeg can be .mp1, .mp2 or .mp3.
        # Using .mp3 as it's the most popular one
        'audio/mpeg': 'mp3',
        'audio/webm': 'webm',
        'audio/x-matroska': 'mka',
        'audio/x-mpegurl': 'm3u',
        'midi': 'mid',
        'ogg': 'ogg',
        'wav': 'wav',
        'wave': 'wav',
        'x-aac': 'aac',
        'x-flac': 'flac',
        'x-m4a': 'm4a',
        'x-realaudio': 'ra',
        'x-wav': 'wav',

        # image
        'avif': 'avif',
        'bmp': 'bmp',
        'gif': 'gif',
        'jpeg': 'jpg',
        'png': 'png',
        'svg+xml': 'svg',
        'tiff': 'tif',
        'vnd.wap.wbmp': 'wbmp',
        'webp': 'webp',
        'x-icon': 'ico',
        'x-jng': 'jng',
        'x-ms-bmp': 'bmp',

        # caption
        'filmstrip+json': 'fs',
        'smptett+xml': 'tt',
        'ttaf+xml': 'dfxp',
        'ttml+xml': 'ttml',
        'x-ms-sami': 'sami',

        # misc
        'gzip': 'gz',
        'json': 'json',
        'xml': 'xml',
        'zip': 'zip',
    }

    mimetype = mt.partition(';')[0].strip().lower()  # drop parameters like charset
    _, _, subtype = mimetype.rpartition('/')

    # Try the full mimetype, then the subtype, then the subtype without '+suffix'
    ext = traversal.traverse_obj(MAP, mimetype, subtype, subtype.rsplit('+')[-1])
    if ext:
        return ext
    elif default is not NO_DEFAULT:
        return default
    return subtype.replace('+', '.')
2940
2941
def ext2mimetype(ext_or_url):
    """Guess the MIME type from a file extension or URL; None if unknown or empty"""
    if not ext_or_url:
        return None
    # Bare extensions need a dummy filename for guess_type to work
    name = ext_or_url if '.' in ext_or_url else f'file.{ext_or_url}'
    return mimetypes.guess_type(name)[0]
2948
2949
def parse_codecs(codecs_str):
    """Parse an RFC 6381 codecs string into vcodec/acodec/scodec/dynamic_range fields"""
    # http://tools.ietf.org/html/rfc6381
    if not codecs_str:
        return {}
    split_codecs = list(filter(None, map(
        str.strip, codecs_str.strip().strip(',').split(','))))
    vcodec, acodec, scodec, hdr = None, None, None, None
    for full_codec in split_codecs:
        # Strip leading zeroes from each numeric part for comparison
        parts = re.sub(r'0+(?=\d)', '', full_codec).split('.')
        if parts[0] in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2',
                        'h263', 'h264', 'mp4v', 'hvc1', 'av1', 'theora', 'dvh1', 'dvhe'):
            if vcodec:
                # Only the first video codec is kept
                continue
            vcodec = full_codec
            # Detect HDR variants from the codec id/profile
            if parts[0] in ('dvh1', 'dvhe'):
                hdr = 'DV'
            elif parts[0] == 'av1' and traversal.traverse_obj(parts, 3) == '10':
                hdr = 'HDR10'
            elif parts[:2] == ['vp9', '2']:
                hdr = 'HDR10'
        elif parts[0] in ('flac', 'mp4a', 'opus', 'vorbis', 'mp3', 'aac', 'ac-4',
                          'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'):
            acodec = acodec or full_codec
        elif parts[0] in ('stpp', 'wvtt'):
            scodec = scodec or full_codec
        else:
            write_string(f'WARNING: Unknown codec {full_codec}\n')
    if vcodec or acodec or scodec:
        return {
            'vcodec': vcodec or 'none',
            'acodec': acodec or 'none',
            'dynamic_range': hdr,
            **({'scodec': scodec} if scodec is not None else {}),
        }
    elif len(split_codecs) == 2:
        # Assume "video, audio" ordering when exactly two unrecognized codecs are given
        return {
            'vcodec': split_codecs[0],
            'acodec': split_codecs[1],
        }
    return {}
2990
2991
def get_compatible_ext(*, vcodecs, acodecs, vexts, aexts, preferences=None):
    """Choose a container extension able to hold the given video/audio codecs
    and stream extensions, honoring `preferences` when possible"""
    assert len(vcodecs) == len(vexts) and len(acodecs) == len(aexts)

    allow_mkv = not preferences or 'mkv' in preferences

    if allow_mkv and max(len(acodecs), len(vcodecs)) > 1:
        # Multiple audio or video streams: mkv is the safe choice
        return 'mkv'  # TODO: any other format allows this?

    # TODO: All codecs supported by parse_codecs isn't handled here
    COMPATIBLE_CODECS = {
        'mp4': {
            'av1', 'hevc', 'avc1', 'mp4a', 'ac-4',  # fourcc (m3u8, mpd)
            'h264', 'aacl', 'ec-3',  # Set in ISM
        },
        'webm': {
            'av1', 'vp9', 'vp8', 'opus', 'vrbs',
            'vp9x', 'vp8x',  # in the webm spec
        },
    }

    # Normalize a codec id: first dotted part, with '0' chars removed, lowercased
    sanitize_codec = functools.partial(
        try_get, getter=lambda x: x[0].split('.')[0].replace('0', '').lower())
    vcodec, acodec = sanitize_codec(vcodecs), sanitize_codec(acodecs)

    # First pass: match by codec compatibility
    for ext in preferences or COMPATIBLE_CODECS.keys():
        codec_set = COMPATIBLE_CODECS.get(ext, set())
        if ext == 'mkv' or codec_set.issuperset((vcodec, acodec)):
            return ext

    COMPATIBLE_EXTS = (
        {'mp3', 'mp4', 'm4a', 'm4p', 'm4b', 'm4r', 'm4v', 'ismv', 'isma', 'mov'},
        {'webm', 'weba'},
    )
    # Second pass: match by extension-family compatibility
    for ext in preferences or vexts:
        current_exts = {ext, *vexts, *aexts}
        if ext == 'mkv' or current_exts == {ext} or any(
                ext_sets.issuperset(current_exts) for ext_sets in COMPATIBLE_EXTS):
            return ext
    return 'mkv' if allow_mkv else preferences[-1]
3031
3032
def urlhandle_detect_ext(url_handle, default=NO_DEFAULT):
    """Guess the file extension of a response, trying Content-Disposition,
    the x-amz-meta-name header, then the Content-Type"""
    getheader = url_handle.headers.get

    disposition = getheader('Content-Disposition')
    if disposition:
        mobj = re.match(r'attachment;\s*filename="(?P<filename>[^"]+)"', disposition)
        if mobj:
            ext = determine_ext(mobj.group('filename'), default_ext=None)
            if ext:
                return ext

    meta_name = getheader('x-amz-meta-name')
    if meta_name:
        ext = meta_name.rpartition('.')[2]
        if ext:
            return ext

    return mimetype2ext(getheader('Content-Type'), default=default)
3051
3052
def encode_data_uri(data, mime_type):
    """Build a base64 'data:' URI for the given bytes and MIME type"""
    payload = base64.b64encode(data).decode('ascii')
    return f'data:{mime_type};base64,{payload}'
3055
3056
def age_restricted(content_limit, age_limit):
    """ Returns True iff the content should be blocked """
    if age_limit is None or content_limit is None:
        # No viewer limit set, or content available for everyone
        return False
    return age_limit < content_limit
3065
3066
# List of known byte-order-marks (BOM)
# NB: keep the UTF-32-LE entry before UTF-16-LE — the UTF-16-LE BOM is a
# prefix of the UTF-32-LE one, and is_html() checks these in order
BOMS = [
    (b'\xef\xbb\xbf', 'utf-8'),
    (b'\x00\x00\xfe\xff', 'utf-32-be'),
    (b'\xff\xfe\x00\x00', 'utf-32-le'),
    (b'\xff\xfe', 'utf-16-le'),
    (b'\xfe\xff', 'utf-16-be'),
]
3075
3076
def is_html(first_bytes):
    """ Detect whether a file contains HTML by examining its first bytes. """
    encoding = 'utf-8'
    for bom, bom_encoding in BOMS:
        # Strip (possibly repeated) BOMs and adopt the corresponding encoding
        while first_bytes.startswith(bom):
            encoding, first_bytes = bom_encoding, first_bytes[len(bom):]
    return re.match(r'^\s*<', first_bytes.decode(encoding, 'replace'))
3086
3087
def determine_protocol(info_dict):
    """Return the download protocol for *info_dict*, inferring it from
    the URL when no explicit 'protocol' field is set."""
    explicit = info_dict.get('protocol')
    if explicit is not None:
        return explicit

    url = sanitize_url(info_dict['url'])
    # Streaming schemes are matched by URL prefix (covers e.g. rtmpe://)
    for scheme in ('rtmp', 'mms', 'rtsp'):
        if url.startswith(scheme):
            return scheme

    ext = determine_ext(url)
    if ext == 'm3u8':
        return 'm3u8' if info_dict.get('is_live') else 'm3u8_native'
    if ext == 'f4m':
        return 'f4m'

    return urllib.parse.urlparse(url).scheme
3108
3109
def render_table(header_row, data, delim=False, extra_gap=0, hide_empty=False):
    """ Render a list of rows, each as a list of values.
    Text after a \t will be right aligned """
    # Display width of a cell: terminal escape sequences and the
    # alignment tab take up no columns
    def width(string):
        return len(remove_terminal_sequences(string).replace('\t', ''))

    def get_max_lens(table):
        return [max(width(str(v)) for v in col) for col in zip(*table)]

    # Keep only columns whose corresponding filterArray entry is truthy;
    # missing entries default to True (keep the column)
    def filter_using_list(row, filterArray):
        return [col for take, col in itertools.zip_longest(filterArray, row, fillvalue=True) if take]

    # With hide_empty, drop columns whose data cells are all empty
    # (max width 0); only the data rows are considered here
    max_lens = get_max_lens(data) if hide_empty else []
    header_row = filter_using_list(header_row, max_lens)
    data = [filter_using_list(row, max_lens) for row in data]

    table = [header_row] + data
    max_lens = get_max_lens(table)
    extra_gap += 1
    if delim:
        # Insert a delimiter row (delim repeated to column width) under the header
        table = [header_row, [delim * (ml + extra_gap) for ml in max_lens]] + data
        table[1][-1] = table[1][-1][:-extra_gap * len(delim)]  # Remove extra_gap from end of delimiter
    for row in table:
        for pos, text in enumerate(map(str, row)):
            if '\t' in text:
                # Right-align the part after the tab: the tab becomes the
                # padding needed to reach the column width
                row[pos] = text.replace('\t', ' ' * (max_lens[pos] - width(text))) + ' ' * extra_gap
            else:
                row[pos] = text + ' ' * (max_lens[pos] - width(text) + extra_gap)
    ret = '\n'.join(''.join(row).rstrip() for row in table)
    return ret
3140
3141
3142 def _match_one(filter_part, dct, incomplete):
3143 # TODO: Generalize code with YoutubeDL._build_format_filter
3144 STRING_OPERATORS = {
3145 '*=': operator.contains,
3146 '^=': lambda attr, value: attr.startswith(value),
3147 '$=': lambda attr, value: attr.endswith(value),
3148 '~=': lambda attr, value: re.search(value, attr),
3149 }
3150 COMPARISON_OPERATORS = {
3151 **STRING_OPERATORS,
3152 '<=': operator.le, # "<=" must be defined above "<"
3153 '<': operator.lt,
3154 '>=': operator.ge,
3155 '>': operator.gt,
3156 '=': operator.eq,
3157 }
3158
3159 if isinstance(incomplete, bool):
3160 is_incomplete = lambda _: incomplete
3161 else:
3162 is_incomplete = lambda k: k in incomplete
3163
3164 operator_rex = re.compile(r'''(?x)
3165 (?P<key>[a-z_]+)
3166 \s*(?P<negation>!\s*)?(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
3167 (?:
3168 (?P<quote>["\'])(?P<quotedstrval>.+?)(?P=quote)|
3169 (?P<strval>.+?)
3170 )
3171 ''' % '|'.join(map(re.escape, COMPARISON_OPERATORS.keys())))
3172 m = operator_rex.fullmatch(filter_part.strip())
3173 if m:
3174 m = m.groupdict()
3175 unnegated_op = COMPARISON_OPERATORS[m['op']]
3176 if m['negation']:
3177 op = lambda attr, value: not unnegated_op(attr, value)
3178 else:
3179 op = unnegated_op
3180 comparison_value = m['quotedstrval'] or m['strval'] or m['intval']
3181 if m['quote']:
3182 comparison_value = comparison_value.replace(r'\%s' % m['quote'], m['quote'])
3183 actual_value = dct.get(m['key'])
3184 numeric_comparison = None
3185 if isinstance(actual_value, (int, float)):
3186 # If the original field is a string and matching comparisonvalue is
3187 # a number we should respect the origin of the original field
3188 # and process comparison value as a string (see
3189 # https://github.com/ytdl-org/youtube-dl/issues/11082)
3190 try:
3191 numeric_comparison = int(comparison_value)
3192 except ValueError:
3193 numeric_comparison = parse_filesize(comparison_value)
3194 if numeric_comparison is None:
3195 numeric_comparison = parse_filesize(f'{comparison_value}B')
3196 if numeric_comparison is None:
3197 numeric_comparison = parse_duration(comparison_value)
3198 if numeric_comparison is not None and m['op'] in STRING_OPERATORS:
3199 raise ValueError('Operator %s only supports string values!' % m['op'])
3200 if actual_value is None:
3201 return is_incomplete(m['key']) or m['none_inclusive']
3202 return op(actual_value, comparison_value if numeric_comparison is None else numeric_comparison)
3203
3204 UNARY_OPERATORS = {
3205 '': lambda v: (v is True) if isinstance(v, bool) else (v is not None),
3206 '!': lambda v: (v is False) if isinstance(v, bool) else (v is None),
3207 }
3208 operator_rex = re.compile(r'''(?x)
3209 (?P<op>%s)\s*(?P<key>[a-z_]+)
3210 ''' % '|'.join(map(re.escape, UNARY_OPERATORS.keys())))
3211 m = operator_rex.fullmatch(filter_part.strip())
3212 if m:
3213 op = UNARY_OPERATORS[m.group('op')]
3214 actual_value = dct.get(m.group('key'))
3215 if is_incomplete(m.group('key')) and actual_value is None:
3216 return True
3217 return op(actual_value)
3218
3219 raise ValueError('Invalid filter part %r' % filter_part)
3220
3221
def match_str(filter_str, dct, incomplete=False):
    """ Filter a dictionary with a simple string syntax.
    @returns Whether the filter passes
    @param incomplete Set of keys that is expected to be missing from dct.
    Can be True/False to indicate all/none of the keys may be missing.
    All conditions on incomplete keys pass if the key is missing
    """
    # Conditions are joined by '&' unless it is escaped as '\&'
    conditions = re.split(r'(?<!\\)&', filter_str)
    return all(
        _match_one(condition.replace(r'\&', '&'), dct, incomplete)
        for condition in conditions)
3232
3233
def match_filter_func(filters, breaking_filters=None):
    """Compile match-filter string(s) into a callable taking
    (info_dict, incomplete).

    Returns None when there is nothing to filter on. The callable returns
    None to accept the entry, a skip message string to reject it, or
    NO_DEFAULT when a lone '-' filter was given and the entry is complete;
    matching breaking filters raise RejectedVideoReached instead.
    """
    if not filters and not breaking_filters:
        return None

    # Breaking filters are compiled recursively; the fallback accepts everything
    check_breaking = match_filter_func(breaking_filters) or (lambda _, __: None)
    filter_set = set(variadic(filters or []))

    # A lone '-' entry switches the accept result to NO_DEFAULT
    interactive = '-' in filter_set
    if interactive:
        filter_set.remove('-')

    def _match_func(info_dict, incomplete=False):
        breaking_reason = check_breaking(info_dict, incomplete)
        if breaking_reason is not None:
            raise RejectedVideoReached(breaking_reason)

        if filter_set and not any(match_str(f, info_dict, incomplete) for f in filter_set):
            video_title = info_dict.get('title') or info_dict.get('id') or 'entry'
            filter_str = ') | ('.join(map(str.strip, filter_set))
            return f'{video_title} does not pass filter ({filter_str}), skipping ..'
        return NO_DEFAULT if interactive and not incomplete else None

    return _match_func
3256
3257
class download_range_func:
    """Callable yielding the sections (matched chapters and/or time ranges)
    of a video that should be downloaded."""

    def __init__(self, chapters, ranges, from_info=False):
        # chapters: regexes matched against chapter titles
        # ranges: (start_time, end_time) pairs; negative values count back
        #         from the end of the video (see _handle_negative_timestamp)
        # from_info: also honour start_time/end_time from the info dict
        self.chapters, self.ranges, self.from_info = chapters, ranges, from_info

    def __call__(self, info_dict, ydl):
        # Generator of section dicts; warns once if chapter regexes
        # were given but matched nothing

        warning = ('There are no chapters matching the regex' if info_dict.get('chapters')
                   else 'Cannot match chapters since chapter information is unavailable')
        for regex in self.chapters or []:
            for i, chapter in enumerate(info_dict.get('chapters') or []):
                if re.search(regex, chapter['title']):
                    warning = None
                    yield {**chapter, 'index': i}
        if self.chapters and warning:
            ydl.to_screen(f'[info] {info_dict["id"]}: {warning}')

        for start, end in self.ranges or []:
            yield {
                'start_time': self._handle_negative_timestamp(start, info_dict),
                'end_time': self._handle_negative_timestamp(end, info_dict),
            }

        if self.from_info and (info_dict.get('start_time') or info_dict.get('end_time')):
            yield {
                'start_time': info_dict.get('start_time') or 0,
                'end_time': info_dict.get('end_time') or float('inf'),
            }
        elif not self.ranges and not self.chapters:
            # No restrictions at all: one empty section means "everything"
            yield {}

    @staticmethod
    def _handle_negative_timestamp(time, info):
        # Negative timestamps are interpreted relative to the video's end
        # (only possible when the duration is known)
        return max(info['duration'] + time, 0) if info.get('duration') and time < 0 else time

    def __eq__(self, other):
        # NOTE(review): from_info is not part of equality (or repr) —
        # confirm this is intentional before relying on it
        return (isinstance(other, download_range_func)
                and self.chapters == other.chapters and self.ranges == other.ranges)

    def __repr__(self):
        return f'{__name__}.{type(self).__name__}({self.chapters}, {self.ranges})'
3298
3299
def parse_dfxp_time_expr(time_expr):
    """Parse a DFXP/TTML time expression into seconds (float), or None
    when the expression is empty or not understood."""
    if not time_expr:
        return None

    # Plain offset, optionally suffixed with 's' (e.g. '12.5s')
    offset_match = re.match(rf'^(?P<time_offset>{NUMBER_RE})s?$', time_expr)
    if offset_match:
        return float(offset_match.group('time_offset'))

    # Clock time 'HH:MM:SS[.fff]' (a ':' before the fraction also counts as '.')
    clock_match = re.match(r'^(\d+):(\d\d):(\d\d(?:(?:\.|:)\d+)?)$', time_expr)
    if clock_match:
        hours, minutes, seconds = clock_match.groups()
        return 3600 * int(hours) + 60 * int(minutes) + float(seconds.replace(':', '.'))
3311
3312
def srt_subtitles_timecode(seconds):
    """Format *seconds* as an SRT timecode (HH:MM:SS,mmm)."""
    hours, minutes, secs, msec = timetuple_from_msec(seconds * 1000)
    return '%02d:%02d:%02d,%03d' % (hours, minutes, secs, msec)
3315
3316
def ass_subtitles_timecode(seconds):
    """Format *seconds* as an ASS timecode (H:MM:SS.cc — centisecond precision)."""
    timetuple = timetuple_from_msec(seconds * 1000)
    centiseconds = timetuple.milliseconds / 10
    return '%01d:%02d:%02d.%02d' % (*timetuple[:-1], centiseconds)
3320
3321
def dfxp2srt(dfxp_data):
    '''
    @param dfxp_data A bytes-like object containing DFXP data
    @returns A unicode object containing converted SRT data
    '''
    # Current TTML namespaces and the legacy (TTAF1) ones they replace
    LEGACY_NAMESPACES = (
        (b'http://www.w3.org/ns/ttml', [
            b'http://www.w3.org/2004/11/ttaf1',
            b'http://www.w3.org/2006/04/ttaf1',
            b'http://www.w3.org/2006/10/ttaf1',
        ]),
        (b'http://www.w3.org/ns/ttml#styling', [
            b'http://www.w3.org/ns/ttml#style',
        ]),
    )

    # Style properties that get converted to SRT markup; others are ignored
    SUPPORTED_STYLING = [
        'color',
        'fontFamily',
        'fontSize',
        'fontStyle',
        'fontWeight',
        'textDecoration'
    ]

    # Shortcut for building namespaced xpaths / attribute names
    _x = functools.partial(xpath_with_ns, ns_map={
        'xml': 'http://www.w3.org/XML/1998/namespace',
        'ttml': 'http://www.w3.org/ns/ttml',
        'tts': 'http://www.w3.org/ns/ttml#styling',
    })

    styles = {}  # style id -> resolved property dict
    default_style = {}  # document-wide style from <body>/<div>

    class TTMLPElementParser:
        # XMLParser target converting one <p> element into SRT-flavoured
        # markup (<b>/<i>/<u>/<font>).
        # NOTE(review): the two lists below are mutable class attributes
        # shared by all instances; pushes and pops balance out for
        # well-formed input, but state could leak otherwise — confirm
        _out = ''
        _unclosed_elements = []
        _applied_styles = []

        def start(self, tag, attrib):
            if tag in (_x('ttml:br'), 'br'):
                self._out += '\n'
            else:
                unclosed_elements = []
                style = {}
                element_style_id = attrib.get('style')
                # Effective style: document default, then the referenced
                # style, then inline tts:* attributes
                if default_style:
                    style.update(default_style)
                if element_style_id:
                    style.update(styles.get(element_style_id, {}))
                for prop in SUPPORTED_STYLING:
                    prop_val = attrib.get(_x('tts:' + prop))
                    if prop_val:
                        style[prop] = prop_val
                if style:
                    font = ''
                    for k, v in sorted(style.items()):
                        # Skip properties already in effect from the parent
                        if self._applied_styles and self._applied_styles[-1].get(k) == v:
                            continue
                        if k == 'color':
                            font += ' color="%s"' % v
                        elif k == 'fontSize':
                            font += ' size="%s"' % v
                        elif k == 'fontFamily':
                            font += ' face="%s"' % v
                        elif k == 'fontWeight' and v == 'bold':
                            self._out += '<b>'
                            unclosed_elements.append('b')
                        elif k == 'fontStyle' and v == 'italic':
                            self._out += '<i>'
                            unclosed_elements.append('i')
                        elif k == 'textDecoration' and v == 'underline':
                            self._out += '<u>'
                            unclosed_elements.append('u')
                    if font:
                        self._out += '<font' + font + '>'
                        unclosed_elements.append('font')
                    applied_style = {}
                    if self._applied_styles:
                        applied_style.update(self._applied_styles[-1])
                    applied_style.update(style)
                    self._applied_styles.append(applied_style)
                self._unclosed_elements.append(unclosed_elements)

        def end(self, tag):
            if tag not in (_x('ttml:br'), 'br'):
                # Close this element's markup in reverse opening order
                unclosed_elements = self._unclosed_elements.pop()
                for element in reversed(unclosed_elements):
                    self._out += '</%s>' % element
                if unclosed_elements and self._applied_styles:
                    self._applied_styles.pop()

        def data(self, data):
            self._out += data

        def close(self):
            return self._out.strip()

    # Fix UTF-8 encoded file wrongly marked as UTF-16. See https://github.com/yt-dlp/yt-dlp/issues/6543#issuecomment-1477169870
    # This will not trigger false positives since only UTF-8 text is being replaced
    dfxp_data = dfxp_data.replace(b'encoding=\'UTF-16\'', b'encoding=\'UTF-8\'')

    def parse_node(node):
        # Serialize the node and re-parse it through TTMLPElementParser
        target = TTMLPElementParser()
        parser = xml.etree.ElementTree.XMLParser(target=target)
        parser.feed(xml.etree.ElementTree.tostring(node))
        return parser.close()

    # Rewrite legacy namespaces to the current ones before parsing
    for k, v in LEGACY_NAMESPACES:
        for ns in v:
            dfxp_data = dfxp_data.replace(ns, k)

    dfxp = compat_etree_fromstring(dfxp_data)
    out = []
    paras = dfxp.findall(_x('.//ttml:p')) or dfxp.findall('.//p')

    if not paras:
        raise ValueError('Invalid dfxp/TTML subtitle')

    # Resolve style inheritance; repeat while some parent style has not
    # been resolved yet (a style may reference one defined later)
    repeat = False
    while True:
        for style in dfxp.findall(_x('.//ttml:style')):
            style_id = style.get('id') or style.get(_x('xml:id'))
            if not style_id:
                continue
            parent_style_id = style.get('style')
            if parent_style_id:
                if parent_style_id not in styles:
                    repeat = True
                    continue
                styles[style_id] = styles[parent_style_id].copy()
            for prop in SUPPORTED_STYLING:
                prop_val = style.get(_x('tts:' + prop))
                if prop_val:
                    styles.setdefault(style_id, {})[prop] = prop_val
        if repeat:
            repeat = False
        else:
            break

    # Styling attached to <body>/<div> becomes the default for all paragraphs
    for p in ('body', 'div'):
        ele = xpath_element(dfxp, [_x('.//ttml:' + p), './/' + p])
        if ele is None:
            continue
        style = styles.get(ele.get('style'))
        if not style:
            continue
        default_style.update(style)

    for para, index in zip(paras, itertools.count(1)):
        begin_time = parse_dfxp_time_expr(para.attrib.get('begin'))
        end_time = parse_dfxp_time_expr(para.attrib.get('end'))
        dur = parse_dfxp_time_expr(para.attrib.get('dur'))
        if begin_time is None:
            continue
        if not end_time:
            # Without an explicit end, derive it from the duration
            if not dur:
                continue
            end_time = begin_time + dur
        out.append('%d\n%s --> %s\n%s\n\n' % (
            index,
            srt_subtitles_timecode(begin_time),
            srt_subtitles_timecode(end_time),
            parse_node(para)))

    return ''.join(out)
3488
3489
def cli_option(params, command_option, param, separator=None):
    """Build command-line args for an option taking a value: [] when the
    param is unset, ['--opt', 'value'] without a separator, or
    ['--opt<sep>value'] with one."""
    value = params.get(param)
    if value is None:
        return []
    if separator is None:
        return [command_option, str(value)]
    return [f'{command_option}{separator}{value}']
3495
3496
def cli_bool_option(params, command_option, param, true_value='true', false_value='false', separator=None):
    """Build command-line args for a boolean option, rendering the bool as
    *true_value*/*false_value* ([] when the param is unset)."""
    value = params.get(param)
    assert value in (True, False, None)
    # Delegate to cli_option, mapping the boolean onto its string form
    return cli_option({True: true_value, False: false_value}, command_option, value, separator)
3501
3502
def cli_valueless_option(params, command_option, param, expected_value=True):
    """Return [command_option] when params[param] equals *expected_value*,
    else an empty list."""
    if params.get(param) == expected_value:
        return [command_option]
    return []
3505
3506
def cli_configuration_args(argdict, keys, default=[], use_compat=True):
    """Look up configured argument lists in *argdict* by the given key
    groups; the first group with any configured entry wins."""
    if isinstance(argdict, (list, tuple)):  # for backward compatibility
        if use_compat:
            return argdict
        argdict = None
    if argdict is None:
        return default
    assert isinstance(argdict, dict)
    assert isinstance(keys, (list, tuple))

    for key_list in keys:
        # Collect the configured entries for every key in this group
        matched = [argdict.get(key.lower()) for key in variadic(key_list)]
        matched = [args for args in matched if args is not None]
        if matched:
            # Flatten all matched argument lists into one
            return [arg for args in matched for arg in args]
    return default
3525
3526
def _configuration_args(main_key, argdict, exe, keys=None, default=[], use_compat=True):
    """Resolve configured arguments for *exe* under *main_key*, building the
    lookup-key priority list and delegating to cli_configuration_args."""
    main_key = main_key.lower()
    exe = exe.lower()
    root_key = exe if main_key == exe else f'{main_key}+{exe}'
    lookup_keys = [f'{root_key}{suffix}' for suffix in (keys or [''])]
    if root_key in lookup_keys:
        # An empty suffix was requested: also try the generic fallbacks
        if main_key != exe:
            lookup_keys.append((main_key, exe))
        lookup_keys.append('default')
    else:
        use_compat = False
    return cli_configuration_args(argdict, lookup_keys, default, use_compat)
3538
3539
class ISO639Utils:
    """Conversions between ISO 639-1 (two-letter) and ISO 639-2/T
    (three-letter) language codes."""
    # See http://www.loc.gov/standards/iso639-2/ISO-639-2_utf-8.txt
    _lang_map = {
        'aa': 'aar',
        'ab': 'abk',
        'ae': 'ave',
        'af': 'afr',
        'ak': 'aka',
        'am': 'amh',
        'an': 'arg',
        'ar': 'ara',
        'as': 'asm',
        'av': 'ava',
        'ay': 'aym',
        'az': 'aze',
        'ba': 'bak',
        'be': 'bel',
        'bg': 'bul',
        'bh': 'bih',
        'bi': 'bis',
        'bm': 'bam',
        'bn': 'ben',
        'bo': 'bod',
        'br': 'bre',
        'bs': 'bos',
        'ca': 'cat',
        'ce': 'che',
        'ch': 'cha',
        'co': 'cos',
        'cr': 'cre',
        'cs': 'ces',
        'cu': 'chu',
        'cv': 'chv',
        'cy': 'cym',
        'da': 'dan',
        'de': 'deu',
        'dv': 'div',
        'dz': 'dzo',
        'ee': 'ewe',
        'el': 'ell',
        'en': 'eng',
        'eo': 'epo',
        'es': 'spa',
        'et': 'est',
        'eu': 'eus',
        'fa': 'fas',
        'ff': 'ful',
        'fi': 'fin',
        'fj': 'fij',
        'fo': 'fao',
        'fr': 'fra',
        'fy': 'fry',
        'ga': 'gle',
        'gd': 'gla',
        'gl': 'glg',
        'gn': 'grn',
        'gu': 'guj',
        'gv': 'glv',
        'ha': 'hau',
        'he': 'heb',
        'iw': 'heb',  # Replaced by he in 1989 revision
        'hi': 'hin',
        'ho': 'hmo',
        'hr': 'hrv',
        'ht': 'hat',
        'hu': 'hun',
        'hy': 'hye',
        'hz': 'her',
        'ia': 'ina',
        'id': 'ind',
        'in': 'ind',  # Replaced by id in 1989 revision
        'ie': 'ile',
        'ig': 'ibo',
        'ii': 'iii',
        'ik': 'ipk',
        'io': 'ido',
        'is': 'isl',
        'it': 'ita',
        'iu': 'iku',
        'ja': 'jpn',
        'jv': 'jav',
        'ka': 'kat',
        'kg': 'kon',
        'ki': 'kik',
        'kj': 'kua',
        'kk': 'kaz',
        'kl': 'kal',
        'km': 'khm',
        'kn': 'kan',
        'ko': 'kor',
        'kr': 'kau',
        'ks': 'kas',
        'ku': 'kur',
        'kv': 'kom',
        'kw': 'cor',
        'ky': 'kir',
        'la': 'lat',
        'lb': 'ltz',
        'lg': 'lug',
        'li': 'lim',
        'ln': 'lin',
        'lo': 'lao',
        'lt': 'lit',
        'lu': 'lub',
        'lv': 'lav',
        'mg': 'mlg',
        'mh': 'mah',
        'mi': 'mri',
        'mk': 'mkd',
        'ml': 'mal',
        'mn': 'mon',
        'mr': 'mar',
        'ms': 'msa',
        'mt': 'mlt',
        'my': 'mya',
        'na': 'nau',
        'nb': 'nob',
        'nd': 'nde',
        'ne': 'nep',
        'ng': 'ndo',
        'nl': 'nld',
        'nn': 'nno',
        'no': 'nor',
        'nr': 'nbl',
        'nv': 'nav',
        'ny': 'nya',
        'oc': 'oci',
        'oj': 'oji',
        'om': 'orm',
        'or': 'ori',
        'os': 'oss',
        'pa': 'pan',
        'pe': 'per',
        'pi': 'pli',
        'pl': 'pol',
        'ps': 'pus',
        'pt': 'por',
        'qu': 'que',
        'rm': 'roh',
        'rn': 'run',
        'ro': 'ron',
        'ru': 'rus',
        'rw': 'kin',
        'sa': 'san',
        'sc': 'srd',
        'sd': 'snd',
        'se': 'sme',
        'sg': 'sag',
        'si': 'sin',
        'sk': 'slk',
        'sl': 'slv',
        'sm': 'smo',
        'sn': 'sna',
        'so': 'som',
        'sq': 'sqi',
        'sr': 'srp',
        'ss': 'ssw',
        'st': 'sot',
        'su': 'sun',
        'sv': 'swe',
        'sw': 'swa',
        'ta': 'tam',
        'te': 'tel',
        'tg': 'tgk',
        'th': 'tha',
        'ti': 'tir',
        'tk': 'tuk',
        'tl': 'tgl',
        'tn': 'tsn',
        'to': 'ton',
        'tr': 'tur',
        'ts': 'tso',
        'tt': 'tat',
        'tw': 'twi',
        'ty': 'tah',
        'ug': 'uig',
        'uk': 'ukr',
        'ur': 'urd',
        'uz': 'uzb',
        've': 'ven',
        'vi': 'vie',
        'vo': 'vol',
        'wa': 'wln',
        'wo': 'wol',
        'xh': 'xho',
        'yi': 'yid',
        'ji': 'yid',  # Replaced by yi in 1989 revision
        'yo': 'yor',
        'za': 'zha',
        'zh': 'zho',
        'zu': 'zul',
    }

    @classmethod
    def short2long(cls, code):
        """Convert language code from ISO 639-1 to ISO 639-2/T"""
        # Only the first two characters are used, so region variants such
        # as 'en-US' also resolve; returns None for unknown codes
        return cls._lang_map.get(code[:2])

    @classmethod
    def long2short(cls, code):
        """Convert language code from ISO 639-2/T to ISO 639-1"""
        # Linear reverse lookup; falls through (returns None) when unknown
        for short_name, long_name in cls._lang_map.items():
            if long_name == code:
                return short_name
3744
3745
class ISO3166Utils:
    """Mapping of ISO 3166-1 alpha-2 country codes to full country names."""
    # From http://data.okfn.org/data/core/country-list
    _country_map = {
        'AF': 'Afghanistan',
        'AX': 'Åland Islands',
        'AL': 'Albania',
        'DZ': 'Algeria',
        'AS': 'American Samoa',
        'AD': 'Andorra',
        'AO': 'Angola',
        'AI': 'Anguilla',
        'AQ': 'Antarctica',
        'AG': 'Antigua and Barbuda',
        'AR': 'Argentina',
        'AM': 'Armenia',
        'AW': 'Aruba',
        'AU': 'Australia',
        'AT': 'Austria',
        'AZ': 'Azerbaijan',
        'BS': 'Bahamas',
        'BH': 'Bahrain',
        'BD': 'Bangladesh',
        'BB': 'Barbados',
        'BY': 'Belarus',
        'BE': 'Belgium',
        'BZ': 'Belize',
        'BJ': 'Benin',
        'BM': 'Bermuda',
        'BT': 'Bhutan',
        'BO': 'Bolivia, Plurinational State of',
        'BQ': 'Bonaire, Sint Eustatius and Saba',
        'BA': 'Bosnia and Herzegovina',
        'BW': 'Botswana',
        'BV': 'Bouvet Island',
        'BR': 'Brazil',
        'IO': 'British Indian Ocean Territory',
        'BN': 'Brunei Darussalam',
        'BG': 'Bulgaria',
        'BF': 'Burkina Faso',
        'BI': 'Burundi',
        'KH': 'Cambodia',
        'CM': 'Cameroon',
        'CA': 'Canada',
        'CV': 'Cape Verde',
        'KY': 'Cayman Islands',
        'CF': 'Central African Republic',
        'TD': 'Chad',
        'CL': 'Chile',
        'CN': 'China',
        'CX': 'Christmas Island',
        'CC': 'Cocos (Keeling) Islands',
        'CO': 'Colombia',
        'KM': 'Comoros',
        'CG': 'Congo',
        'CD': 'Congo, the Democratic Republic of the',
        'CK': 'Cook Islands',
        'CR': 'Costa Rica',
        'CI': 'Côte d\'Ivoire',
        'HR': 'Croatia',
        'CU': 'Cuba',
        'CW': 'Curaçao',
        'CY': 'Cyprus',
        'CZ': 'Czech Republic',
        'DK': 'Denmark',
        'DJ': 'Djibouti',
        'DM': 'Dominica',
        'DO': 'Dominican Republic',
        'EC': 'Ecuador',
        'EG': 'Egypt',
        'SV': 'El Salvador',
        'GQ': 'Equatorial Guinea',
        'ER': 'Eritrea',
        'EE': 'Estonia',
        'ET': 'Ethiopia',
        'FK': 'Falkland Islands (Malvinas)',
        'FO': 'Faroe Islands',
        'FJ': 'Fiji',
        'FI': 'Finland',
        'FR': 'France',
        'GF': 'French Guiana',
        'PF': 'French Polynesia',
        'TF': 'French Southern Territories',
        'GA': 'Gabon',
        'GM': 'Gambia',
        'GE': 'Georgia',
        'DE': 'Germany',
        'GH': 'Ghana',
        'GI': 'Gibraltar',
        'GR': 'Greece',
        'GL': 'Greenland',
        'GD': 'Grenada',
        'GP': 'Guadeloupe',
        'GU': 'Guam',
        'GT': 'Guatemala',
        'GG': 'Guernsey',
        'GN': 'Guinea',
        'GW': 'Guinea-Bissau',
        'GY': 'Guyana',
        'HT': 'Haiti',
        'HM': 'Heard Island and McDonald Islands',
        'VA': 'Holy See (Vatican City State)',
        'HN': 'Honduras',
        'HK': 'Hong Kong',
        'HU': 'Hungary',
        'IS': 'Iceland',
        'IN': 'India',
        'ID': 'Indonesia',
        'IR': 'Iran, Islamic Republic of',
        'IQ': 'Iraq',
        'IE': 'Ireland',
        'IM': 'Isle of Man',
        'IL': 'Israel',
        'IT': 'Italy',
        'JM': 'Jamaica',
        'JP': 'Japan',
        'JE': 'Jersey',
        'JO': 'Jordan',
        'KZ': 'Kazakhstan',
        'KE': 'Kenya',
        'KI': 'Kiribati',
        'KP': 'Korea, Democratic People\'s Republic of',
        'KR': 'Korea, Republic of',
        'KW': 'Kuwait',
        'KG': 'Kyrgyzstan',
        'LA': 'Lao People\'s Democratic Republic',
        'LV': 'Latvia',
        'LB': 'Lebanon',
        'LS': 'Lesotho',
        'LR': 'Liberia',
        'LY': 'Libya',
        'LI': 'Liechtenstein',
        'LT': 'Lithuania',
        'LU': 'Luxembourg',
        'MO': 'Macao',
        'MK': 'Macedonia, the Former Yugoslav Republic of',
        'MG': 'Madagascar',
        'MW': 'Malawi',
        'MY': 'Malaysia',
        'MV': 'Maldives',
        'ML': 'Mali',
        'MT': 'Malta',
        'MH': 'Marshall Islands',
        'MQ': 'Martinique',
        'MR': 'Mauritania',
        'MU': 'Mauritius',
        'YT': 'Mayotte',
        'MX': 'Mexico',
        'FM': 'Micronesia, Federated States of',
        'MD': 'Moldova, Republic of',
        'MC': 'Monaco',
        'MN': 'Mongolia',
        'ME': 'Montenegro',
        'MS': 'Montserrat',
        'MA': 'Morocco',
        'MZ': 'Mozambique',
        'MM': 'Myanmar',
        'NA': 'Namibia',
        'NR': 'Nauru',
        'NP': 'Nepal',
        'NL': 'Netherlands',
        'NC': 'New Caledonia',
        'NZ': 'New Zealand',
        'NI': 'Nicaragua',
        'NE': 'Niger',
        'NG': 'Nigeria',
        'NU': 'Niue',
        'NF': 'Norfolk Island',
        'MP': 'Northern Mariana Islands',
        'NO': 'Norway',
        'OM': 'Oman',
        'PK': 'Pakistan',
        'PW': 'Palau',
        'PS': 'Palestine, State of',
        'PA': 'Panama',
        'PG': 'Papua New Guinea',
        'PY': 'Paraguay',
        'PE': 'Peru',
        'PH': 'Philippines',
        'PN': 'Pitcairn',
        'PL': 'Poland',
        'PT': 'Portugal',
        'PR': 'Puerto Rico',
        'QA': 'Qatar',
        'RE': 'Réunion',
        'RO': 'Romania',
        'RU': 'Russian Federation',
        'RW': 'Rwanda',
        'BL': 'Saint Barthélemy',
        'SH': 'Saint Helena, Ascension and Tristan da Cunha',
        'KN': 'Saint Kitts and Nevis',
        'LC': 'Saint Lucia',
        'MF': 'Saint Martin (French part)',
        'PM': 'Saint Pierre and Miquelon',
        'VC': 'Saint Vincent and the Grenadines',
        'WS': 'Samoa',
        'SM': 'San Marino',
        'ST': 'Sao Tome and Principe',
        'SA': 'Saudi Arabia',
        'SN': 'Senegal',
        'RS': 'Serbia',
        'SC': 'Seychelles',
        'SL': 'Sierra Leone',
        'SG': 'Singapore',
        'SX': 'Sint Maarten (Dutch part)',
        'SK': 'Slovakia',
        'SI': 'Slovenia',
        'SB': 'Solomon Islands',
        'SO': 'Somalia',
        'ZA': 'South Africa',
        'GS': 'South Georgia and the South Sandwich Islands',
        'SS': 'South Sudan',
        'ES': 'Spain',
        'LK': 'Sri Lanka',
        'SD': 'Sudan',
        'SR': 'Suriname',
        'SJ': 'Svalbard and Jan Mayen',
        'SZ': 'Swaziland',
        'SE': 'Sweden',
        'CH': 'Switzerland',
        'SY': 'Syrian Arab Republic',
        'TW': 'Taiwan, Province of China',
        'TJ': 'Tajikistan',
        'TZ': 'Tanzania, United Republic of',
        'TH': 'Thailand',
        'TL': 'Timor-Leste',
        'TG': 'Togo',
        'TK': 'Tokelau',
        'TO': 'Tonga',
        'TT': 'Trinidad and Tobago',
        'TN': 'Tunisia',
        'TR': 'Turkey',
        'TM': 'Turkmenistan',
        'TC': 'Turks and Caicos Islands',
        'TV': 'Tuvalu',
        'UG': 'Uganda',
        'UA': 'Ukraine',
        'AE': 'United Arab Emirates',
        'GB': 'United Kingdom',
        'US': 'United States',
        'UM': 'United States Minor Outlying Islands',
        'UY': 'Uruguay',
        'UZ': 'Uzbekistan',
        'VU': 'Vanuatu',
        'VE': 'Venezuela, Bolivarian Republic of',
        'VN': 'Viet Nam',
        'VG': 'Virgin Islands, British',
        'VI': 'Virgin Islands, U.S.',
        'WF': 'Wallis and Futuna',
        'EH': 'Western Sahara',
        'YE': 'Yemen',
        'ZM': 'Zambia',
        'ZW': 'Zimbabwe',
        # Not ISO 3166 codes, but used for IP blocks
        'AP': 'Asia/Pacific Region',
        'EU': 'Europe',
    }

    @classmethod
    def short2full(cls, code):
        """Convert an ISO 3166-2 country code to the corresponding full name"""
        # Case-insensitive lookup; returns None for unknown codes
        return cls._country_map.get(code.upper())
4007
4008
4009 class GeoUtils:
4010 # Major IPv4 address blocks per country
4011 _country_ip_map = {
4012 'AD': '46.172.224.0/19',
4013 'AE': '94.200.0.0/13',
4014 'AF': '149.54.0.0/17',
4015 'AG': '209.59.64.0/18',
4016 'AI': '204.14.248.0/21',
4017 'AL': '46.99.0.0/16',
4018 'AM': '46.70.0.0/15',
4019 'AO': '105.168.0.0/13',
4020 'AP': '182.50.184.0/21',
4021 'AQ': '23.154.160.0/24',
4022 'AR': '181.0.0.0/12',
4023 'AS': '202.70.112.0/20',
4024 'AT': '77.116.0.0/14',
4025 'AU': '1.128.0.0/11',
4026 'AW': '181.41.0.0/18',
4027 'AX': '185.217.4.0/22',
4028 'AZ': '5.197.0.0/16',
4029 'BA': '31.176.128.0/17',
4030 'BB': '65.48.128.0/17',
4031 'BD': '114.130.0.0/16',
4032 'BE': '57.0.0.0/8',
4033 'BF': '102.178.0.0/15',
4034 'BG': '95.42.0.0/15',
4035 'BH': '37.131.0.0/17',
4036 'BI': '154.117.192.0/18',
4037 'BJ': '137.255.0.0/16',
4038 'BL': '185.212.72.0/23',
4039 'BM': '196.12.64.0/18',
4040 'BN': '156.31.0.0/16',
4041 'BO': '161.56.0.0/16',
4042 'BQ': '161.0.80.0/20',
4043 'BR': '191.128.0.0/12',
4044 'BS': '24.51.64.0/18',
4045 'BT': '119.2.96.0/19',
4046 'BW': '168.167.0.0/16',
4047 'BY': '178.120.0.0/13',
4048 'BZ': '179.42.192.0/18',
4049 'CA': '99.224.0.0/11',
4050 'CD': '41.243.0.0/16',
4051 'CF': '197.242.176.0/21',
4052 'CG': '160.113.0.0/16',
4053 'CH': '85.0.0.0/13',
4054 'CI': '102.136.0.0/14',
4055 'CK': '202.65.32.0/19',
4056 'CL': '152.172.0.0/14',
4057 'CM': '102.244.0.0/14',
4058 'CN': '36.128.0.0/10',
4059 'CO': '181.240.0.0/12',
4060 'CR': '201.192.0.0/12',
4061 'CU': '152.206.0.0/15',
4062 'CV': '165.90.96.0/19',
4063 'CW': '190.88.128.0/17',
4064 'CY': '31.153.0.0/16',
4065 'CZ': '88.100.0.0/14',
4066 'DE': '53.0.0.0/8',
4067 'DJ': '197.241.0.0/17',
4068 'DK': '87.48.0.0/12',
4069 'DM': '192.243.48.0/20',
4070 'DO': '152.166.0.0/15',
4071 'DZ': '41.96.0.0/12',
4072 'EC': '186.68.0.0/15',
4073 'EE': '90.190.0.0/15',
4074 'EG': '156.160.0.0/11',
4075 'ER': '196.200.96.0/20',
4076 'ES': '88.0.0.0/11',
4077 'ET': '196.188.0.0/14',
4078 'EU': '2.16.0.0/13',
4079 'FI': '91.152.0.0/13',
4080 'FJ': '144.120.0.0/16',
4081 'FK': '80.73.208.0/21',
4082 'FM': '119.252.112.0/20',
4083 'FO': '88.85.32.0/19',
4084 'FR': '90.0.0.0/9',
4085 'GA': '41.158.0.0/15',
4086 'GB': '25.0.0.0/8',
4087 'GD': '74.122.88.0/21',
4088 'GE': '31.146.0.0/16',
4089 'GF': '161.22.64.0/18',
4090 'GG': '62.68.160.0/19',
4091 'GH': '154.160.0.0/12',
4092 'GI': '95.164.0.0/16',
4093 'GL': '88.83.0.0/19',
4094 'GM': '160.182.0.0/15',
4095 'GN': '197.149.192.0/18',
4096 'GP': '104.250.0.0/19',
4097 'GQ': '105.235.224.0/20',
4098 'GR': '94.64.0.0/13',
4099 'GT': '168.234.0.0/16',
4100 'GU': '168.123.0.0/16',
4101 'GW': '197.214.80.0/20',
4102 'GY': '181.41.64.0/18',
4103 'HK': '113.252.0.0/14',
4104 'HN': '181.210.0.0/16',
4105 'HR': '93.136.0.0/13',
4106 'HT': '148.102.128.0/17',
4107 'HU': '84.0.0.0/14',
4108 'ID': '39.192.0.0/10',
4109 'IE': '87.32.0.0/12',
4110 'IL': '79.176.0.0/13',
4111 'IM': '5.62.80.0/20',
4112 'IN': '117.192.0.0/10',
4113 'IO': '203.83.48.0/21',
4114 'IQ': '37.236.0.0/14',
4115 'IR': '2.176.0.0/12',
4116 'IS': '82.221.0.0/16',
4117 'IT': '79.0.0.0/10',
4118 'JE': '87.244.64.0/18',
4119 'JM': '72.27.0.0/17',
4120 'JO': '176.29.0.0/16',
4121 'JP': '133.0.0.0/8',
4122 'KE': '105.48.0.0/12',
4123 'KG': '158.181.128.0/17',
4124 'KH': '36.37.128.0/17',
4125 'KI': '103.25.140.0/22',
4126 'KM': '197.255.224.0/20',
4127 'KN': '198.167.192.0/19',
4128 'KP': '175.45.176.0/22',
4129 'KR': '175.192.0.0/10',
4130 'KW': '37.36.0.0/14',
4131 'KY': '64.96.0.0/15',
4132 'KZ': '2.72.0.0/13',
4133 'LA': '115.84.64.0/18',
4134 'LB': '178.135.0.0/16',
4135 'LC': '24.92.144.0/20',
4136 'LI': '82.117.0.0/19',
4137 'LK': '112.134.0.0/15',
4138 'LR': '102.183.0.0/16',
4139 'LS': '129.232.0.0/17',
4140 'LT': '78.56.0.0/13',
4141 'LU': '188.42.0.0/16',
4142 'LV': '46.109.0.0/16',
4143 'LY': '41.252.0.0/14',
4144 'MA': '105.128.0.0/11',
4145 'MC': '88.209.64.0/18',
4146 'MD': '37.246.0.0/16',
4147 'ME': '178.175.0.0/17',
4148 'MF': '74.112.232.0/21',
4149 'MG': '154.126.0.0/17',
4150 'MH': '117.103.88.0/21',
4151 'MK': '77.28.0.0/15',
4152 'ML': '154.118.128.0/18',
4153 'MM': '37.111.0.0/17',
4154 'MN': '49.0.128.0/17',
4155 'MO': '60.246.0.0/16',
4156 'MP': '202.88.64.0/20',
4157 'MQ': '109.203.224.0/19',
4158 'MR': '41.188.64.0/18',
4159 'MS': '208.90.112.0/22',
4160 'MT': '46.11.0.0/16',
4161 'MU': '105.16.0.0/12',
4162 'MV': '27.114.128.0/18',
4163 'MW': '102.70.0.0/15',
4164 'MX': '187.192.0.0/11',
4165 'MY': '175.136.0.0/13',
4166 'MZ': '197.218.0.0/15',
4167 'NA': '41.182.0.0/16',
4168 'NC': '101.101.0.0/18',
4169 'NE': '197.214.0.0/18',
4170 'NF': '203.17.240.0/22',
4171 'NG': '105.112.0.0/12',
4172 'NI': '186.76.0.0/15',
4173 'NL': '145.96.0.0/11',
4174 'NO': '84.208.0.0/13',
4175 'NP': '36.252.0.0/15',
4176 'NR': '203.98.224.0/19',
4177 'NU': '49.156.48.0/22',
4178 'NZ': '49.224.0.0/14',
4179 'OM': '5.36.0.0/15',
4180 'PA': '186.72.0.0/15',
4181 'PE': '186.160.0.0/14',
4182 'PF': '123.50.64.0/18',
4183 'PG': '124.240.192.0/19',
4184 'PH': '49.144.0.0/13',
4185 'PK': '39.32.0.0/11',
4186 'PL': '83.0.0.0/11',
4187 'PM': '70.36.0.0/20',
4188 'PR': '66.50.0.0/16',
4189 'PS': '188.161.0.0/16',
4190 'PT': '85.240.0.0/13',
4191 'PW': '202.124.224.0/20',
4192 'PY': '181.120.0.0/14',
4193 'QA': '37.210.0.0/15',
4194 'RE': '102.35.0.0/16',
4195 'RO': '79.112.0.0/13',
4196 'RS': '93.86.0.0/15',
4197 'RU': '5.136.0.0/13',
4198 'RW': '41.186.0.0/16',
4199 'SA': '188.48.0.0/13',
4200 'SB': '202.1.160.0/19',
4201 'SC': '154.192.0.0/11',
4202 'SD': '102.120.0.0/13',
4203 'SE': '78.64.0.0/12',
4204 'SG': '8.128.0.0/10',
4205 'SI': '188.196.0.0/14',
4206 'SK': '78.98.0.0/15',
4207 'SL': '102.143.0.0/17',
4208 'SM': '89.186.32.0/19',
4209 'SN': '41.82.0.0/15',
4210 'SO': '154.115.192.0/18',
4211 'SR': '186.179.128.0/17',
4212 'SS': '105.235.208.0/21',
4213 'ST': '197.159.160.0/19',
4214 'SV': '168.243.0.0/16',
4215 'SX': '190.102.0.0/20',
4216 'SY': '5.0.0.0/16',
4217 'SZ': '41.84.224.0/19',
4218 'TC': '65.255.48.0/20',
4219 'TD': '154.68.128.0/19',
4220 'TG': '196.168.0.0/14',
4221 'TH': '171.96.0.0/13',
4222 'TJ': '85.9.128.0/18',
4223 'TK': '27.96.24.0/21',
4224 'TL': '180.189.160.0/20',
4225 'TM': '95.85.96.0/19',
4226 'TN': '197.0.0.0/11',
4227 'TO': '175.176.144.0/21',
4228 'TR': '78.160.0.0/11',
4229 'TT': '186.44.0.0/15',
4230 'TV': '202.2.96.0/19',
4231 'TW': '120.96.0.0/11',
4232 'TZ': '156.156.0.0/14',
4233 'UA': '37.52.0.0/14',
4234 'UG': '102.80.0.0/13',
4235 'US': '6.0.0.0/8',
4236 'UY': '167.56.0.0/13',
4237 'UZ': '84.54.64.0/18',
4238 'VA': '212.77.0.0/19',
4239 'VC': '207.191.240.0/21',
4240 'VE': '186.88.0.0/13',
4241 'VG': '66.81.192.0/20',
4242 'VI': '146.226.0.0/16',
4243 'VN': '14.160.0.0/11',
4244 'VU': '202.80.32.0/20',
4245 'WF': '117.20.32.0/21',
4246 'WS': '202.4.32.0/19',
4247 'YE': '134.35.0.0/16',
4248 'YT': '41.242.116.0/22',
4249 'ZA': '41.0.0.0/11',
4250 'ZM': '102.144.0.0/13',
4251 'ZW': '102.177.192.0/18',
4252 }
4253
4254 @classmethod
4255 def random_ipv4(cls, code_or_block):
4256 if len(code_or_block) == 2:
4257 block = cls._country_ip_map.get(code_or_block.upper())
4258 if not block:
4259 return None
4260 else:
4261 block = code_or_block
4262 addr, preflen = block.split('/')
4263 addr_min = struct.unpack('!L', socket.inet_aton(addr))[0]
4264 addr_max = addr_min | (0xffffffff >> int(preflen))
4265 return str(socket.inet_ntoa(
4266 struct.pack('!L', random.randint(addr_min, addr_max))))
4267
4268
4269 # Both long_to_bytes and bytes_to_long are adapted from PyCrypto, which is
4270 # released into Public Domain
4271 # https://github.com/dlitz/pycrypto/blob/master/lib/Crypto/Util/number.py#L387
4272
def long_to_bytes(n, blocksize=0):
    """long_to_bytes(n:long, blocksize:int) : string
    Convert a long integer to a byte string.

    If optional blocksize is given and greater than zero, pad the front of the
    byte string with binary zeros so that the length is a multiple of
    blocksize.
    """
    n = int(n)
    if n > 0:
        # minimal big-endian representation without leading zero bytes
        encoded = n.to_bytes((n.bit_length() + 7) // 8, 'big')
    else:
        # historic PyCrypto behaviour: n <= 0 encodes as a single zero byte
        encoded = b'\000'
    if blocksize > 0 and len(encoded) % blocksize:
        padding = blocksize - len(encoded) % blocksize
        encoded = padding * b'\000' + encoded
    return encoded
4301
4302
def bytes_to_long(s):
    """bytes_to_long(string) : long
    Convert a byte string to a long integer.

    This is (essentially) the inverse of long_to_bytes().
    """
    # the original zero-padded to a 4-byte multiple and accumulated 32-bit
    # words; big-endian int.from_bytes is exactly equivalent
    return int.from_bytes(s, 'big')
4318
4319
def ohdave_rsa_encrypt(data, exponent, modulus):
    '''
    Implement OHDave's RSA algorithm. See http://www.ohdave.com/rsa/

    Input:
        data: data to encrypt, bytes-like object
        exponent, modulus: parameter e and N of RSA algorithm, both integer
    Output: hex string of encrypted data

    Limitation: supports one block encryption only
    '''
    # interpret `data` as a little-endian integer (hence the [::-1] reversal)
    payload = int(data[::-1].hex(), 16)
    return '%x' % pow(payload, exponent, modulus)
4335
4336
def pkcs1pad(data, length):
    """
    Padding input data with PKCS#1 scheme

    @param {int[]} data input data
    @param {int} length target length
    @returns {int[]} padded data
    @raises ValueError when data cannot fit (needs at least 11 bytes overhead)
    """
    if len(data) > length - 11:
        raise ValueError('Input data too long for PKCS#1 padding')

    # PKCS#1 v1.5 requires the padding string PS to consist of *nonzero*
    # pseudo-random octets; a zero octet would prematurely terminate the
    # padding during decryption (was randint(0, 254), which allowed zeros)
    pseudo_random = [random.randint(1, 255) for _ in range(length - len(data) - 3)]
    return [0, 2] + pseudo_random + [0] + data
4350
4351
4352 def _base_n_table(n, table):
4353 if not table and not n:
4354 raise ValueError('Either table or n must be specified')
4355 table = (table or '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ')[:n]
4356
4357 if n and n != len(table):
4358 raise ValueError(f'base {n} exceeds table length {len(table)}')
4359 return table
4360
4361
4362 def encode_base_n(num, n=None, table=None):
4363 """Convert given int to a base-n string"""
4364 table = _base_n_table(n, table)
4365 if not num:
4366 return table[0]
4367
4368 result, base = '', len(table)
4369 while num:
4370 result = table[num % base] + result
4371 num = num // base
4372 return result
4373
4374
4375 def decode_base_n(string, n=None, table=None):
4376 """Convert given base-n string to int"""
4377 table = {char: index for index, char in enumerate(_base_n_table(n, table))}
4378 result, base = 0, len(table)
4379 for char in string:
4380 result = result * base + table[char]
4381 return result
4382
4383
def decode_packed_codes(code):
    """Expand JS code packed with the base-N word-substitution scheme
    matched by PACKED_CODES_RE."""
    mobj = re.search(PACKED_CODES_RE, code)
    obfuscated_code, base, count, symbols = mobj.groups()
    base = int(base)
    words = symbols.split('|')

    # map every base-N index back to its original word (empty entry -> keep key)
    symbol_table = {}
    for index in range(int(count)):
        key = encode_base_n(index, base)
        symbol_table[key] = words[index] or key

    return re.sub(
        r'\b(\w+)\b', lambda match: symbol_table[match.group(0)],
        obfuscated_code)
4400
4401
def caesar(s, alphabet, shift):
    """Rotate every character of `s` that occurs in `alphabet` by `shift`
    positions (with wrap-around); all other characters pass through."""
    if shift == 0:
        return s
    size = len(alphabet)

    def rotate(ch):
        pos = alphabet.find(ch)
        return ch if pos < 0 else alphabet[(pos + shift) % size]

    return ''.join(map(rotate, s))


def rot47(s):
    """ROT47: caesar over the 94 printable ASCII characters '!' .. '~'."""
    return caesar(s, r'''!"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz{|}~''', 47)
4413
4414
def parse_m3u8_attributes(attrib):
    """Parse an M3U8 attribute list into a dict, stripping surrounding quotes
    from quoted values (quoted values may contain commas)."""
    return {
        key: val[1:-1] if val.startswith('"') else val
        for key, val in re.findall(r'(?P<key>[A-Z0-9-]+)=(?P<val>"[^"]+"|[^",]+)(?:,|$)', attrib)}
4422
4423
def urshift(val, n):
    """Logical (unsigned) right shift: a negative `val` is first mapped to its
    32-bit two's-complement unsigned equivalent."""
    if val >= 0:
        return val >> n
    return (val + 0x100000000) >> n
4426
4427
def write_xattr(path, key, value):
    """Write the extended attribute `key` (bytes `value`) onto the file at `path`.

    Tries, in order: NTFS Alternate Data Streams (Windows), os.setxattr or the
    `pyxattr`/`xattr` modules, then the `setfattr`/`xattr` command-line tools.
    Raises XAttrMetadataError on write failure and XAttrUnavailableError when
    no mechanism is available.
    """
    # Windows: Write xattrs to NTFS Alternate Data Streams:
    # http://en.wikipedia.org/wiki/NTFS#Alternate_data_streams_.28ADS.29
    if compat_os_name == 'nt':
        assert ':' not in key
        assert os.path.exists(path)

        try:
            with open(f'{path}:{key}', 'wb') as f:
                f.write(value)
        except OSError as e:
            raise XAttrMetadataError(e.errno, e.strerror)
        return

    # UNIX Method 1. Use os.setxattr/xattrs/pyxattrs modules

    setxattr = None
    if callable(getattr(os, 'setxattr', None)):
        setxattr = os.setxattr
    elif getattr(xattr, '_yt_dlp__identifier', None) == 'pyxattr':
        # Unicode arguments are not supported in pyxattr until version 0.5.0
        # See https://github.com/ytdl-org/youtube-dl/issues/5498
        if version_tuple(xattr.__version__) >= (0, 5, 0):
            setxattr = xattr.set
    elif xattr:
        setxattr = xattr.setxattr

    if setxattr:
        try:
            setxattr(path, key, value)
        except OSError as e:
            raise XAttrMetadataError(e.errno, e.strerror)
        return

    # UNIX Method 2. Use setfattr/xattr executables
    exe = ('setfattr' if check_executable('setfattr', ['--version'])
           else 'xattr' if check_executable('xattr', ['-h']) else None)
    if not exe:
        raise XAttrUnavailableError(
            'Couldn\'t find a tool to set the xattrs. Install either the python "xattr" or "pyxattr" modules or the '
            + ('"xattr" binary' if sys.platform != 'linux' else 'GNU "attr" package (which contains the "setfattr" tool)'))

    # the CLI tools take the attribute value as text, not bytes
    value = value.decode()
    try:
        _, stderr, returncode = Popen.run(
            [exe, '-w', key, value, path] if exe == 'xattr' else [exe, '-n', key, '-v', value, path],
            text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
    except OSError as e:
        raise XAttrMetadataError(e.errno, e.strerror)
    if returncode:
        raise XAttrMetadataError(returncode, stderr)
4479
4480
def random_birthday(year_field, month_field, day_field):
    """Pick a uniformly random date between 1950-01-01 and 1995-12-31 and
    return it as a dict of string values under the given field names."""
    first = datetime.date(1950, 1, 1)
    last = datetime.date(1995, 12, 31)
    chosen = first + datetime.timedelta(days=random.randint(0, (last - first).days))
    return {
        year_field: str(chosen.year),
        month_field: str(chosen.month),
        day_field: str(chosen.day),
    }
4491
4492
def find_available_port(interface=''):
    """Ask the OS for a currently free TCP port on `interface`;
    returns the port number, or None when binding fails."""
    try:
        with socket.socket() as sock:
            sock.bind((interface, 0))  # port 0 = let the OS choose
            return sock.getsockname()[1]
    except OSError:
        return None
4500
4501
# Templates for internet shortcut files, which are plain text files.
# Windows .url shortcut (INI-style)
DOT_URL_LINK_TEMPLATE = '''\
[InternetShortcut]
URL=%(url)s
'''

# macOS .webloc shortcut (XML property list)
DOT_WEBLOC_LINK_TEMPLATE = '''\
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
\t<key>URL</key>
\t<string>%(url)s</string>
</dict>
</plist>
'''

# freedesktop.org .desktop entry of Type=Link (Linux desktop shortcut)
DOT_DESKTOP_LINK_TEMPLATE = '''\
[Desktop Entry]
Encoding=UTF-8
Name=%(filename)s
Type=Link
URL=%(url)s
Icon=text-html
'''

# Maps link-format names to templates; every template takes %(url)s,
# the desktop one additionally takes %(filename)s
LINK_TEMPLATES = {
    'url': DOT_URL_LINK_TEMPLATE,
    'desktop': DOT_DESKTOP_LINK_TEMPLATE,
    'webloc': DOT_WEBLOC_LINK_TEMPLATE,
}
4533
4534
def iri_to_uri(iri):
    """
    Converts an IRI (Internationalized Resource Identifier, allowing Unicode characters) to a URI (Uniform Resource Identifier, ASCII-only).

    The function doesn't add an additional layer of escaping; e.g., it doesn't escape `%3C` as `%253C`. Instead, it percent-escapes characters with an underlying UTF-8 encoding *besides* those already escaped, leaving the URI intact.
    """

    iri_parts = urllib.parse.urlparse(iri)

    if '[' in iri_parts.netloc:
        raise ValueError('IPv6 URIs are not, yet, supported.')
        # Querying `.netloc`, when there's only one bracket, also raises a ValueError.

    # The `safe` argument values, that the following code uses, contain the characters that should not be percent-encoded. Everything else but letters, digits and '_.-' will be percent-encoded with an underlying UTF-8 encoding. Everything already percent-encoded will be left as is.

    net_location = ''
    if iri_parts.username:
        net_location += urllib.parse.quote(iri_parts.username, safe=r"!$%&'()*+,~")
        if iri_parts.password is not None:
            net_location += ':' + urllib.parse.quote(iri_parts.password, safe=r"!$%&'()*+,~")
        net_location += '@'

    net_location += iri_parts.hostname.encode('idna').decode()  # Punycode for Unicode hostnames.
    # The 'idna' encoding produces ASCII text.

    # Omit the port only when it is the default for the scheme.
    # (Previously the port was dropped whenever it equalled 80, which is
    # wrong for non-HTTP schemes, e.g. https://host:80.)
    default_port = {'http': 80, 'ws': 80, 'https': 443, 'wss': 443, 'ftp': 21}.get(iri_parts.scheme)
    if iri_parts.port is not None and iri_parts.port != default_port:
        net_location += ':' + str(iri_parts.port)

    return urllib.parse.urlunparse(
        (iri_parts.scheme,
            net_location,

            urllib.parse.quote_plus(iri_parts.path, safe=r"!$%&'()*+,/:;=@|~"),

            # Unsure about the `safe` argument, since this is a legacy way of handling parameters.
            urllib.parse.quote_plus(iri_parts.params, safe=r"!$%&'()*+,/:;=@|~"),

            # Not totally sure about the `safe` argument, since the source does not explicitly mention the query URI component.
            urllib.parse.quote_plus(iri_parts.query, safe=r"!$%&'()*+,/:;=?@{|}~"),

            urllib.parse.quote_plus(iri_parts.fragment, safe=r"!#$%&'()*+,/:;=?@{|}~")))

    # Source for `safe` arguments: https://url.spec.whatwg.org/#percent-encoded-bytes.
4577
4578
def to_high_limit_path(path):
    """On Windows/Cygwin, return the absolute path with the '\\\\?\\' prefix to
    bypass the MAX_PATH limit (individual segment lengths may still be limited);
    elsewhere return `path` unchanged."""
    if sys.platform not in ('win32', 'cygwin'):
        return path
    return '\\\\?\\' + os.path.abspath(path)
4585
4586
def format_field(obj, field=None, template='%s', ignore=NO_DEFAULT, default='', func=IDENTITY):
    """Traverse `field` within `obj`, pass the value through `func` and format
    it with `template`; return `default` when the value is falsy (or, when
    `ignore` is given, when the value is among `variadic(ignore)`)."""
    value = traversal.traverse_obj(obj, *variadic(field))
    if ignore is NO_DEFAULT:
        skip = not value
    else:
        skip = value in variadic(ignore)
    return default if skip else template % func(value)
4592
4593
def clean_podcast_url(url):
    """Strip known podcast analytics/tracking redirect prefixes (podtrac,
    blubrry, chartable, acast, podsights, etc.) from `url` and return the
    direct media URL."""
    url = re.sub(r'''(?x)
        (?:
            (?:
                chtbl\.com/track|
                media\.blubrry\.com| # https://create.blubrry.com/resources/podcast-media-download-statistics/getting-started/
                play\.podtrac\.com|
                chrt\.fm/track|
                mgln\.ai/e
            )(?:/[^/.]+)?|
            (?:dts|www)\.podtrac\.com/(?:pts/)?redirect\.[0-9a-z]{3,4}| # http://analytics.podtrac.com/how-to-measure
            flex\.acast\.com|
            pd(?:
                cn\.co| # https://podcorn.com/analytics-prefix/
                st\.fm # https://podsights.com/docs/
            )/e|
            [0-9]\.gum\.fm|
            pscrb\.fm/rss/p
        )/''', '', url)
    # stripping a prefix can leave a nested scheme ('https://https://...');
    # keep only the inner one
    return re.sub(r'^\w+://(\w+://)', r'\1', url)
4614
4615
_HEX_TABLE = '0123456789abcdef'


def random_uuidv4():
    """Generate a random UUIDv4-shaped string.

    NB: the 'y' position is filled with any hex digit, so the RFC 4122
    variant bits are not guaranteed — this only mimics the common JS snippet.
    """
    template = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'
    return re.sub(r'[xy]', lambda _: random.choice(_HEX_TABLE), template)
4621
4622
def make_dir(path, to_screen=None):
    """Create the parent directory of `path` (like `mkdir -p` on its dirname).

    Returns True on success (or when `path` has no directory component),
    False on failure. On failure the error is reported via `to_screen`
    if one was supplied.
    """
    try:
        dn = os.path.dirname(path)
        if dn:
            os.makedirs(dn, exist_ok=True)
        return True
    except OSError as err:
        # BUG FIX: was `if callable(to_screen) is not None:`, which is always
        # true and crashed with the default to_screen=None on any error
        if callable(to_screen):
            to_screen(f'unable to create directory {err}')
        return False
4633
4634
def get_executable_path():
    """Absolute directory containing the running yt-dlp executable/script."""
    from ..update import _get_variant_and_executable_path

    exe = _get_variant_and_executable_path()[1]
    return os.path.dirname(os.path.abspath(exe))
4639
4640
def get_user_config_dirs(package_name):
    """Yield candidate per-user configuration directories, in search order."""
    # $XDG_CONFIG_HOME (or ~/.config) / <package_name>
    xdg_home = os.getenv('XDG_CONFIG_HOME') or compat_expanduser('~/.config')
    yield os.path.join(xdg_home, package_name)

    # %APPDATA%/<package_name> (Windows)
    appdata = os.getenv('appdata')
    if appdata:
        yield os.path.join(appdata, package_name)

    # ~/.<package_name>
    yield os.path.join(compat_expanduser('~'), f'.{package_name}')
4653
4654
def get_system_config_dirs(package_name):
    """Yield candidate system-wide configuration directories (/etc/<package_name>)."""
    yield os.path.join('/etc', package_name)
4658
4659
def time_seconds(**kwargs):
    """
    Returns TZ-aware time in seconds since the epoch (1970-01-01T00:00:00Z)
    """
    # kwargs are timedelta components (hours=..., minutes=..., ...)
    offset = datetime.timedelta(**kwargs).total_seconds()
    return time.time() + offset
4665
4666
# create a JSON Web Signature (jws) with HS256 algorithm
# the resulting format is in JWS Compact Serialization
# implemented following JWT https://www.rfc-editor.org/rfc/rfc7519.html
# implemented following JWS https://www.rfc-editor.org/rfc/rfc7515.html
def jwt_encode_hs256(payload_data, key, headers=None):
    """Create a JWS Compact Serialization token signed with HMAC-SHA256.

    @param payload_data JSON-serializable payload (claims)
    @param key          shared secret (str) used for the HMAC
    @param headers      optional extra/override header fields
    @returns            the token as bytes: b'<header>.<payload>.<signature>'

    FIX: the `headers` default was a mutable `{}`; use None instead.
    NOTE(review): segments use standard base64 with padding, while RFC 7515
    prescribes unpadded base64url — kept as-is since sites accept this form.
    """
    header_data = {
        'alg': 'HS256',
        'typ': 'JWT',
    }
    if headers:
        header_data.update(headers)
    header_b64 = base64.b64encode(json.dumps(header_data).encode())
    payload_b64 = base64.b64encode(json.dumps(payload_data).encode())
    h = hmac.new(key.encode(), header_b64 + b'.' + payload_b64, hashlib.sha256)
    signature_b64 = base64.b64encode(h.digest())
    return header_b64 + b'.' + payload_b64 + b'.' + signature_b64
4684
4685
# can be extended in future to verify the signature and parse header and return the algorithm used if it's not HS256
def jwt_decode_hs256(jwt):
    """Decode and return the payload of a JWS compact token WITHOUT verifying
    the signature."""
    _header_b64, payload_b64, _signature_b64 = jwt.split('.')
    # add trailing ='s that may have been stripped, superfluous ='s are ignored
    return json.loads(base64.urlsafe_b64decode(payload_b64 + '==='))
4692
4693
# Whether VT (ANSI escape) processing is enabled on the Windows console:
# None on non-Windows platforms; starts as False on Windows and is flipped
# to True by windows_enable_vt_mode()
WINDOWS_VT_MODE = False if compat_os_name == 'nt' else None
4695
4696
@functools.cache
def supports_terminal_sequences(stream):
    """Whether ANSI/VT escape sequences can safely be written to `stream`."""
    # Windows needs VT mode enabled; elsewhere a TERM variable must be set
    vt_capable = WINDOWS_VT_MODE if compat_os_name == 'nt' else bool(os.getenv('TERM'))
    if not vt_capable:
        return False
    try:
        return stream.isatty()
    except BaseException:
        return False
4708
4709
def windows_enable_vt_mode():
    """Ref: https://bugs.python.org/issue30075 """
    # Enable ANSI escape-sequence (VT) processing on the Windows console.
    # VT support requires Windows 10 build 10586; silently no-op before that.
    if get_windows_version() < (10, 0, 10586):
        return

    import ctypes
    import ctypes.wintypes
    import msvcrt

    ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004

    dll = ctypes.WinDLL('kernel32', use_last_error=False)
    # open the console output device directly (stdout may be redirected)
    handle = os.open('CONOUT$', os.O_RDWR)
    try:
        h_out = ctypes.wintypes.HANDLE(msvcrt.get_osfhandle(handle))
        dw_original_mode = ctypes.wintypes.DWORD()
        success = dll.GetConsoleMode(h_out, ctypes.byref(dw_original_mode))
        if not success:
            raise Exception('GetConsoleMode failed')

        # keep the existing mode flags and additionally enable VT processing
        success = dll.SetConsoleMode(h_out, ctypes.wintypes.DWORD(
            dw_original_mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING))
        if not success:
            raise Exception('SetConsoleMode failed')
    finally:
        os.close(handle)

    # record success and drop cached results so terminal-capability
    # checks are re-evaluated with VT mode on
    global WINDOWS_VT_MODE
    WINDOWS_VT_MODE = True
    supports_terminal_sequences.cache_clear()
4740
4741
# ANSI SGR (color/style) escape sequences: ESC '[' ... 'm'
_terminal_sequences_re = re.compile('\033\\[[^m]+m')


def remove_terminal_sequences(string):
    """Return `string` with ANSI SGR escape sequences stripped."""
    return _terminal_sequences_re.sub('', string)
4747
4748
def number_of_digits(number):
    """Length of the '%d'-formatted form of `number` (decimal digits, plus the
    leading '-' for negatives; floats are truncated toward zero by '%d')."""
    return len('%d' % number)
4751
4752
def join_nonempty(*values, delim='-', from_dict=None):
    """Join the string forms of all truthy values with `delim`.

    When `from_dict` is given, each value is first used as a traversal path
    into that mapping.
    """
    if from_dict is not None:
        values = (traversal.traverse_obj(from_dict, variadic(v)) for v in values)
    return delim.join(str(value) for value in values if value)
4757
4758
def scale_thumbnails_to_max_format_width(formats, thumbnails, url_width_re):
    """
    Find the largest format dimensions in terms of video width and, for each thumbnail:
    * Modify the URL: Match the width with the provided regex and replace with the former width
    * Update dimensions

    This function is useful with video services that scale the provided thumbnails on demand
    """
    best = max(
        ((fmt.get('width') or 0, fmt.get('height') or 0) for fmt in formats),
        default=(0, 0))
    if not best[0]:
        return thumbnails
    width_repl = str(best[0])
    return [
        merge_dicts(
            {'url': re.sub(url_width_re, width_repl, thumbnail['url'])},
            {'width': best[0], 'height': best[1]},
            thumbnail)
        for thumbnail in thumbnails
    ]
4779
4780
def parse_http_range(range):
    """ Parse value of "Range" or "Content-Range" HTTP header into tuple. """
    mobj = range and re.search(r'bytes[ =](\d+)-(\d+)?(?:/(\d+))?', range)
    if not mobj:
        return None, None, None
    return int(mobj.group(1)), int_or_none(mobj.group(2)), int_or_none(mobj.group(3))
4789
4790
def read_stdin(what):
    """Announce on the console that `what` will be read from STDIN,
    then return the stdin stream itself."""
    eof_key = 'Ctrl+Z' if compat_os_name == 'nt' else 'Ctrl+D'
    write_string(f'Reading {what} from STDIN - EOF ({eof_key}) to end:\n')
    return sys.stdin
4795
4796
def determine_file_encoding(data):
    """
    Detect the text encoding used
    @returns (encoding, bytes to skip)
    """
    # BOM marks are given priority over declarations
    for bom, enc in BOMS:
        if data.startswith(bom):
            return enc, len(bom)

    # Strip off all null bytes to match even when UTF-16 or UTF-32 is used.
    # We ignore the endianness to get a good enough match
    stripped = data.replace(b'\0', b'')
    mobj = re.match(rb'(?m)^#\s*coding\s*:\s*(\S+)\s*$', stripped)
    if mobj:
        return mobj.group(1).decode(), 0
    return None, 0
4813
4814
class Config:
    """One source of yt-dlp options (a file, stdin, or plain argument list);
    configs referenced via --config-locations are loaded recursively into
    `self.configs`."""
    own_args = None      # raw argument list of this config source
    parsed_args = None   # own_args after a successful load
    filename = None      # path of the backing config file, if any
    __initialized = False

    def __init__(self, parser, label=None):
        self.parser, self.label = parser, label
        self._loaded_paths, self.configs = set(), []

    def init(self, args=None, filename=None):
        """Bind args/filename to this config and load it (including children)."""
        assert not self.__initialized
        self.own_args, self.filename = args, filename
        return self.load_configs()

    def load_configs(self):
        """Parse own_args and recursively append any --config-locations.

        Returns False when this file was already loaded (cycle guard).
        """
        directory = ''
        if self.filename:
            location = os.path.realpath(self.filename)
            directory = os.path.dirname(location)
            if location in self._loaded_paths:
                return False
            self._loaded_paths.add(location)

        self.__initialized = True
        opts, _ = self.parser.parse_known_args(self.own_args)
        self.parsed_args = self.own_args
        for location in opts.config_locations or []:
            if location == '-':
                # '-' means read additional options from stdin (at most once)
                if location in self._loaded_paths:
                    continue
                self._loaded_paths.add(location)
                self.append_config(shlex.split(read_stdin('options'), comments=True), label='stdin')
                continue
            # relative locations are resolved against this config's directory
            location = os.path.join(directory, expand_path(location))
            if os.path.isdir(location):
                location = os.path.join(location, 'yt-dlp.conf')
            if not os.path.exists(location):
                self.parser.error(f'config location {location} does not exist')
            self.append_config(self.read_file(location), location)
        return True

    def __str__(self):
        label = join_nonempty(
            self.label, 'config', f'"{self.filename}"' if self.filename else '',
            delim=' ')
        return join_nonempty(
            self.own_args is not None and f'{label[0].upper()}{label[1:]}: {self.hide_login_info(self.own_args)}',
            *(f'\n{c}'.replace('\n', '\n| ')[1:] for c in self.configs),
            delim='\n')

    @staticmethod
    def read_file(filename, default=[]):
        """Read a config file and split its contents into an argument list.

        Returns `default` (NB: a shared list — callers must not mutate it)
        when the file cannot be opened.
        """
        try:
            optionf = open(filename, 'rb')
        except OSError:
            return default  # silently skip if file is not present
        try:
            enc, skip = determine_file_encoding(optionf.read(512))
            optionf.seek(skip, io.SEEK_SET)
        except OSError:
            enc = None  # silently skip read errors
        try:
            # FIXME: https://github.com/ytdl-org/youtube-dl/commit/dfe5fa49aed02cf36ba9f743b11b0903554b5e56
            contents = optionf.read().decode(enc or preferredencoding())
            res = shlex.split(contents, comments=True)
        except Exception as err:
            # BUG FIX: the message contained a hard-coded "(unknown)"
            # placeholder instead of the actual filename
            raise ValueError(f'Unable to parse "{filename}": {err}')
        finally:
            optionf.close()
        return res

    @staticmethod
    def hide_login_info(opts):
        """Return a copy of `opts` with the values of credential options
        replaced by 'PRIVATE' (both '--opt value' and '--opt=value' forms)."""
        PRIVATE_OPTS = {'-p', '--password', '-u', '--username', '--video-password', '--ap-password', '--ap-username'}
        eqre = re.compile('^(?P<key>' + ('|'.join(re.escape(po) for po in PRIVATE_OPTS)) + ')=.+$')

        def _scrub_eq(o):
            m = eqre.match(o)
            if m:
                return m.group('key') + '=PRIVATE'
            else:
                return o

        opts = list(map(_scrub_eq, opts))
        for idx, opt in enumerate(opts):
            if opt in PRIVATE_OPTS and idx + 1 < len(opts):
                opts[idx + 1] = 'PRIVATE'
        return opts

    def append_config(self, *args, label=None):
        """Load another config source as a child of this one."""
        config = type(self)(self.parser, label)
        config._loaded_paths = self._loaded_paths  # share the cycle guard
        if config.init(*args):
            self.configs.append(config)

    @property
    def all_args(self):
        """All arguments: child configs first, this config's own args last."""
        for config in reversed(self.configs):
            yield from config.all_args
        yield from self.parsed_args or []

    def parse_known_args(self, **kwargs):
        return self.parser.parse_known_args(self.all_args, **kwargs)

    def parse_args(self):
        return self.parser.parse_args(self.all_args)
4922
4923
def merge_headers(*dicts):
    """Merge dicts of http headers case insensitively, prioritizing the latter ones"""
    merged = {}
    for headers in dicts:
        for name, value in headers.items():
            merged[name.title()] = value
    return merged
4927
4928
def cached_method(f):
    """Decorator memoizing a method's results per instance, keyed on its
    (bound, default-applied) arguments excluding `self`."""
    signature = inspect.signature(f)

    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        bound = signature.bind(self, *args, **kwargs)
        bound.apply_defaults()
        key = tuple(bound.arguments.values())[1:]  # drop `self`

        # per-instance cache lives in the instance __dict__, bucketed per method
        store = vars(self).setdefault('_cached_method__cache', {}).setdefault(f.__name__, {})
        try:
            return store[key]
        except KeyError:
            store[key] = f(self, *args, **kwargs)
            return store[key]
    return wrapper
4944
4945
class classproperty:
    """property access for class methods with optional caching"""

    def __new__(cls, func=None, *args, **kwargs):
        # bare usage without a function, e.g. @classproperty(cache=True):
        # return a partial that will receive the function on the next call
        if not func:
            return functools.partial(cls, *args, **kwargs)
        return super().__new__(cls)

    def __init__(self, func, *, cache=False):
        functools.update_wrapper(self, func)
        self.func = func
        self._cache = {} if cache else None

    def __get__(self, _, cls):
        if self._cache is None:
            return self.func(cls)
        try:
            return self._cache[cls]
        except KeyError:
            return self._cache.setdefault(cls, self.func(cls))
4964
4965
class function_with_repr:
    """Wrap a callable so that repr() yields a stable, readable name
    (an explicit `repr_` string, or `module.qualname` of the function)."""

    def __init__(self, func, repr_=None):
        functools.update_wrapper(self, func)
        self.func, self.__repr = func, repr_

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    def __repr__(self):
        return self.__repr or f'{self.func.__module__}.{self.func.__qualname__}'
4978
4979
class Namespace(types.SimpleNamespace):
    """SimpleNamespace that iterates over its *values* and exposes its
    name/value pairs via `items_`."""

    def __iter__(self):
        yield from vars(self).values()

    @property
    def items_(self):
        return vars(self).items()
4989
4990
# File extensions known to yt-dlp, grouped by media kind.
# The `common_*` groups are merged into the full `video`/`audio` tuples below.
MEDIA_EXTENSIONS = Namespace(
    common_video=('avi', 'flv', 'mkv', 'mov', 'mp4', 'webm'),
    video=('3g2', '3gp', 'f4v', 'mk3d', 'divx', 'mpg', 'ogv', 'm4v', 'wmv'),
    common_audio=('aiff', 'alac', 'flac', 'm4a', 'mka', 'mp3', 'ogg', 'opus', 'wav'),
    audio=('aac', 'ape', 'asf', 'f4a', 'f4b', 'm4b', 'm4p', 'm4r', 'oga', 'ogx', 'spx', 'vorbis', 'wma', 'weba'),
    thumbnails=('jpg', 'png', 'webp'),
    storyboards=('mhtml', ),
    subtitles=('srt', 'vtt', 'ass', 'lrc'),
    manifests=('f4f', 'f4m', 'm3u8', 'smil', 'mpd'),
)
# fold the common groups into the full lists
MEDIA_EXTENSIONS.video += MEDIA_EXTENSIONS.common_video
MEDIA_EXTENSIONS.audio += MEDIA_EXTENSIONS.common_audio

# union of all video, audio and manifest extensions
KNOWN_EXTENSIONS = (*MEDIA_EXTENSIONS.video, *MEDIA_EXTENSIONS.audio, *MEDIA_EXTENSIONS.manifests)
5005
5006
class RetryManager:
    """Usage:
    for retry in RetryManager(...):
        try:
            ...
        except SomeException as err:
            retry.error = err
            continue
    """
    # `_error` is None before the first attempt; during an attempt it holds the
    # NO_DEFAULT sentinel until the caller assigns `retry.error = err`
    attempt, _error = 0, None

    def __init__(self, _retries, _error_callback, **kwargs):
        self.retries = _retries or 0
        # extra kwargs are pre-bound onto the error callback
        self.error_callback = functools.partial(_error_callback, **kwargs)

    def _should_retry(self):
        # retry while no attempt has run yet, or the last attempt set an error,
        # and there are attempts left
        return self._error is not NO_DEFAULT and self.attempt <= self.retries

    @property
    def error(self):
        # reading the error maps the internal sentinel back to None
        if self._error is NO_DEFAULT:
            return None
        return self._error

    @error.setter
    def error(self, value):
        self._error = value

    def __iter__(self):
        while self._should_retry():
            self.error = NO_DEFAULT  # reset before each attempt
            self.attempt += 1
            yield self
            # the attempt failed: report it (this raises once retries are exhausted)
            if self.error:
                self.error_callback(self.error, self.attempt, self.retries)

    @staticmethod
    def report_retry(e, count, retries, *, sleep_func, info, warn, error=None, suffix=None):
        """Utility function for reporting retries"""
        if count > retries:
            # out of retries: hand off to `error` if given, otherwise re-raise
            if error:
                return error(f'{e}. Giving up after {count - 1} retries') if count > 1 else error(str(e))
            raise e

        if not count:
            return warn(e)
        elif isinstance(e, ExtractorError):
            # prefer the underlying cause / original message, sans trailing dot
            e = remove_end(str_or_none(e.cause) or e.orig_msg, '.')
        warn(f'{e}. Retrying{format_field(suffix, None, " %s")} ({count}/{retries})...')

        delay = float_or_none(sleep_func(n=count - 1)) if callable(sleep_func) else sleep_func
        if delay:
            info(f'Sleeping {delay:.2f} seconds ...')
            time.sleep(delay)
5061
5062
def make_archive_id(ie, video_id):
    """Build the download-archive entry '<extractor_key_lowercased> <video_id>';
    `ie` may be an extractor key string or an object exposing ie_key()."""
    ie_key = (ie if isinstance(ie, str) else ie.ie_key()).lower()
    return f'{ie_key} {video_id}'
5066
5067
def truncate_string(s, left, right=0):
    """Shorten `s` to at most `left + right` characters, eliding the middle
    with '...' (which counts toward the `left` budget). `s` is returned
    unchanged when it is None or already short enough."""
    assert left > 3 and right >= 0
    if s is None or len(s) <= left + right:
        return s
    head = s[:left - 3]
    tail = s[-right:] if right else ''
    return f'{head}...{tail}'
5073
5074
def orderedSet_from_options(options, alias_dict, *, use_regex=False, start=None):
    """Expand `options` (values, aliases, optional '-' negation and, with
    `use_regex`, patterns) into an ordered, de-duplicated list.

    `alias_dict` maps alias names to lists of values; its mandatory 'all'
    alias enumerates every allowed value. A leading '-' discards the value
    (for an alias: each of its expanded values).
    Raises ValueError for a value not covered by alias_dict['all'].
    """
    assert 'all' in alias_dict, '"all" alias is required'
    requested = list(start or [])
    for val in options:
        discard = val.startswith('-')
        if discard:
            val = val[1:]

        if val in alias_dict:
            # negating an alias flips the sign of each of its members
            val = alias_dict[val] if not discard else [
                i[1:] if i.startswith('-') else f'-{i}' for i in alias_dict[val]]
            # NB: Do not allow regex in aliases for performance
            requested = orderedSet_from_options(val, alias_dict, start=requested)
            continue

        current = (filter(re.compile(val, re.I).fullmatch, alias_dict['all']) if use_regex
                   else [val] if val in alias_dict['all'] else None)
        if current is None:
            raise ValueError(val)

        if discard:
            # remove every earlier occurrence of each discarded item
            for item in current:
                while item in requested:
                    requested.remove(item)
        else:
            requested.extend(current)

    return orderedSet(requested)
5103
5104
5105 # TODO: Rewrite
5106 class FormatSorter:
5107 regex = r' *((?P<reverse>\+)?(?P<field>[a-zA-Z0-9_]+)((?P<separator>[~:])(?P<limit>.*?))?)? *$'
5108
    # Default sort order. Forced/priority fields (per `settings`) always stay in
    # front; the rest may be overridden by --format-sort.
    default = ('hidden', 'aud_or_vid', 'hasvid', 'ie_pref', 'lang', 'quality',
               'res', 'fps', 'hdr:12', 'vcodec:vp9.2', 'channels', 'acodec',
               'size', 'br', 'asr', 'proto', 'ext', 'hasaud', 'source', 'id')  # These must not be aliases
    # Sort order emulating youtube-dl's behaviour (used with compat options).
    ytdl_default = ('hasaud', 'lang', 'quality', 'tbr', 'filesize', 'vbr',
                    'height', 'width', 'proto', 'vext', 'abr', 'aext',
                    'fps', 'fs_approx', 'source', 'id')

    # Per-field sort configuration. Known keys (see _get_field_setting for defaults):
    #   type: 'ordered' (rank by position in 'order'), 'boolean', 'extractor',
    #         'combined'/'multiple' (derived from several fields), 'alias', or plain 'field'
    #   order/order_free: ranking lists; 'order_free' applies with --prefer-free-formats
    #   regex: entries of 'order' are regexes, matched against the value
    #   field: the format-dict key (or keys) the value is read from
    #   convert: how raw values are normalized (see _resolve_field_value)
    #   visible/forced/priority: how the field interacts with user sort strings
    settings = {
        'vcodec': {'type': 'ordered', 'regex': True,
                   'order': ['av0?1', 'vp0?9.2', 'vp0?9', '[hx]265|he?vc?', '[hx]264|avc', 'vp0?8', 'mp4v|h263', 'theora', '', None, 'none']},
        'acodec': {'type': 'ordered', 'regex': True,
                   'order': ['[af]lac', 'wav|aiff', 'opus', 'vorbis|ogg', 'aac', 'mp?4a?', 'mp3', 'ac-?4', 'e-?a?c-?3', 'ac-?3', 'dts', '', None, 'none']},
        'hdr': {'type': 'ordered', 'regex': True, 'field': 'dynamic_range',
                'order': ['dv', '(hdr)?12', r'(hdr)?10\+', '(hdr)?10', 'hlg', '', 'sdr', None]},
        'proto': {'type': 'ordered', 'regex': True, 'field': 'protocol',
                  'order': ['(ht|f)tps', '(ht|f)tp$', 'm3u8.*', '.*dash', 'websocket_frag', 'rtmpe?', '', 'mms|rtsp', 'ws|websocket', 'f4']},
        'vext': {'type': 'ordered', 'field': 'video_ext',
                 'order': ('mp4', 'mov', 'webm', 'flv', '', 'none'),
                 'order_free': ('webm', 'mp4', 'mov', 'flv', '', 'none')},
        'aext': {'type': 'ordered', 'regex': True, 'field': 'audio_ext',
                 'order': ('m4a', 'aac', 'mp3', 'ogg', 'opus', 'web[am]', '', 'none'),
                 'order_free': ('ogg', 'opus', 'web[am]', 'mp3', 'm4a', 'aac', '', 'none')},
        # 'hidden' forces extractor-hidden formats to the very bottom
        'hidden': {'visible': False, 'forced': True, 'type': 'extractor', 'max': -1000},
        # formats with neither audio nor video always sort last
        'aud_or_vid': {'visible': False, 'forced': True, 'type': 'multiple',
                       'field': ('vcodec', 'acodec'),
                       'function': lambda it: int(any(v != 'none' for v in it))},
        'ie_pref': {'priority': True, 'type': 'extractor'},
        'hasvid': {'priority': True, 'field': 'vcodec', 'type': 'boolean', 'not_in_list': ('none',)},
        'hasaud': {'field': 'acodec', 'type': 'boolean', 'not_in_list': ('none',)},
        'lang': {'convert': 'float', 'field': 'language_preference', 'default': -1},
        'quality': {'convert': 'float', 'default': -1},
        'filesize': {'convert': 'bytes'},
        'fs_approx': {'convert': 'bytes', 'field': 'filesize_approx'},
        'id': {'convert': 'string', 'field': 'format_id'},
        'height': {'convert': 'float_none'},
        'width': {'convert': 'float_none'},
        'fps': {'convert': 'float_none'},
        'channels': {'convert': 'float_none', 'field': 'audio_channels'},
        'tbr': {'convert': 'float_none'},
        'vbr': {'convert': 'float_none'},
        'abr': {'convert': 'float_none'},
        'asr': {'convert': 'float_none'},
        'source': {'convert': 'float', 'field': 'source_preference', 'default': -1},

        # Derived fields: combine or reduce several of the raw fields above
        'codec': {'type': 'combined', 'field': ('vcodec', 'acodec')},
        'br': {'type': 'multiple', 'field': ('tbr', 'vbr', 'abr'), 'convert': 'float_none',
               'function': lambda it: next(filter(None, it), None)},
        'size': {'type': 'multiple', 'field': ('filesize', 'fs_approx'), 'convert': 'bytes',
                 'function': lambda it: next(filter(None, it), None)},
        'ext': {'type': 'combined', 'field': ('vext', 'aext')},
        'res': {'type': 'multiple', 'field': ('height', 'width'),
                'function': lambda it: (lambda l: min(l) if l else 0)(tuple(filter(None, it)))},

        # Actual field names
        'format_id': {'type': 'alias', 'field': 'id'},
        'preference': {'type': 'alias', 'field': 'ie_pref'},
        'language_preference': {'type': 'alias', 'field': 'lang'},
        'source_preference': {'type': 'alias', 'field': 'source'},
        'protocol': {'type': 'alias', 'field': 'proto'},
        'filesize_approx': {'type': 'alias', 'field': 'fs_approx'},
        'audio_channels': {'type': 'alias', 'field': 'channels'},

        # Deprecated
        'dimension': {'type': 'alias', 'field': 'res', 'deprecated': True},
        'resolution': {'type': 'alias', 'field': 'res', 'deprecated': True},
        'extension': {'type': 'alias', 'field': 'ext', 'deprecated': True},
        'bitrate': {'type': 'alias', 'field': 'br', 'deprecated': True},
        'total_bitrate': {'type': 'alias', 'field': 'tbr', 'deprecated': True},
        'video_bitrate': {'type': 'alias', 'field': 'vbr', 'deprecated': True},
        'audio_bitrate': {'type': 'alias', 'field': 'abr', 'deprecated': True},
        'framerate': {'type': 'alias', 'field': 'fps', 'deprecated': True},
        'filesize_estimate': {'type': 'alias', 'field': 'size', 'deprecated': True},
        'samplerate': {'type': 'alias', 'field': 'asr', 'deprecated': True},
        'video_ext': {'type': 'alias', 'field': 'vext', 'deprecated': True},
        'audio_ext': {'type': 'alias', 'field': 'aext', 'deprecated': True},
        'video_codec': {'type': 'alias', 'field': 'vcodec', 'deprecated': True},
        'audio_codec': {'type': 'alias', 'field': 'acodec', 'deprecated': True},
        'video': {'type': 'alias', 'field': 'hasvid', 'deprecated': True},
        'has_video': {'type': 'alias', 'field': 'hasvid', 'deprecated': True},
        'audio': {'type': 'alias', 'field': 'hasaud', 'deprecated': True},
        'has_audio': {'type': 'alias', 'field': 'hasaud', 'deprecated': True},
        'extractor': {'type': 'alias', 'field': 'ie_pref', 'deprecated': True},
        'extractor_preference': {'type': 'alias', 'field': 'ie_pref', 'deprecated': True},
    }
5193
5194 def __init__(self, ydl, field_preference):
5195 self.ydl = ydl
5196 self._order = []
5197 self.evaluate_params(self.ydl.params, field_preference)
5198 if ydl.params.get('verbose'):
5199 self.print_verbose_info(self.ydl.write_debug)
5200
5201 def _get_field_setting(self, field, key):
5202 if field not in self.settings:
5203 if key in ('forced', 'priority'):
5204 return False
5205 self.ydl.deprecated_feature(f'Using arbitrary fields ({field}) for format sorting is '
5206 'deprecated and may be removed in a future version')
5207 self.settings[field] = {}
5208 propObj = self.settings[field]
5209 if key not in propObj:
5210 type = propObj.get('type')
5211 if key == 'field':
5212 default = 'preference' if type == 'extractor' else (field,) if type in ('combined', 'multiple') else field
5213 elif key == 'convert':
5214 default = 'order' if type == 'ordered' else 'float_string' if field else 'ignore'
5215 else:
5216 default = {'type': 'field', 'visible': True, 'order': [], 'not_in_list': (None,)}.get(key, None)
5217 propObj[key] = default
5218 return propObj[key]
5219
5220 def _resolve_field_value(self, field, value, convertNone=False):
5221 if value is None:
5222 if not convertNone:
5223 return None
5224 else:
5225 value = value.lower()
5226 conversion = self._get_field_setting(field, 'convert')
5227 if conversion == 'ignore':
5228 return None
5229 if conversion == 'string':
5230 return value
5231 elif conversion == 'float_none':
5232 return float_or_none(value)
5233 elif conversion == 'bytes':
5234 return parse_bytes(value)
5235 elif conversion == 'order':
5236 order_list = (self._use_free_order and self._get_field_setting(field, 'order_free')) or self._get_field_setting(field, 'order')
5237 use_regex = self._get_field_setting(field, 'regex')
5238 list_length = len(order_list)
5239 empty_pos = order_list.index('') if '' in order_list else list_length + 1
5240 if use_regex and value is not None:
5241 for i, regex in enumerate(order_list):
5242 if regex and re.match(regex, value):
5243 return list_length - i
5244 return list_length - empty_pos # not in list
5245 else: # not regex or value = None
5246 return list_length - (order_list.index(value) if value in order_list else empty_pos)
5247 else:
5248 if value.isnumeric():
5249 return float(value)
5250 else:
5251 self.settings[field]['convert'] = 'string'
5252 return value
5253
    def evaluate_params(self, params, sort_extractor):
        """Parse user options and extractor hints into self._order / self.settings.

        params: the YoutubeDL params dict (reads 'prefer_free_formats',
        'format_sort', 'format_sort_force').
        sort_extractor: sort-field strings supplied by the extractor.
        Raises ExtractorError for sort strings that do not match self.regex.
        """
        self._use_free_order = params.get('prefer_free_formats', False)
        self._sort_user = params.get('format_sort', [])
        self._sort_extractor = sort_extractor

        def add_item(field, reverse, closest, limit_text):
            # Register one sort field; first occurrence wins, duplicates are ignored
            field = field.lower()
            if field in self._order:
                return
            self._order.append(field)
            limit = self._resolve_field_value(field, limit_text)
            data = {
                'reverse': reverse,
                'closest': False if limit is None else closest,
                'limit_text': limit_text,
                'limit': limit}
            if field in self.settings:
                self.settings[field].update(data)
            else:
                self.settings[field] = data

        # Precedence: forced fields, then (unless format_sort_force) priority
        # fields, then user sort, then extractor sort, then the defaults
        sort_list = (
            tuple(field for field in self.default if self._get_field_setting(field, 'forced'))
            + (tuple() if params.get('format_sort_force', False)
               else tuple(field for field in self.default if self._get_field_setting(field, 'priority')))
            + tuple(self._sort_user) + tuple(sort_extractor) + self.default)

        for item in sort_list:
            match = re.match(self.regex, item)
            if match is None:
                raise ExtractorError('Invalid format sort string "%s" given by extractor' % item)
            field = match.group('field')
            if field is None:
                continue
            # Resolve aliases (e.g. 'filesize_approx' -> 'fs_approx'), warning on deprecated ones
            if self._get_field_setting(field, 'type') == 'alias':
                alias, field = field, self._get_field_setting(field, 'field')
                if self._get_field_setting(alias, 'deprecated'):
                    self.ydl.deprecated_feature(f'Format sorting alias {alias} is deprecated and may '
                                                f'be removed in a future version. Please use {field} instead')
            reverse = match.group('reverse') is not None
            closest = match.group('separator') == '~'
            limit_text = match.group('limit')

            has_limit = limit_text is not None
            # 'combined' fields (e.g. 'ext') expand to several real fields and may
            # carry one limit per sub-field, colon-separated (unless 'same_limit')
            has_multiple_fields = self._get_field_setting(field, 'type') == 'combined'
            has_multiple_limits = has_limit and has_multiple_fields and not self._get_field_setting(field, 'same_limit')

            fields = self._get_field_setting(field, 'field') if has_multiple_fields else (field,)
            limits = limit_text.split(':') if has_multiple_limits else (limit_text,) if has_limit else tuple()
            limit_count = len(limits)
            for (i, f) in enumerate(fields):
                # Missing per-field limits fall back to the single shared limit (or none)
                add_item(f, reverse, closest,
                         limits[i] if i < limit_count
                         else limits[0] if has_limit and not has_multiple_limits
                         else None)
5309
5310 def print_verbose_info(self, write_debug):
5311 if self._sort_user:
5312 write_debug('Sort order given by user: %s' % ', '.join(self._sort_user))
5313 if self._sort_extractor:
5314 write_debug('Sort order given by extractor: %s' % ', '.join(self._sort_extractor))
5315 write_debug('Formats sorted by: %s' % ', '.join(['%s%s%s' % (
5316 '+' if self._get_field_setting(field, 'reverse') else '', field,
5317 '%s%s(%s)' % ('~' if self._get_field_setting(field, 'closest') else ':',
5318 self._get_field_setting(field, 'limit_text'),
5319 self._get_field_setting(field, 'limit'))
5320 if self._get_field_setting(field, 'limit_text') is not None else '')
5321 for field in self._order if self._get_field_setting(field, 'visible')]))
5322
5323 def _calculate_field_preference_from_value(self, format, field, type, value):
5324 reverse = self._get_field_setting(field, 'reverse')
5325 closest = self._get_field_setting(field, 'closest')
5326 limit = self._get_field_setting(field, 'limit')
5327
5328 if type == 'extractor':
5329 maximum = self._get_field_setting(field, 'max')
5330 if value is None or (maximum is not None and value >= maximum):
5331 value = -1
5332 elif type == 'boolean':
5333 in_list = self._get_field_setting(field, 'in_list')
5334 not_in_list = self._get_field_setting(field, 'not_in_list')
5335 value = 0 if ((in_list is None or value in in_list) and (not_in_list is None or value not in not_in_list)) else -1
5336 elif type == 'ordered':
5337 value = self._resolve_field_value(field, value, True)
5338
5339 # try to convert to number
5340 val_num = float_or_none(value, default=self._get_field_setting(field, 'default'))
5341 is_num = self._get_field_setting(field, 'convert') != 'string' and val_num is not None
5342 if is_num:
5343 value = val_num
5344
5345 return ((-10, 0) if value is None
5346 else (1, value, 0) if not is_num # if a field has mixed strings and numbers, strings are sorted higher
5347 else (0, -abs(value - limit), value - limit if reverse else limit - value) if closest
5348 else (0, value, 0) if not reverse and (limit is None or value <= limit)
5349 else (0, -value, 0) if limit is None or (reverse and value == limit) or value > limit
5350 else (-1, value, 0))
5351
5352 def _calculate_field_preference(self, format, field):
5353 type = self._get_field_setting(field, 'type') # extractor, boolean, ordered, field, multiple
5354 get_value = lambda f: format.get(self._get_field_setting(f, 'field'))
5355 if type == 'multiple':
5356 type = 'field' # Only 'field' is allowed in multiple for now
5357 actual_fields = self._get_field_setting(field, 'field')
5358
5359 value = self._get_field_setting(field, 'function')(get_value(f) for f in actual_fields)
5360 else:
5361 value = get_value(field)
5362 return self._calculate_field_preference_from_value(format, field, type, value)
5363
5364 def calculate_preference(self, format):
5365 # Determine missing protocol
5366 if not format.get('protocol'):
5367 format['protocol'] = determine_protocol(format)
5368
5369 # Determine missing ext
5370 if not format.get('ext') and 'url' in format:
5371 format['ext'] = determine_ext(format['url'])
5372 if format.get('vcodec') == 'none':
5373 format['audio_ext'] = format['ext'] if format.get('acodec') != 'none' else 'none'
5374 format['video_ext'] = 'none'
5375 else:
5376 format['video_ext'] = format['ext']
5377 format['audio_ext'] = 'none'
5378 # if format.get('preference') is None and format.get('ext') in ('f4f', 'f4m'): # Not supported?
5379 # format['preference'] = -1000
5380
5381 if format.get('preference') is None and format.get('ext') == 'flv' and re.match('[hx]265|he?vc?', format.get('vcodec') or ''):
5382 # HEVC-over-FLV is out-of-spec by FLV's original spec
5383 # ref. https://trac.ffmpeg.org/ticket/6389
5384 # ref. https://github.com/yt-dlp/yt-dlp/pull/5821
5385 format['preference'] = -100
5386
5387 # Determine missing bitrates
5388 if format.get('vcodec') == 'none':
5389 format['vbr'] = 0
5390 if format.get('acodec') == 'none':
5391 format['abr'] = 0
5392 if not format.get('vbr') and format.get('vcodec') != 'none':
5393 format['vbr'] = try_call(lambda: format['tbr'] - format['abr']) or None
5394 if not format.get('abr') and format.get('acodec') != 'none':
5395 format['abr'] = try_call(lambda: format['tbr'] - format['vbr']) or None
5396 if not format.get('tbr'):
5397 format['tbr'] = try_call(lambda: format['vbr'] + format['abr']) or None
5398
5399 return tuple(self._calculate_field_preference(format, field) for field in self._order)
5400
5401
5402 # XXX: Temporary
5403 class _YDLLogger:
5404 def __init__(self, ydl=None):
5405 self._ydl = ydl
5406
5407 def debug(self, message):
5408 if self._ydl:
5409 self._ydl.write_debug(message)
5410
5411 def info(self, message):
5412 if self._ydl:
5413 self._ydl.to_screen(message)
5414
5415 def warning(self, message, *, once=False):
5416 if self._ydl:
5417 self._ydl.report_warning(message, once)
5418
5419 def error(self, message, *, is_error=True):
5420 if self._ydl:
5421 self._ydl.report_error(message, is_error=is_error)
5422
5423 def stdout(self, message):
5424 if self._ydl:
5425 self._ydl.to_stdout(message)
5426
5427 def stderr(self, message):
5428 if self._ydl:
5429 self._ydl.to_stderr(message)