+import asyncio
import atexit
import base64
import binascii
import codecs
import collections
import contextlib
-import ctypes
import datetime
import email.header
import email.utils
import time
import traceback
import types
+import unicodedata
import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree
import zlib
-from .compat import asyncio, functools # isort: split
+from .compat import functools # isort: split
from .compat import (
compat_etree_fromstring,
compat_expanduser,
'juillet', 'août', 'septembre', 'octobre', 'novembre', 'décembre'],
}
-KNOWN_EXTENSIONS = (
- 'mp4', 'm4a', 'm4p', 'm4b', 'm4r', 'm4v', 'aac',
- 'flv', 'f4v', 'f4a', 'f4b',
- 'webm', 'ogg', 'ogv', 'oga', 'ogx', 'spx', 'opus',
- 'mkv', 'mka', 'mk3d',
- 'avi', 'divx',
- 'mov',
- 'asf', 'wmv', 'wma',
- '3gp', '3g2',
- 'mp3',
- 'flac',
- 'ape',
- 'wav',
- 'f4f', 'f4m', 'm3u8', 'smil')
# Offsets from UTC, in hours, for well-known timezone abbreviations
# From https://github.com/python/cpython/blob/3.11/Lib/email/_parseaddr.py#L36-L42
TIMEZONE_NAMES = {
    # Universal time
    'UT': 0,
    'UTC': 0,
    'GMT': 0,
    'Z': 0,
    # Atlantic (used in Canada)
    'AST': -4,
    'ADT': -3,
    # Eastern
    'EST': -5,
    'EDT': -4,
    # Central
    'CST': -6,
    'CDT': -5,
    # Mountain
    'MST': -7,
    'MDT': -6,
    # Pacific
    'PST': -8,
    'PDT': -7,
}
# needed for sanitizing filenames in restricted mode
ACCENT_CHARS = dict(zip('ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖŐØŒÙÚÛÜŰÝÞßàáâãäåæçèéêëìíîïðñòóôõöőøœùúûüűýþÿ',
'%d/%m/%Y',
'%d/%m/%y',
'%d/%m/%Y %H:%M:%S',
+ '%d-%m-%Y %H:%M',
])
DATE_FORMATS_MONTH_FIRST = list(DATE_FORMATS)
if filename == '-':
if sys.platform == 'win32':
import msvcrt
- msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
+
+ # stdout may be any IO stream, e.g. when using contextlib.redirect_stdout
+ with contextlib.suppress(io.UnsupportedOperation):
+ msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
return (sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout, filename)
for attempt in range(2):
return ACCENT_CHARS[char]
elif not restricted and char == '\n':
return '\0 '
+ elif is_id is NO_DEFAULT and not restricted and char in '"*:<>?|/\\':
+ # Replace with their full-width unicode counterparts
+ return {'/': '\u29F8', '\\': '\u29f9'}.get(char, chr(ord(char) + 0xfee0))
elif char == '?' or ord(char) < 32 or ord(char) == 127:
return ''
elif char == '"':
return '\0_'
return char
+ if restricted and is_id is NO_DEFAULT:
+ s = unicodedata.normalize('NFKC', s)
s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s) # Handle timestamps
result = ''.join(map(replace_insane, s))
if is_id is NO_DEFAULT:
return os.path.join(*sanitized_path)
-def sanitize_url(url):
+def sanitize_url(url, *, scheme='http'):
# Prepend protocol-less URLs with `http:` scheme in order to mitigate
# the number of unwanted failures due to missing protocol
if url is None:
return
elif url.startswith('//'):
- return 'http:%s' % url
+ return f'{scheme}:{url}'
# Fix some common typos seen so far
COMMON_TYPOS = (
# https://github.com/ytdl-org/youtube-dl/issues/15649
if entity in html.entities.name2codepoint:
return chr(html.entities.name2codepoint[entity])
- # TODO: HTML5 allows entities without a semicolon. For example,
- # 'Éric' should be decoded as 'Éric'.
+ # TODO: HTML5 allows entities without a semicolon.
+ # E.g. '&Eacuteric' should be decoded as 'Éric'.
if entity_with_semicolon in html.entities.html5:
return html.entities.html5[entity_with_semicolon]
if opts_check_certificate:
if has_certifi and 'no-certifi' not in params.get('compat_opts', []):
context.load_verify_locations(cafile=certifi.where())
- try:
- context.load_default_certs()
- # Work around the issue in load_default_certs when there are bad certificates. See:
- # https://github.com/yt-dlp/yt-dlp/issues/1060,
- # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
- except ssl.SSLError:
- # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
- if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
- for storename in ('CA', 'ROOT'):
- _ssl_load_windows_store_certs(context, storename)
- context.set_default_verify_paths()
+ else:
+ try:
+ context.load_default_certs()
+ # Work around the issue in load_default_certs when there are bad certificates. See:
+ # https://github.com/yt-dlp/yt-dlp/issues/1060,
+ # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
+ except ssl.SSLError:
+ # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
+ if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
+ for storename in ('CA', 'ROOT'):
+ _ssl_load_windows_store_certs(context, storename)
+ context.set_default_verify_paths()
client_certfile = params.get('client_certificate')
if client_certfile:
self.countries = countries
class UserNotLive(ExtractorError):
    """Raised when the requested channel/user is not live

    `expected=True` is always forced into the keyword arguments passed to the
    parent constructor, and a default message is used when none is given.
    """

    def __init__(self, msg=None, **kwargs):
        message = msg or 'The channel is not currently live'
        super().__init__(message, **{**kwargs, 'expected': True})
+
+
class DownloadError(YoutubeDLError):
"""Download Error exception.
$)
''', date_str)
if not m:
- timezone = datetime.timedelta()
+ m = re.search(r'\d{1,2}:\d{1,2}(?:\.\d+)?(?P<tz>\s*[A-Z]+)$', date_str)
+ timezone = TIMEZONE_NAMES.get(m and m.group('tz').strip())
+ if timezone is not None:
+ date_str = date_str[:-len(m.group('tz'))]
+ timezone = datetime.timedelta(hours=timezone or 0)
else:
date_str = date_str[:-len(m.group('tz'))]
if not m.group('sign'):
if date_str is None:
return None
- date_str = re.sub(r'[,|]', '', date_str)
+ date_str = re.sub(r'\s+', ' ', re.sub(
+ r'(?i)[,|]|(mon|tues?|wed(nes)?|thu(rs)?|fri|sat(ur)?)(day)?', '', date_str))
pm_delta = 12 if re.search(r'(?i)PM', date_str) else 0
timezone, date_str = extract_timezone(date_str)
with contextlib.suppress(ValueError):
dt = datetime.datetime.strptime(date_str, expression) - timezone + datetime.timedelta(hours=pm_delta)
return calendar.timegm(dt.timetuple())
+
timetuple = email.utils.parsedate_tz(date_str)
if timetuple:
- return calendar.timegm(timetuple) + pm_delta * 3600
+ return calendar.timegm(timetuple) + pm_delta * 3600 - timezone.total_seconds()
def determine_ext(url, default_ext='unknown_video'):
def __str__(self):
return f'{self.start.isoformat()} - {self.end.isoformat()}'
+ def __eq__(self, other):
+ return (isinstance(other, DateRange)
+ and self.start == other.start and self.end == other.end)
+
def platform_name():
""" Returns the platform name as a str """
# Cross-platform file locking
if sys.platform == 'win32':
+ import ctypes
import ctypes.wintypes
import msvcrt
def setproctitle(title):
assert isinstance(title, str)
- # ctypes in Jython is not complete
- # http://bugs.jython.org/issue2148
- if sys.platform.startswith('java'):
+ # Workaround for https://github.com/yt-dlp/yt-dlp/issues/4541
+ try:
+ import ctypes
+ except ImportError:
return
try:
def get_domain(url):
    """
    Return the network location of the URL after passing it through
    remove_start(..., 'www.'), or None if the result is empty.

    This implementation is inconsistent, but is kept for compatibility.
    Use this only for "webpage_url_domain"
    """
    netloc = urllib.parse.urlparse(url).netloc
    return remove_start(netloc, 'www.') or None
def url_basename(url):
@staticmethod
def _reverse_index(x):
- return None if x is None else -(x + 1)
+ return None if x is None else ~x
def __getitem__(self, idx):
if isinstance(idx, slice):
r'\g<callback_data>', code)
-def js_to_json(code, vars={}):
+def js_to_json(code, vars={}, *, strict=False):
# vars is a dict of var, val pairs to substitute
COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
SKIP_RE = fr'\s*(?:{COMMENT_RE})?\s*'
if v in vars:
return vars[v]
+ if strict:
+ raise ValueError(f'Unknown value: {v}')
return '"%s"' % v
def create_map(mobj):
return json.dumps(dict(json.loads(js_to_json(mobj.group(1) or '[]', vars=vars))))
- code = re.sub(r'new Date\((".+")\)', r'\g<1>', code)
code = re.sub(r'new Map\((\[.*?\])?\)', create_map, code)
+ if not strict:
+ code = re.sub(r'new Date\((".+")\)', r'\g<1>', code)
return re.sub(r'''(?sx)
"(?:[^"\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^"\\]*"|
str.strip, codecs_str.strip().strip(',').split(','))))
vcodec, acodec, scodec, hdr = None, None, None, None
for full_codec in split_codecs:
- parts = full_codec.split('.')
- codec = parts[0].replace('0', '')
- if codec in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2',
- 'h263', 'h264', 'mp4v', 'hvc1', 'av1', 'theora', 'dvh1', 'dvhe'):
- if not vcodec:
- vcodec = '.'.join(parts[:4]) if codec in ('vp9', 'av1', 'hvc1') else full_codec
- if codec in ('dvh1', 'dvhe'):
- hdr = 'DV'
- elif codec == 'av1' and len(parts) > 3 and parts[3] == '10':
- hdr = 'HDR10'
- elif full_codec.replace('0', '').startswith('vp9.2'):
- hdr = 'HDR10'
- elif codec in ('flac', 'mp4a', 'opus', 'vorbis', 'mp3', 'aac', 'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'):
- if not acodec:
- acodec = full_codec
- elif codec in ('stpp', 'wvtt',):
- if not scodec:
- scodec = full_codec
+ parts = re.sub(r'0+(?=\d)', '', full_codec).split('.')
+ if parts[0] in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2',
+ 'h263', 'h264', 'mp4v', 'hvc1', 'av1', 'theora', 'dvh1', 'dvhe'):
+ if vcodec:
+ continue
+ vcodec = full_codec
+ if parts[0] in ('dvh1', 'dvhe'):
+ hdr = 'DV'
+ elif parts[0] == 'av1' and traverse_obj(parts, 3) == '10':
+ hdr = 'HDR10'
+ elif parts[:2] == ['vp9', '2']:
+ hdr = 'HDR10'
+ elif parts[0] in ('flac', 'mp4a', 'opus', 'vorbis', 'mp3', 'aac',
+ 'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'):
+ acodec = acodec or full_codec
+ elif parts[0] in ('stpp', 'wvtt'):
+ scodec = scodec or full_codec
else:
write_string(f'WARNING: Unknown codec {full_codec}\n')
if vcodec or acodec or scodec:
return {}
def get_compatible_ext(*, vcodecs, acodecs, vexts, aexts, preferences=None):
    """
    Choose a container extension for merging the given video/audio streams

    @param vcodecs      Codecs of the video streams (same length as vexts)
    @param acodecs      Codecs of the audio streams (same length as aexts)
    @param vexts        Extensions of the video streams
    @param aexts        Extensions of the audio streams
    @param preferences  Optional sequence of extensions to try, in order
    @returns            The first extension judged compatible - by codec, then by
                        extension family - falling back to "mkv" when it is allowed,
                        else to the last preference
    """
    assert len(vcodecs) == len(vexts) and len(acodecs) == len(aexts)

    allow_mkv = not preferences or 'mkv' in preferences

    if allow_mkv and max(len(acodecs), len(vcodecs)) > 1:
        return 'mkv'  # TODO: any other format allows this?

    # TODO: All codecs supported by parse_codecs isn't handled here
    COMPATIBLE_CODECS = {
        'mp4': {
            'av1', 'hevc', 'avc1', 'mp4a',  # fourcc (m3u8, mpd)
            'h264', 'aacl',  # Set in ISM
        },
        'webm': {
            'av1', 'vp9', 'vp8', 'opus', 'vrbs',
            'vp9x', 'vp8x',  # in the webm spec
        },
    }

    def sanitize_codec(codec_list):
        # Reduce the first codec to its base name: drop everything after the first "." and all "0"s
        return try_get(codec_list, getter=lambda x: x[0].split('.')[0].replace('0', ''))

    wanted_codecs = (sanitize_codec(vcodecs), sanitize_codec(acodecs))

    # First pass: match by codec
    for ext in preferences or COMPATIBLE_CODECS.keys():
        if ext == 'mkv':
            return ext
        if COMPATIBLE_CODECS.get(ext, set()).issuperset(wanted_codecs):
            return ext

    COMPATIBLE_EXTS = (
        {'mp3', 'mp4', 'm4a', 'm4p', 'm4b', 'm4r', 'm4v', 'ismv', 'isma', 'mov'},
        {'webm'},
    )
    # Second pass: match by extension family
    for ext in preferences or vexts:
        current_exts = {ext, *vexts, *aexts}
        if ext == 'mkv' or current_exts == {ext}:
            return ext
        if any(family.issuperset(current_exts) for family in COMPATIBLE_EXTS):
            return ext

    if allow_mkv:
        return 'mkv'
    return preferences[-1]
+
+
def urlhandle_detect_ext(url_handle):
getheader = url_handle.headers.get
return age_limit < content_limit
# List of known byte-order-marks (BOM), as (mark, encoding name) pairs.
# The UTF-32 marks are listed before the UTF-16 ones because the UTF-32-LE
# mark begins with the UTF-16-LE mark
BOMS = [
    (codecs.BOM_UTF8, 'utf-8'),
    (codecs.BOM_UTF32_BE, 'utf-32-be'),
    (codecs.BOM_UTF32_LE, 'utf-32-le'),
    (codecs.BOM_UTF16_LE, 'utf-16-le'),
    (codecs.BOM_UTF16_BE, 'utf-16-be'),
]
+
+
def is_html(first_bytes):
""" Detect whether a file contains HTML by examining its first bytes. """
- BOMS = [
- (b'\xef\xbb\xbf', 'utf-8'),
- (b'\x00\x00\xfe\xff', 'utf-32-be'),
- (b'\xff\xfe\x00\x00', 'utf-32-le'),
- (b'\xff\xfe', 'utf-16-le'),
- (b'\xfe\xff', 'utf-16-be'),
- ]
-
encoding = 'utf-8'
for bom, enc in BOMS:
while first_bytes.startswith(bom):
if not filters or any(match_str(f, info_dict, incomplete) for f in filters):
return NO_DEFAULT if interactive and not incomplete else None
else:
- video_title = info_dict.get('title') or info_dict.get('id') or 'video'
+ video_title = info_dict.get('title') or info_dict.get('id') or 'entry'
filter_str = ') | ('.join(map(str.strip, filters))
return f'{video_title} does not pass filter ({filter_str}), skipping ..'
return _match_func
class download_range_func:
    """Callable yielding the sections of a video that should be downloaded

    @param chapters  Regexes matched against chapter titles (may be None)
    @param ranges    (start, end) pairs to yield as explicit time ranges (may be None)

    Calling the object with (info_dict, ydl) yields every chapter of info_dict
    whose title matches one of the regexes (with its "index" added), followed by
    one dict per range. When regexes were given but nothing matched, a message
    is written via ydl.to_screen.
    """

    def __init__(self, chapters, ranges):
        self.chapters = chapters
        self.ranges = ranges

    def __call__(self, info_dict, ydl):
        if info_dict.get('chapters'):
            warning = 'There are no chapters matching the regex'
        else:
            warning = 'Cannot match chapters since chapter information is unavailable'

        for regex in self.chapters or []:
            for i, chapter in enumerate(info_dict.get('chapters') or []):
                if not re.search(regex, chapter['title']):
                    continue
                warning = None
                yield {**chapter, 'index': i}
        if self.chapters and warning:
            ydl.to_screen(f'[info] {info_dict["id"]}: {warning}')

        for start, end in self.ranges or []:
            yield {'start_time': start, 'end_time': end}

    def __eq__(self, other):
        if not isinstance(other, download_range_func):
            return False
        return self.chapters == other.chapters and self.ranges == other.ranges
def parse_dfxp_time_expr(time_expr):
raise ValueError('Either table or n must be specified')
table = (table or '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ')[:n]
- if n != len(table):
+ if n and n != len(table):
raise ValueError(f'base {n} exceeds table length {len(table)}')
return table
return sys.stdin
def determine_file_encoding(data):
    """
    Detect the text encoding used
    @param data  The leading bytes of the file
    @returns     (encoding, bytes to skip); encoding is None when neither a BOM
                 nor a "# coding: ..." declaration is found at the start of data
    """

    # BOM marks are given priority over declarations
    bom_match = next(((enc, len(bom)) for bom, enc in BOMS if data.startswith(bom)), None)
    if bom_match is not None:
        return bom_match

    # Strip off all null bytes to match even when UTF-16 or UTF-32 is used.
    # We ignore the endianness to get a good enough match
    stripped = data.replace(b'\0', b'')
    declaration = re.match(rb'(?m)^#\s*coding\s*:\s*(\S+)\s*$', stripped)
    if declaration is None:
        return None, 0
    return declaration.group(1).decode(), 0
+
+
class Config:
own_args = None
parsed_args = None
def init(self, args=None, filename=None):
assert not self.__initialized
+ self.own_args, self.filename = args, filename
+ return self.load_configs()
+
+ def load_configs(self):
directory = ''
- if filename:
- location = os.path.realpath(filename)
+ if self.filename:
+ location = os.path.realpath(self.filename)
directory = os.path.dirname(location)
if location in self._loaded_paths:
return False
self._loaded_paths.add(location)
- self.own_args, self.__initialized = args, True
- opts, _ = self.parser.parse_known_args(args)
- self.parsed_args, self.filename = args, filename
-
+ self.__initialized = True
+ opts, _ = self.parser.parse_known_args(self.own_args)
+ self.parsed_args = self.own_args
for location in opts.config_locations or []:
if location == '-':
self.append_config(shlex.split(read_stdin('options'), comments=True), label='stdin')
@staticmethod
def read_file(filename, default=[]):
try:
- optionf = open(filename)
+ optionf = open(filename, 'rb')
except OSError:
return default # silently skip if file is not present
+ try:
+ enc, skip = determine_file_encoding(optionf.read(512))
+ optionf.seek(skip, io.SEEK_SET)
+ except OSError:
+ enc = None # silently skip read errors
try:
# FIXME: https://github.com/ytdl-org/youtube-dl/commit/dfe5fa49aed02cf36ba9f743b11b0903554b5e56
- contents = optionf.read()
+ contents = optionf.read().decode(enc or preferredencoding())
res = shlex.split(contents, comments=True)
except Exception as err:
raise ValueError(f'Unable to parse "{filename}": {err}')
return self.__dict__.items()
# File extensions grouped by kind of media.
# `video` and `audio` start out with only the extensions not already listed
# in `common_video`/`common_audio`; the common ones are appended just below
MEDIA_EXTENSIONS = Namespace(
    common_video=('avi', 'flv', 'mkv', 'mov', 'mp4', 'webm'),
    video=('3g2', '3gp', 'f4v', 'mk3d', 'divx', 'mpg', 'ogv', 'm4v', 'wmv'),
    common_audio=('aiff', 'alac', 'flac', 'm4a', 'mka', 'mp3', 'ogg', 'opus', 'wav'),
    audio=('aac', 'ape', 'asf', 'f4a', 'f4b', 'm4b', 'm4p', 'm4r', 'oga', 'ogx', 'spx', 'vorbis', 'wma'),
    thumbnails=('jpg', 'png', 'webp'),
    storyboards=('mhtml', ),
    subtitles=('srt', 'vtt', 'ass', 'lrc'),
    manifests=('f4f', 'f4m', 'm3u8', 'smil', 'mpd'),
)
# Extend the full lists with the common extensions
MEDIA_EXTENSIONS.video += MEDIA_EXTENSIONS.common_video
MEDIA_EXTENSIONS.audio += MEDIA_EXTENSIONS.common_audio

# All video, audio and manifest extensions, in that order
KNOWN_EXTENSIONS = (*MEDIA_EXTENSIONS.video, *MEDIA_EXTENSIONS.audio, *MEDIA_EXTENSIONS.manifests)
+
+
class RetryManager:
    """Iterable that drives a retry loop

    Usage:
        for retry in RetryManager(...):
            try:
                ...
            except SomeException as err:
                retry.error = err
                continue

    Each iteration yields the manager itself. If `error` was set during the
    iteration, the error callback is invoked with (error, attempt, retries) and
    the loop continues while attempt <= retries. If no error was set,
    iteration stops.
    """
    attempt = 0
    _error = None

    def __init__(self, _retries, _error_callback, **kwargs):
        self.retries = _retries or 0
        # Extra keyword arguments are bound to the callback up front
        self.error_callback = functools.partial(_error_callback, **kwargs)

    def _should_retry(self):
        # NO_DEFAULT marks an attempt that finished without an error being set
        if self._error is NO_DEFAULT:
            return False
        return self.attempt <= self.retries

    @property
    def error(self):
        return None if self._error is NO_DEFAULT else self._error

    @error.setter
    def error(self, value):
        self._error = value

    def __iter__(self):
        while self._should_retry():
            self.error = NO_DEFAULT
            self.attempt += 1
            yield self
            failure = self.error
            if failure:
                self.error_callback(failure, self.attempt, self.retries)

    @staticmethod
    def report_retry(e, count, retries, *, sleep_func, info, warn, error=None, suffix=None):
        """Utility function for reporting retries

        @param e           The error that occurred
        @param count       Number of the attempt that failed
        @param retries     Maximum number of retries
        @param sleep_func  Delay before the next attempt, or a callable taking
                           the keyword argument n and returning the delay
        @param info        Callable used to report the sleep
        @param warn        Callable used to report the retry
        @param error       Optional callable used to report giving up;
                           when not given, e is raised instead
        @param suffix      Optional text appended after "Retrying"
        """
        if count > retries:
            if not error:
                raise e
            if count > 1:
                return error(f'{e}. Giving up after {count - 1} retries')
            return error(str(e))

        if not count:
            return warn(e)
        if isinstance(e, ExtractorError):
            e = remove_end(str_or_none(e.cause) or e.orig_msg, '.')
        warn(f'{e}. Retrying{format_field(suffix, None, " %s")} ({count}/{retries})...')

        if callable(sleep_func):
            delay = float_or_none(sleep_func(n=count - 1))
        else:
            delay = sleep_func
        if delay:
            info(f'Sleeping {delay:.2f} seconds ...')
            time.sleep(delay)
+
+
def make_archive_id(ie, video_id):
    """
    Build the "<lowercased extractor key> <video id>" identifier string
    @param ie        Either the extractor key as a str, or an object providing ie_key()
    @param video_id  The video's ID
    """
    if not isinstance(ie, str):
        ie = ie.ie_key()
    return f'{ie.lower()} {video_id}'
+
+
def truncate_string(s, left, right=0):
    """
    Shorten a string by replacing its middle with "..."
    @param s      The string to shorten; None is passed through
    @param left   Length of the leading part, including the 3-character ellipsis (must be > 3)
    @param right  Number of trailing characters to keep (must be >= 0)
    @returns      s itself if it is None or no longer than (left + right),
                  else a string of exactly (left + right) characters
    """
    assert left > 3 and right >= 0
    if s is None or len(s) <= left + right:
        return s
    # s[-0:] is the whole string, not an empty one, so an empty tail needs special-casing
    tail = s[-right:] if right else ''
    return f'{s[:left - 3]}...{tail}'
+
+
# Deprecated
has_certifi = bool(certifi)
has_websockets = bool(websockets)