-#!/usr/bin/env python3
+import asyncio
import atexit
import base64
import binascii
import hmac
import html.entities
import html.parser
+import http.client
+import http.cookiejar
import importlib.util
+import inspect
import io
import itertools
import json
import time
import traceback
import types
+import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree
import zlib
-import http.client
-import http.cookiejar
-from .compat import asyncio, functools # isort: split
+from .compat import functools # isort: split
from .compat import (
compat_etree_fromstring,
compat_expanduser,
compat_HTMLParseError,
- compat_HTTPError,
compat_os_name,
- compat_parse_qs,
compat_shlex_quote,
- compat_str,
- compat_urllib_parse_urlencode,
- compat_urllib_parse_urlparse,
- compat_urlparse,
)
from .dependencies import brotli, certifi, websockets, xattr
from .socks import ProxyType, sockssocket
# In Python < 2.6.5, urlsplit() suffers from bug https://bugs.python.org/issue7904
# URLs with protocols not in urlparse.uses_netloc are not handled correctly
for scheme in ('socks', 'socks4', 'socks4a', 'socks5'):
- if scheme not in compat_urlparse.uses_netloc:
- compat_urlparse.uses_netloc.append(scheme)
+ if scheme not in urllib.parse.uses_netloc:
+ urllib.parse.uses_netloc.append(scheme)
# This is not clearly defined otherwise
'juillet', 'août', 'septembre', 'octobre', 'novembre', 'décembre'],
}
-KNOWN_EXTENSIONS = (
- 'mp4', 'm4a', 'm4p', 'm4b', 'm4r', 'm4v', 'aac',
- 'flv', 'f4v', 'f4a', 'f4b',
- 'webm', 'ogg', 'ogv', 'oga', 'ogx', 'spx', 'opus',
- 'mkv', 'mka', 'mk3d',
- 'avi', 'divx',
- 'mov',
- 'asf', 'wmv', 'wma',
- '3gp', '3g2',
- 'mp3',
- 'flac',
- 'ape',
- 'wav',
- 'f4f', 'f4m', 'm3u8', 'smil')
-
# needed for sanitizing filenames in restricted mode
ACCENT_CHARS = dict(zip('ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖŐØŒÙÚÛÜŰÝÞßàáâãäåæçèéêëìíîïðñòóôõöőøœùúûüűýþÿ',
itertools.chain('AAAAAA', ['AE'], 'CEEEEIIIIDNOOOOOOO', ['OE'], 'UUUUUY', ['TH', 'ss'],
'%d/%m/%Y',
'%d/%m/%y',
'%d/%m/%Y %H:%M:%S',
+ '%d-%m-%Y %H:%M',
])
DATE_FORMATS_MONTH_FIRST = list(DATE_FORMATS)
])
PACKED_CODES_RE = r"}\('(.+)',(\d+),(\d+),'([^']+)'\.split\('\|'\)"
-JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>(?P<json_ld>.+?)</script>'
+JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>\s*(?P<json_ld>{.+?})\s*</script>'
NUMBER_RE = r'\d+(?:\.\d+)?'
def _find_xpath(xpath):
return node.find(xpath)
- if isinstance(xpath, (str, compat_str)):
+ if isinstance(xpath, str):
n = _find_xpath(xpath)
else:
for xp in xpath:
if filename == '-':
if sys.platform == 'win32':
import msvcrt
- msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
+ # stdout may be any IO stream. Eg, when using contextlib.redirect_stdout
+ with contextlib.suppress(io.UnsupportedOperation):
+ msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
return (sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout, filename)
for attempt in range(2):
s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s) # Handle timestamps
result = ''.join(map(replace_insane, s))
if is_id is NO_DEFAULT:
- result = re.sub('(\0.)(?:(?=\\1)..)+', r'\1', result) # Remove repeated substitute chars
- STRIP_RE = '(?:\0.|[ _-])*'
+ result = re.sub(r'(\0.)(?:(?=\1)..)+', r'\1', result) # Remove repeated substitute chars
+ STRIP_RE = r'(?:\0.|[ _-])*'
result = re.sub(f'^\0.{STRIP_RE}|{STRIP_RE}\0.$', '', result) # Remove substitute chars from start/end
result = result.replace('\0', '') or '_'
return os.path.join(*sanitized_path)
-def sanitize_url(url):
+def sanitize_url(url, *, scheme='http'):
# Prepend protocol-less URLs with the given scheme (`http` by default) in order
# to mitigate the number of unwanted failures due to missing protocol
if url is None:
return
elif url.startswith('//'):
- return 'http:%s' % url
+ return f'{scheme}:{url}'
# Fix some common typos seen so far
COMMON_TYPOS = (
# https://github.com/ytdl-org/youtube-dl/issues/15649
def extract_basic_auth(url):
- parts = compat_urlparse.urlsplit(url)
+ parts = urllib.parse.urlsplit(url)
if parts.username is None:
return url, None
- url = compat_urlparse.urlunsplit(parts._replace(netloc=(
+ url = urllib.parse.urlunsplit(parts._replace(netloc=(
parts.hostname if parts.port is None
else '%s:%d' % (parts.hostname, parts.port))))
auth_payload = base64.b64encode(
def encodeArgument(s):
# Legacy code that uses byte strings
# Uncomment the following line after fixing all post processors
- # assert isinstance(s, str), 'Internal error: %r should be of type %r, is %r' % (s, compat_str, type(s))
+ # assert isinstance(s, str), 'Internal error: %r should be of type %r, is %r' % (s, str, type(s))
return s if isinstance(s, str) else s.decode('ascii')
if isinstance(optval, bytes):
optval = optval.decode(preferredencoding())
- assert isinstance(optval, compat_str)
+ assert isinstance(optval, str)
return optval
if opts_check_certificate:
if has_certifi and 'no-certifi' not in params.get('compat_opts', []):
context.load_verify_locations(cafile=certifi.where())
- try:
- context.load_default_certs()
- # Work around the issue in load_default_certs when there are bad certificates. See:
- # https://github.com/yt-dlp/yt-dlp/issues/1060,
- # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
- except ssl.SSLError:
- # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
- if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
- for storename in ('CA', 'ROOT'):
- _ssl_load_windows_store_certs(context, storename)
- context.set_default_verify_paths()
+ else:
+ try:
+ context.load_default_certs()
+ # Work around the issue in load_default_certs when there are bad certificates. See:
+ # https://github.com/yt-dlp/yt-dlp/issues/1060,
+ # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
+ except ssl.SSLError:
+ # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
+ if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
+ for storename in ('CA', 'ROOT'):
+ _ssl_load_windows_store_certs(context, storename)
+ context.set_default_verify_paths()
client_certfile = params.get('client_certificate')
if client_certfile:
self.countries = countries
class UserNotLive(ExtractorError):
    """Error raised when the requested channel/user is not live"""

    def __init__(self, msg=None, **kwargs):
        # `expected` is always forced to True, overriding any value passed by the caller
        message = msg or 'The channel is not currently live'
        super().__init__(message, **{**kwargs, 'expected': True})
+
+
class DownloadError(YoutubeDLError):
"""Download Error exception.
assert issubclass(base_class, (
http.client.HTTPConnection, http.client.HTTPSConnection))
- url_components = compat_urlparse.urlparse(socks_proxy)
+ url_components = urllib.parse.urlparse(socks_proxy)
if url_components.scheme.lower() == 'socks5':
socks_type = ProxyType.SOCKS5
elif url_components.scheme.lower() in ('socks', 'socks4'):
m = req.get_method()
if (not (code in (301, 302, 303, 307, 308) and m in ("GET", "HEAD")
or code in (301, 302, 303) and m == "POST")):
- raise compat_HTTPError(req.full_url, code, msg, headers, fp)
+ raise urllib.error.HTTPError(req.full_url, code, msg, headers, fp)
# Strictly (according to RFC 2616), 301 or 302 in response to
# a POST MUST NOT cause a redirection without confirmation
# from the user (of urllib.request, in this case). In practice,
with contextlib.suppress(ValueError):
upload_date = datetime.datetime(*timetuple[:6]).strftime('%Y%m%d')
if upload_date is not None:
- return compat_str(upload_date)
+ return str(upload_date)
def unified_timestamp(date_str, day_first=True):
def __str__(self):
return f'{self.start.isoformat()} - {self.end.isoformat()}'
def __eq__(self, other):
    """Equal only to another DateRange with the same start and end."""
    if not isinstance(other, DateRange):
        return False
    return self.start == other.start and self.end == other.end
+
def platform_name():
    """ Returns the platform name as a str

    Deprecated: a notice is written on every call; use platform.platform() directly.
    """
    # The other write_string caller in this file terminates its message with
    # '\n' itself, so the notice needs its own line ending as well; without it
    # the next piece of output is glued onto the same line.
    write_string('DeprecationWarning: yt_dlp.utils.platform_name is deprecated, use platform.platform instead\n')
    return platform.platform()


@functools.cache
def system_identifier():
    """Describe the running interpreter and host platform in a single line.

    The result has the form
    'Python <version> (<implementation> <architecture>) - <platform> <libc>'.
    """
    implementation = platform.python_implementation()
    running_pypy = implementation == 'PyPy' and hasattr(sys, 'pypy_version_info')
    if running_pypy:
        # PyPy carries its own version number in addition to the Python one
        implementation = '%s version %d.%d.%d' % (implementation, *sys.pypy_version_info[:3])

    libc = format_field(join_nonempty(*platform.libc_ver(), delim=' '), None, '(%s)')
    return 'Python %s (%s %s) - %s %s' % (
        platform.python_version(),
        implementation,
        platform.architecture()[0],
        platform.platform(),
        libc,
    )
@functools.cache
url, idata = unsmuggle_url(url, {})
data.update(idata)
- sdata = compat_urllib_parse_urlencode(
+ sdata = urllib.parse.urlencode(
{'__youtubedl_smuggle': json.dumps(data)})
return url + '#' + sdata
if '#__youtubedl_smuggle' not in smug_url:
return smug_url, default
url, _, sdata = smug_url.rpartition('#')
- jsond = compat_parse_qs(sdata)['__youtubedl_smuggle'][0]
+ jsond = urllib.parse.parse_qs(sdata)['__youtubedl_smuggle'][0]
data = json.loads(jsond)
return url, data
def parse_bitrate(s):
- if not isinstance(s, compat_str):
+ if not isinstance(s, str):
return
mobj = re.search(r'\b(\d+)\s*kbps', s)
if mobj:
def setproctitle(title):
- assert isinstance(title, compat_str)
+ assert isinstance(title, str)
# ctypes in Jython is not complete
# http://bugs.jython.org/issue2148
def get_domain(url):
    """
    This implementation is inconsistent, but is kept for compatibility.
    Use this only for "webpage_url_domain"

    The URL's netloc is passed through remove_start(..., 'www.');
    a falsy result is returned as None.
    """
    netloc = urllib.parse.urlparse(url).netloc
    return remove_start(netloc, 'www.') or None
def url_basename(url):
    """Return the final '/'-separated segment of the URL's path (surrounding slashes ignored)."""
    trimmed_path = urllib.parse.urlparse(url).path.strip('/')
    return trimmed_path.rsplit('/', 1)[-1]
def urljoin(base, path):
    """
    Resolve `path` against `base`.

    bytes arguments are decoded first. `path` is returned unchanged when it
    already starts with `//` or `<scheme>://`. None is returned when `path` is
    empty or not a string, or when `base` is not a string starting with
    `http://`, `https://` or `//`.
    """
    if isinstance(path, bytes):
        path = path.decode()
    if not isinstance(path, str) or not path:
        return None
    # RFC 3986 scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." ).
    # The hyphen is escaped so it is a literal; unescaped, `+-.` is a range
    # from "+" to "." that would also accept "," inside a scheme.
    if re.match(r'^(?:[a-zA-Z][a-zA-Z0-9+\-.]*:)?//', path):
        return path
    if isinstance(base, bytes):
        base = base.decode()
    if not isinstance(base, str) or not re.match(
            r'^(?:https?:)?//', base):
        return None
    return urllib.parse.urljoin(base, path)
class HEADRequest(urllib.request.Request):
def str_or_none(v, default=None):
    """Return str(v), or `default` when v is None."""
    if v is None:
        return default
    return str(v)
def str_to_int(int_str):
    """ A more relaxed version of int_or_none

    ints are returned as-is; strings have every `,`, `.` and `+` removed before
    being handed to int_or_none; any other type yields None.
    """
    if isinstance(int_str, int):
        return int_str
    if not isinstance(int_str, str):
        return None
    cleaned = re.sub(r'[,\.\+]', '', int_str)
    return int_or_none(cleaned)
def strip_or_none(v, default=None):
    """Return v.strip() when v is a str, otherwise `default`."""
    if isinstance(v, str):
        return v.strip()
    return default
def url_or_none(url):
    """Return the whitespace-stripped URL if it looks usable, else None.

    Accepted are protocol-relative URLs (`//...`) and URLs whose scheme is one of
    http(s), the rtmp/rtsp families matched by the pattern below, mms or ftp(s).
    """
    if not url or not isinstance(url, str):
        return None
    candidate = url.strip()
    if re.match(r'^(?:(?:https?|rt(?:m(?:pt?[es]?|fp)|sp[su]?)|mms|ftps?):)?//', candidate):
        return candidate
    return None
try:
if isinstance(timestamp, (int, float)): # unix timestamp
datetime_object = datetime.datetime.utcfromtimestamp(timestamp)
- elif isinstance(timestamp, compat_str): # assume YYYYMMDD
+ elif isinstance(timestamp, str): # assume YYYYMMDD
datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
return datetime_object.strftime(date_format)
except (ValueError, TypeError, AttributeError):
def detect_exe_version(output, version_re=None, unrecognized='present'):
- assert isinstance(output, compat_str)
+ assert isinstance(output, str)
if version_re is None:
version_re = r'version\s+([-0-9._a-zA-Z]+)'
m = re.search(version_re, output)
@staticmethod
def _reverse_index(x):
- return None if x is None else -(x + 1)
+ return None if x is None else ~x
def __getitem__(self, idx):
if isinstance(idx, slice):
def escape_url(url):
"""Escape URL as suggested by RFC 3986"""
- url_parsed = compat_urllib_parse_urlparse(url)
+ url_parsed = urllib.parse.urlparse(url)
return url_parsed._replace(
netloc=url_parsed.netloc.encode('idna').decode('ascii'),
path=escape_rfc3986(url_parsed.path),
def parse_qs(url):
    """Parse the query component of `url` into a dict of name -> list of values."""
    query_string = urllib.parse.urlparse(url).query
    return urllib.parse.parse_qs(query_string)
def read_batch_urls(batch_fd):
def fixup(url):
- if not isinstance(url, compat_str):
+ if not isinstance(url, str):
url = url.decode('utf-8', 'replace')
BOM_UTF8 = ('\xef\xbb\xbf', '\ufeff')
for bom in BOM_UTF8:
if not url or url.startswith(('#', ';', ']')):
return False
# "#" cannot be stripped out since it is part of the URI
- # However, it can be safely stipped out if follwing a whitespace
+ # However, it can be safely stripped out if following a whitespace
return re.split(r'\s#', url, 1)[0].rstrip()
with contextlib.closing(batch_fd) as fd:
def urlencode_postdata(*args, **kargs):
    """URL-encode the arguments via urllib.parse.urlencode and return the result as ASCII bytes."""
    encoded = urllib.parse.urlencode(*args, **kargs)
    return encoded.encode('ascii')
def update_url_query(url, query):
    """Return `url` with its query parameters updated from `query`.

    Existing parameters are parsed, overridden/extended by `query`, and
    re-encoded with sequence values expanded. A falsy `query` returns `url` as-is.
    """
    if not query:
        return url
    parts = urllib.parse.urlparse(url)
    merged = urllib.parse.parse_qs(parts.query)
    merged.update(query)
    new_query = urllib.parse.urlencode(merged, True)
    return urllib.parse.urlunparse(parts._replace(query=new_query))
-def update_Request(req, url=None, data=None, headers={}, query={}):
+def update_Request(req, url=None, data=None, headers=None, query=None):
req_headers = req.headers.copy()
- req_headers.update(headers)
+ req_headers.update(headers or {})
req_data = data or req.data
req_url = update_url_query(url or req.get_full_url(), query)
req_get_method = req.get_method()
out = b''
for k, v in data.items():
out += b'--' + boundary.encode('ascii') + b'\r\n'
- if isinstance(k, compat_str):
+ if isinstance(k, str):
k = k.encode()
- if isinstance(v, compat_str):
+ if isinstance(v, str):
v = v.encode()
# RFC 2047 requires non-ASCII field names to be encoded, while RFC 7578
# suggests sending UTF-8 directly. Firefox sends UTF-8, too
def encode_compat_str(string, encoding=preferredencoding(), errors='strict'):
    """Return `string` unchanged if it is a str, otherwise str(string, encoding, errors).

    The default `encoding` is computed once, when the function is defined.
    """
    if isinstance(string, str):
        return string
    return str(string, encoding, errors)
US_RATINGS = {
str.strip, codecs_str.strip().strip(',').split(','))))
vcodec, acodec, scodec, hdr = None, None, None, None
for full_codec in split_codecs:
- parts = full_codec.split('.')
- codec = parts[0].replace('0', '')
- if codec in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2',
- 'h263', 'h264', 'mp4v', 'hvc1', 'av1', 'theora', 'dvh1', 'dvhe'):
- if not vcodec:
- vcodec = '.'.join(parts[:4]) if codec in ('vp9', 'av1', 'hvc1') else full_codec
- if codec in ('dvh1', 'dvhe'):
- hdr = 'DV'
- elif codec == 'av1' and len(parts) > 3 and parts[3] == '10':
- hdr = 'HDR10'
- elif full_codec.replace('0', '').startswith('vp9.2'):
- hdr = 'HDR10'
- elif codec in ('flac', 'mp4a', 'opus', 'vorbis', 'mp3', 'aac', 'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'):
- if not acodec:
- acodec = full_codec
- elif codec in ('stpp', 'wvtt',):
- if not scodec:
- scodec = full_codec
+ parts = re.sub(r'0+(?=\d)', '', full_codec).split('.')
+ if parts[0] in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2',
+ 'h263', 'h264', 'mp4v', 'hvc1', 'av1', 'theora', 'dvh1', 'dvhe'):
+ if vcodec:
+ continue
+ vcodec = full_codec
+ if parts[0] in ('dvh1', 'dvhe'):
+ hdr = 'DV'
+ elif parts[0] == 'av1' and traverse_obj(parts, 3) == '10':
+ hdr = 'HDR10'
+ elif parts[:2] == ['vp9', '2']:
+ hdr = 'HDR10'
+ elif parts[0] in ('flac', 'mp4a', 'opus', 'vorbis', 'mp3', 'aac',
+ 'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'):
+ acodec = acodec or full_codec
+ elif parts[0] in ('stpp', 'wvtt'):
+ scodec = scodec or full_codec
else:
write_string(f'WARNING: Unknown codec {full_codec}\n')
if vcodec or acodec or scodec:
return age_limit < content_limit
+# List of known byte-order-marks (BOM)
+BOMS = [
+ (b'\xef\xbb\xbf', 'utf-8'),
+ (b'\x00\x00\xfe\xff', 'utf-32-be'),
+ (b'\xff\xfe\x00\x00', 'utf-32-le'),
+ (b'\xff\xfe', 'utf-16-le'),
+ (b'\xfe\xff', 'utf-16-be'),
+]
+
+
def is_html(first_bytes):
""" Detect whether a file contains HTML by examining its first bytes. """
- BOMS = [
- (b'\xef\xbb\xbf', 'utf-8'),
- (b'\x00\x00\xfe\xff', 'utf-32-be'),
- (b'\xff\xfe\x00\x00', 'utf-32-le'),
- (b'\xff\xfe', 'utf-16-le'),
- (b'\xfe\xff', 'utf-16-be'),
- ]
-
encoding = 'utf-8'
for bom, enc in BOMS:
while first_bytes.startswith(bom):
elif ext == 'f4m':
return 'f4m'
- return compat_urllib_parse_urlparse(url).scheme
+ return urllib.parse.urlparse(url).scheme
def render_table(header_row, data, delim=False, extra_gap=0, hide_empty=False):
if not filters or any(match_str(f, info_dict, incomplete) for f in filters):
return NO_DEFAULT if interactive and not incomplete else None
else:
- video_title = info_dict.get('title') or info_dict.get('id') or 'video'
+ video_title = info_dict.get('title') or info_dict.get('id') or 'entry'
filter_str = ') | ('.join(map(str.strip, filters))
return f'{video_title} does not pass filter ({filter_str}), skipping ..'
return _match_func
class download_range_func:
    """Callable that yields the sections to download for a given info_dict.

    `chapters` is an iterable of regexes matched against chapter titles and
    `ranges` an iterable of (start, end) pairs; either may be None.
    """

    def __init__(self, chapters, ranges):
        self.chapters = chapters
        self.ranges = ranges

    def __call__(self, info_dict, ydl):
        """Yield matching chapters (with their `index` added), then the explicit ranges."""
        if info_dict.get('chapters'):
            warning = 'There are no chapters matching the regex'
        else:
            warning = 'Cannot match chapters since chapter information is unavailable'

        for regex in self.chapters or []:
            for idx, chapter in enumerate(info_dict.get('chapters') or []):
                if not re.search(regex, chapter['title']):
                    continue
                # A single hit is enough to suppress the warning
                warning = None
                yield {**chapter, 'index': idx}

        if self.chapters and warning:
            ydl.to_screen(f'[info] {info_dict["id"]}: {warning}')

        for start, end in self.ranges or []:
            yield {'start_time': start, 'end_time': end}

    def __eq__(self, other):
        if not isinstance(other, download_range_func):
            return False
        return self.chapters == other.chapters and self.ranges == other.ranges
def parse_dfxp_time_expr(time_expr):
addr, preflen = block.split('/')
addr_min = struct.unpack('!L', socket.inet_aton(addr))[0]
addr_max = addr_min | (0xffffffff >> int(preflen))
- return compat_str(socket.inet_ntoa(
+ return str(socket.inet_ntoa(
struct.pack('!L', random.randint(addr_min, addr_max))))
if proxy == '__noproxy__':
return None # No Proxy
- if compat_urlparse.urlparse(proxy).scheme.lower() in ('socks', 'socks4', 'socks4a', 'socks5'):
+ if urllib.parse.urlparse(proxy).scheme.lower() in ('socks', 'socks4', 'socks4a', 'socks5'):
req.add_header('Ytdl-socks-proxy', proxy)
# yt-dlp's http/https handlers take care of wrapping the socket with socks
return None
raise ValueError('Either table or n must be specified')
table = (table or '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ')[:n]
- if n != len(table):
+ if n and n != len(table):
raise ValueError(f'base {n} exceeds table length {len(table)}')
return table
The function doesn't add an additional layer of escaping; e.g., it doesn't escape `%3C` as `%253C`. Instead, it percent-escapes characters with an underlying UTF-8 encoding *besides* those already escaped, leaving the URI intact.
"""
- iri_parts = compat_urllib_parse_urlparse(iri)
+ iri_parts = urllib.parse.urlparse(iri)
if '[' in iri_parts.netloc:
raise ValueError('IPv6 URIs are not, yet, supported.')
return sys.stdin
def determine_file_encoding(data):
    """
    Detect the text encoding used
    @returns (encoding, bytes to skip)

    The encoding is None when neither a BOM nor a coding declaration is found.
    """

    # A byte-order-mark takes priority over any declaration inside the data
    bom_match = next(((enc, len(bom)) for bom, enc in BOMS if data.startswith(bom)), None)
    if bom_match is not None:
        return bom_match

    # Dropping all null bytes lets the ASCII pattern below match UTF-16 and
    # UTF-32 data too; endianness is ignored since the match is good enough
    without_nulls = data.replace(b'\0', b'')
    declaration = re.match(rb'(?m)^#\s*coding\s*:\s*(\S+)\s*$', without_nulls)
    if declaration is None:
        return None, 0
    return declaration.group(1).decode(), 0
+
+
class Config:
own_args = None
parsed_args = None
def init(self, args=None, filename=None):
assert not self.__initialized
+ self.own_args, self.filename = args, filename
+ return self.load_configs()
+
+ def load_configs(self):
directory = ''
- if filename:
- location = os.path.realpath(filename)
+ if self.filename:
+ location = os.path.realpath(self.filename)
directory = os.path.dirname(location)
if location in self._loaded_paths:
return False
self._loaded_paths.add(location)
- self.own_args, self.__initialized = args, True
- opts, _ = self.parser.parse_known_args(args)
- self.parsed_args, self.filename = args, filename
-
+ self.__initialized = True
+ opts, _ = self.parser.parse_known_args(self.own_args)
+ self.parsed_args = self.own_args
for location in opts.config_locations or []:
if location == '-':
self.append_config(shlex.split(read_stdin('options'), comments=True), label='stdin')
@staticmethod
def read_file(filename, default=[]):
try:
- optionf = open(filename)
+ optionf = open(filename, 'rb')
except OSError:
return default # silently skip if file is not present
+ try:
+ enc, skip = determine_file_encoding(optionf.read(512))
+ optionf.seek(skip, io.SEEK_SET)
+ except OSError:
+ enc = None # silently skip read errors
try:
# FIXME: https://github.com/ytdl-org/youtube-dl/commit/dfe5fa49aed02cf36ba9f743b11b0903554b5e56
- contents = optionf.read()
+ contents = optionf.read().decode(enc or preferredencoding())
res = shlex.split(contents, comments=True)
except Exception as err:
raise ValueError(f'Unable to parse "{filename}": {err}')
return {k.title(): v for k, v in itertools.chain.from_iterable(map(dict.items, dicts))}
def cached_method(f):
    """Cache a method

    Results are stored on the instance, per method name, keyed on the bound
    call arguments with defaults applied. Positional and keyword spellings of
    the same call therefore share one entry. Every argument must be hashable.
    """
    signature = inspect.signature(f)

    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        bound_args = signature.bind(self, *args, **kwargs)
        bound_args.apply_defaults()
        # `self` is left out of the key: the cache already lives on the instance,
        # and hashing the instance fails for classes that define __eq__ without
        # __hash__ (besides making the instance reference itself).
        key = tuple(bound_args.arguments.values())[1:]

        if not hasattr(self, '__cached_method__cache'):
            self.__cached_method__cache = {}
        cache = self.__cached_method__cache.setdefault(f.__name__, {})
        if key not in cache:
            cache[key] = f(self, *args, **kwargs)
        return cache[key]
    return wrapper
+
+
class classproperty:
- """classmethod(property(func)) that works in py < 3.9"""
+ """property access for class methods"""
def __init__(self, func):
functools.update_wrapper(self, func)
return self.__dict__.items()
# File extensions grouped by media kind. `video` and `audio` are extended below
# with their `common_*` counterparts, so the common groups are subsets of them.
MEDIA_EXTENSIONS = Namespace(
    common_video=('avi', 'flv', 'mkv', 'mov', 'mp4', 'webm'),
    video=('3g2', '3gp', 'f4v', 'mk3d', 'divx', 'mpg', 'ogv', 'm4v', 'wmv'),
    common_audio=('aiff', 'alac', 'flac', 'm4a', 'mka', 'mp3', 'ogg', 'opus', 'wav'),
    audio=('aac', 'ape', 'asf', 'f4a', 'f4b', 'm4b', 'm4p', 'm4r', 'oga', 'ogx', 'spx', 'vorbis', 'wma'),
    thumbnails=('jpg', 'png', 'webp'),
    storyboards=('mhtml', ),
    subtitles=('srt', 'vtt', 'ass', 'lrc'),
    manifests=('f4f', 'f4m', 'm3u8', 'smil', 'mpd'),
)
MEDIA_EXTENSIONS.video = (*MEDIA_EXTENSIONS.video, *MEDIA_EXTENSIONS.common_video)
MEDIA_EXTENSIONS.audio = (*MEDIA_EXTENSIONS.audio, *MEDIA_EXTENSIONS.common_audio)

# Every video, audio and manifest extension, in that order
KNOWN_EXTENSIONS = MEDIA_EXTENSIONS.video + MEDIA_EXTENSIONS.audio + MEDIA_EXTENSIONS.manifests
+
+
# Deprecated
has_certifi = bool(certifi)
has_websockets = bool(websockets)