+import asyncio
import atexit
import base64
import binascii
import xml.etree.ElementTree
import zlib
-from .compat import asyncio, functools # isort: split
+from .compat import functools # isort: split
from .compat import (
compat_etree_fromstring,
compat_expanduser,
'juillet', 'août', 'septembre', 'octobre', 'novembre', 'décembre'],
}
-KNOWN_EXTENSIONS = (
- 'mp4', 'm4a', 'm4p', 'm4b', 'm4r', 'm4v', 'aac',
- 'flv', 'f4v', 'f4a', 'f4b',
- 'webm', 'ogg', 'ogv', 'oga', 'ogx', 'spx', 'opus',
- 'mkv', 'mka', 'mk3d',
- 'avi', 'divx',
- 'mov',
- 'asf', 'wmv', 'wma',
- '3gp', '3g2',
- 'mp3',
- 'flac',
- 'ape',
- 'wav',
- 'f4f', 'f4m', 'm3u8', 'smil')
-
# needed for sanitizing filenames in restricted mode
ACCENT_CHARS = dict(zip('ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖŐØŒÙÚÛÜŰÝÞßàáâãäåæçèéêëìíîïðñòóôõöőøœùúûüűýþÿ',
itertools.chain('AAAAAA', ['AE'], 'CEEEEIIIIDNOOOOOOO', ['OE'], 'UUUUUY', ['TH', 'ss'],
'%d/%m/%Y',
'%d/%m/%y',
'%d/%m/%Y %H:%M:%S',
+ '%d-%m-%Y %H:%M',
])
DATE_FORMATS_MONTH_FIRST = list(DATE_FORMATS)
])
PACKED_CODES_RE = r"}\('(.+)',(\d+),(\d+),'([^']+)'\.split\('\|'\)"
-JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>(?P<json_ld>.+?)</script>'
+JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>\s*(?P<json_ld>{.+?})\s*</script>'
NUMBER_RE = r'\d+(?:\.\d+)?'
if filename == '-':
if sys.platform == 'win32':
import msvcrt
- msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
+ # stdout may be any IO stream. Eg, when using contextlib.redirect_stdout
+ with contextlib.suppress(io.UnsupportedOperation):
+ msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
return (sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout, filename)
for attempt in range(2):
s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s) # Handle timestamps
result = ''.join(map(replace_insane, s))
if is_id is NO_DEFAULT:
- result = re.sub('(\0.)(?:(?=\\1)..)+', r'\1', result) # Remove repeated substitute chars
- STRIP_RE = '(?:\0.|[ _-])*'
+ result = re.sub(r'(\0.)(?:(?=\1)..)+', r'\1', result) # Remove repeated substitute chars
+ STRIP_RE = r'(?:\0.|[ _-])*'
result = re.sub(f'^\0.{STRIP_RE}|{STRIP_RE}\0.$', '', result) # Remove substitute chars from start/end
result = result.replace('\0', '') or '_'
return os.path.join(*sanitized_path)
-def sanitize_url(url):
+def sanitize_url(url, *, scheme='http'):
# Prepend protocol-less URLs with `http:` scheme in order to mitigate
# the number of unwanted failures due to missing protocol
if url is None:
return
elif url.startswith('//'):
- return 'http:%s' % url
+ return f'{scheme}:{url}'
# Fix some common typos seen so far
COMMON_TYPOS = (
# https://github.com/ytdl-org/youtube-dl/issues/15649
if opts_check_certificate:
if has_certifi and 'no-certifi' not in params.get('compat_opts', []):
context.load_verify_locations(cafile=certifi.where())
- try:
- context.load_default_certs()
- # Work around the issue in load_default_certs when there are bad certificates. See:
- # https://github.com/yt-dlp/yt-dlp/issues/1060,
- # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
- except ssl.SSLError:
- # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
- if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
- for storename in ('CA', 'ROOT'):
- _ssl_load_windows_store_certs(context, storename)
- context.set_default_verify_paths()
+ else:
+ try:
+ context.load_default_certs()
+ # Work around the issue in load_default_certs when there are bad certificates. See:
+ # https://github.com/yt-dlp/yt-dlp/issues/1060,
+ # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
+ except ssl.SSLError:
+ # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
+ if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
+ for storename in ('CA', 'ROOT'):
+ _ssl_load_windows_store_certs(context, storename)
+ context.set_default_verify_paths()
client_certfile = params.get('client_certificate')
if client_certfile:
self.countries = countries
+class UserNotLive(ExtractorError):
+ """Error when a channel/user is not live"""
+
+ def __init__(self, msg=None, **kwargs):
+ kwargs['expected'] = True
+ super().__init__(msg or 'The channel is not currently live', **kwargs)
+
+
class DownloadError(YoutubeDLError):
"""Download Error exception.
def __str__(self):
return f'{self.start.isoformat()} - {self.end.isoformat()}'
+ def __eq__(self, other):
+ return (isinstance(other, DateRange)
+ and self.start == other.start and self.end == other.end)
+
def platform_name():
""" Returns the platform name as a str """
def get_domain(url):
- domain = re.match(r'(?:https?:\/\/)?(?:www\.)?(?P<domain>[^\n\/]+\.[^\n\/]+)(?:\/(.*))?', url)
- return domain.group('domain') if domain else None
+ """
+ This implementation is inconsistent, but is kept for compatibility.
+ Use this only for "webpage_url_domain"
+ """
+ return remove_start(urllib.parse.urlparse(url).netloc, 'www.') or None
def url_basename(url):
@staticmethod
def _reverse_index(x):
- return None if x is None else -(x + 1)
+ return None if x is None else ~x
def __getitem__(self, idx):
if isinstance(idx, slice):
str.strip, codecs_str.strip().strip(',').split(','))))
vcodec, acodec, scodec, hdr = None, None, None, None
for full_codec in split_codecs:
- parts = full_codec.split('.')
- codec = parts[0].replace('0', '')
- if codec in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2',
- 'h263', 'h264', 'mp4v', 'hvc1', 'av1', 'theora', 'dvh1', 'dvhe'):
- if not vcodec:
- vcodec = '.'.join(parts[:4]) if codec in ('vp9', 'av1', 'hvc1') else full_codec
- if codec in ('dvh1', 'dvhe'):
- hdr = 'DV'
- elif codec == 'av1' and len(parts) > 3 and parts[3] == '10':
- hdr = 'HDR10'
- elif full_codec.replace('0', '').startswith('vp9.2'):
- hdr = 'HDR10'
- elif codec in ('flac', 'mp4a', 'opus', 'vorbis', 'mp3', 'aac', 'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'):
- if not acodec:
- acodec = full_codec
- elif codec in ('stpp', 'wvtt',):
- if not scodec:
- scodec = full_codec
+ parts = re.sub(r'0+(?=\d)', '', full_codec).split('.')
+ if parts[0] in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2',
+ 'h263', 'h264', 'mp4v', 'hvc1', 'av1', 'theora', 'dvh1', 'dvhe'):
+ if vcodec:
+ continue
+ vcodec = full_codec
+ if parts[0] in ('dvh1', 'dvhe'):
+ hdr = 'DV'
+ elif parts[0] == 'av1' and traverse_obj(parts, 3) == '10':
+ hdr = 'HDR10'
+ elif parts[:2] == ['vp9', '2']:
+ hdr = 'HDR10'
+ elif parts[0] in ('flac', 'mp4a', 'opus', 'vorbis', 'mp3', 'aac',
+ 'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'):
+ acodec = acodec or full_codec
+ elif parts[0] in ('stpp', 'wvtt'):
+ scodec = scodec or full_codec
else:
write_string(f'WARNING: Unknown codec {full_codec}\n')
if vcodec or acodec or scodec:
return age_limit < content_limit
+# List of known byte-order-marks (BOM)
+BOMS = [
+ (b'\xef\xbb\xbf', 'utf-8'),
+ (b'\x00\x00\xfe\xff', 'utf-32-be'),
+ (b'\xff\xfe\x00\x00', 'utf-32-le'),
+ (b'\xff\xfe', 'utf-16-le'),
+ (b'\xfe\xff', 'utf-16-be'),
+]
+
+
def is_html(first_bytes):
""" Detect whether a file contains HTML by examining its first bytes. """
- BOMS = [
- (b'\xef\xbb\xbf', 'utf-8'),
- (b'\x00\x00\xfe\xff', 'utf-32-be'),
- (b'\xff\xfe\x00\x00', 'utf-32-le'),
- (b'\xff\xfe', 'utf-16-le'),
- (b'\xfe\xff', 'utf-16-be'),
- ]
-
encoding = 'utf-8'
for bom, enc in BOMS:
while first_bytes.startswith(bom):
if not filters or any(match_str(f, info_dict, incomplete) for f in filters):
return NO_DEFAULT if interactive and not incomplete else None
else:
- video_title = info_dict.get('title') or info_dict.get('id') or 'video'
+ video_title = info_dict.get('title') or info_dict.get('id') or 'entry'
filter_str = ') | ('.join(map(str.strip, filters))
return f'{video_title} does not pass filter ({filter_str}), skipping ..'
return _match_func
-def download_range_func(chapters, ranges):
- def inner(info_dict, ydl):
+class download_range_func:
+ def __init__(self, chapters, ranges):
+ self.chapters, self.ranges = chapters, ranges
+
+ def __call__(self, info_dict, ydl):
warning = ('There are no chapters matching the regex' if info_dict.get('chapters')
else 'Cannot match chapters since chapter information is unavailable')
- for regex in chapters or []:
+ for regex in self.chapters or []:
for i, chapter in enumerate(info_dict.get('chapters') or []):
if re.search(regex, chapter['title']):
warning = None
yield {**chapter, 'index': i}
- if chapters and warning:
+ if self.chapters and warning:
ydl.to_screen(f'[info] {info_dict["id"]}: {warning}')
- yield from ({'start_time': start, 'end_time': end} for start, end in ranges or [])
+ yield from ({'start_time': start, 'end_time': end} for start, end in self.ranges or [])
- return inner
+ def __eq__(self, other):
+ return (isinstance(other, download_range_func)
+ and self.chapters == other.chapters and self.ranges == other.ranges)
def parse_dfxp_time_expr(time_expr):
raise ValueError('Either table or n must be specified')
table = (table or '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ')[:n]
- if n != len(table):
+ if n and n != len(table):
raise ValueError(f'base {n} exceeds table length {len(table)}')
return table
return sys.stdin
+def determine_file_encoding(data):
+ """
+ Detect the text encoding used
+ @returns (encoding, bytes to skip)
+ """
+
+ # BOM marks are given priority over declarations
+ for bom, enc in BOMS:
+ if data.startswith(bom):
+ return enc, len(bom)
+
+ # Strip off all null bytes to match even when UTF-16 or UTF-32 is used.
+ # We ignore the endianness to get a good enough match
+ data = data.replace(b'\0', b'')
+ mobj = re.match(rb'(?m)^#\s*coding\s*:\s*(\S+)\s*$', data)
+ return mobj.group(1).decode() if mobj else None, 0
+
+
class Config:
own_args = None
parsed_args = None
def init(self, args=None, filename=None):
assert not self.__initialized
+ self.own_args, self.filename = args, filename
+ return self.load_configs()
+
+ def load_configs(self):
directory = ''
- if filename:
- location = os.path.realpath(filename)
+ if self.filename:
+ location = os.path.realpath(self.filename)
directory = os.path.dirname(location)
if location in self._loaded_paths:
return False
self._loaded_paths.add(location)
- self.own_args, self.__initialized = args, True
- opts, _ = self.parser.parse_known_args(args)
- self.parsed_args, self.filename = args, filename
-
+ self.__initialized = True
+ opts, _ = self.parser.parse_known_args(self.own_args)
+ self.parsed_args = self.own_args
for location in opts.config_locations or []:
if location == '-':
self.append_config(shlex.split(read_stdin('options'), comments=True), label='stdin')
@staticmethod
def read_file(filename, default=[]):
try:
- optionf = open(filename)
+ optionf = open(filename, 'rb')
except OSError:
return default # silently skip if file is not present
+ try:
+ enc, skip = determine_file_encoding(optionf.read(512))
+ optionf.seek(skip, io.SEEK_SET)
+ except OSError:
+ enc = None # silently skip read errors
try:
# FIXME: https://github.com/ytdl-org/youtube-dl/commit/dfe5fa49aed02cf36ba9f743b11b0903554b5e56
- contents = optionf.read()
+ contents = optionf.read().decode(enc or preferredencoding())
res = shlex.split(contents, comments=True)
except Exception as err:
raise ValueError(f'Unable to parse "{filename}": {err}')
return self.__dict__.items()
+MEDIA_EXTENSIONS = Namespace(
+ common_video=('avi', 'flv', 'mkv', 'mov', 'mp4', 'webm'),
+ video=('3g2', '3gp', 'f4v', 'mk3d', 'divx', 'mpg', 'ogv', 'm4v', 'wmv'),
+ common_audio=('aiff', 'alac', 'flac', 'm4a', 'mka', 'mp3', 'ogg', 'opus', 'wav'),
+ audio=('aac', 'ape', 'asf', 'f4a', 'f4b', 'm4b', 'm4p', 'm4r', 'oga', 'ogx', 'spx', 'vorbis', 'wma'),
+ thumbnails=('jpg', 'png', 'webp'),
+ storyboards=('mhtml', ),
+ subtitles=('srt', 'vtt', 'ass', 'lrc'),
+ manifests=('f4f', 'f4m', 'm3u8', 'smil', 'mpd'),
+)
+MEDIA_EXTENSIONS.video += MEDIA_EXTENSIONS.common_video
+MEDIA_EXTENSIONS.audio += MEDIA_EXTENSIONS.common_audio
+
+KNOWN_EXTENSIONS = (*MEDIA_EXTENSIONS.video, *MEDIA_EXTENSIONS.audio, *MEDIA_EXTENSIONS.manifests)
+
+
# Deprecated
has_certifi = bool(certifi)
has_websockets = bool(websockets)