import tempfile
import time
import traceback
+import types
import urllib.parse
import xml.etree.ElementTree
import zlib
NO_DEFAULT = object()
+IDENTITY = lambda x: x
ENGLISH_MONTH_NAMES = [
'January', 'February', 'March', 'April', 'May', 'June',
def get_elements_by_class(class_name, html, **kargs):
"""Return the content of all tags with the specified class in the passed HTML document as a list"""
return get_elements_by_attribute(
- 'class', r'[^\'"]*\b%s\b[^\'"]*' % re.escape(class_name),
+ 'class', r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name),
html, escape_value=False)
def get_elements_html_by_class(class_name, html):
"""Return the html of all tags with the specified class in the passed HTML document as a list"""
return get_elements_html_by_attribute(
- 'class', r'[^\'"]*\b%s\b[^\'"]*' % re.escape(class_name),
+ 'class', r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name),
html, escape_value=False)
return html.strip()
+class LenientJSONDecoder(json.JSONDecoder):
+ def __init__(self, *args, transform_source=None, ignore_extra=False, **kwargs):
+ self.transform_source, self.ignore_extra = transform_source, ignore_extra
+ super().__init__(*args, **kwargs)
+
+ def decode(self, s):
+ if self.transform_source:
+ s = self.transform_source(s)
+ if self.ignore_extra:
+ return self.raw_decode(s.lstrip())[0]
+ return super().decode(s)
+
+
def sanitize_open(filename, open_mode):
"""Try to open the given filename, and slightly tweak it if this fails.
# Ref: https://github.com/yt-dlp/yt-dlp/issues/3124
raise LockingUnsupportedError()
stream = locked_file(filename, open_mode, block=False).__enter__()
- except LockingUnsupportedError:
+ except OSError:
stream = open(filename, open_mode)
- return (stream, filename)
+ return stream, filename
except OSError as err:
if attempt or err.errno in (errno.EACCES,):
raise
return os.path.expandvars(compat_expanduser(s))
-def orderedSet(iterable):
- """ Remove all duplicates from the input iterable """
- res = []
- for el in iterable:
- if el not in res:
- res.append(el)
- return res
+def orderedSet(iterable, *, lazy=False):
+ """Remove all duplicates from the input iterable"""
+ def _iter():
+ seen = [] # Do not use set since the items can be unhashable
+ for x in iterable:
+ if x not in seen:
+ seen.append(x)
+ yield x
+
+ return _iter() if lazy else list(_iter())
def _htmlentity_transform(entity_with_semicolon):
def process_communicate_or_kill(p, *args, **kwargs):
- try:
- return p.communicate(*args, **kwargs)
- except BaseException: # Including KeyboardInterrupt
- p.kill()
- p.wait()
- raise
+ write_string('DeprecationWarning: yt_dlp.utils.process_communicate_or_kill is deprecated '
+ 'and may be removed in a future version. Use yt_dlp.utils.Popen.communicate_or_kill instead')
+ return Popen.communicate_or_kill(p, *args, **kwargs)
class Popen(subprocess.Popen):
else:
_startupinfo = None
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args, text=False, **kwargs):
+ if text is True:
+ kwargs['universal_newlines'] = True # For 3.6 compatibility
+ kwargs.setdefault('encoding', 'utf-8')
+ kwargs.setdefault('errors', 'replace')
super().__init__(*args, **kwargs, startupinfo=self._startupinfo)
def communicate_or_kill(self, *args, **kwargs):
- return process_communicate_or_kill(self, *args, **kwargs)
+ try:
+ return self.communicate(*args, **kwargs)
+ except BaseException: # Including KeyboardInterrupt
+ self.kill(timeout=None)
+ raise
+
+ def kill(self, *, timeout=0):
+ super().kill()
+ if timeout != 0:
+ self.wait(timeout=timeout)
+
+ @classmethod
+ def run(cls, *args, **kwargs):
+ with cls(*args, **kwargs) as proc:
+ stdout, stderr = proc.communicate_or_kill()
+ return stdout or '', stderr or '', proc.returncode
def get_subprocess_encoding():
context.options |= 4 # SSL_OP_LEGACY_SERVER_CONNECT
# Allow use of weaker ciphers in Python 3.10+. See https://bugs.python.org/issue43998
context.set_ciphers('DEFAULT')
+
context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
if opts_check_certificate:
if has_certifi and 'no-certifi' not in params.get('compat_opts', []):
context.load_verify_locations(cafile=certifi.where())
- else:
- try:
- context.load_default_certs()
- # Work around the issue in load_default_certs when there are bad certificates. See:
- # https://github.com/yt-dlp/yt-dlp/issues/1060,
- # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
- except ssl.SSLError:
- # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
- if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
- for storename in ('CA', 'ROOT'):
- _ssl_load_windows_store_certs(context, storename)
- context.set_default_verify_paths()
+ try:
+ context.load_default_certs()
+ # Work around the issue in load_default_certs when there are bad certificates. See:
+ # https://github.com/yt-dlp/yt-dlp/issues/1060,
+ # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
+ except ssl.SSLError:
+ # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
+ if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
+ for storename in ('CA', 'ROOT'):
+ _ssl_load_windows_store_certs(context, storename)
+ context.set_default_verify_paths()
+
client_certfile = params.get('client_certificate')
if client_certfile:
try:
password=params.get('client_certificate_password'))
except ssl.SSLError:
raise YoutubeDLError('Unable to load client certificate')
+
+ # Some servers may reject requests if ALPN extension is not sent. See:
+ # https://github.com/python/cpython/issues/85140
+ # https://github.com/yt-dlp/yt-dlp/issues/3878
+ with contextlib.suppress(NotImplementedError):
+ context.set_alpn_protocols(['http/1.1'])
+
return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
def bug_reports_message(before=';'):
- msg = ('please report this issue on https://github.com/yt-dlp/yt-dlp/issues?q= , '
- 'filling out the appropriate issue template. '
- 'Confirm you are on the latest version using yt-dlp -U')
+ from .update import REPOSITORY
+
+ msg = (f'please report this issue on https://github.com/{REPOSITORY}/issues?q= , '
+ 'filling out the appropriate issue template. Confirm you are on the latest version using yt-dlp -U')
before = before.rstrip()
if not before or before.endswith(('.', '!', '?')):
self.video_id = video_id
self.ie = ie
self.exc_info = sys.exc_info() # preserve original exception
+ if isinstance(self.exc_info[1], ExtractorError):
+ self.exc_info = self.exc_info[1].exc_info
super().__init__(''.join((
- format_field(ie, template='[%s] '),
- format_field(video_id, template='%s: '),
+ format_field(ie, None, '[%s] '),
+ format_field(video_id, None, '%s: '),
msg,
- format_field(cause, template=' (caused by %r)'),
+ format_field(cause, None, ' (caused by %r)'),
'' if expected else bug_reports_message())))
def format_traceback(self):
req.headers = handle_youtubedl_headers(req.headers)
- return req
+ return super().do_request_(req)
def http_response(self, req, resp):
old_resp = resp
@functools.cache
def get_windows_version():
- ''' Get Windows version. None if it's not running on Windows '''
+ ''' Get Windows version. returns () if it's not running on Windows '''
if compat_os_name == 'nt':
return version_tuple(platform.win32_ver()[1])
else:
- return None
+ return ()
def write_string(s, out=None, encoding=None):
if compat_os_name == 'nt' and supports_terminal_sequences(out):
s = re.sub(r'([\r\n]+)', r' \1', s)
- enc = None
+ enc, buffer = None, out
if 'b' in getattr(out, 'mode', ''):
enc = encoding or preferredencoding()
elif hasattr(out, 'buffer'):
- out = out.buffer
+ buffer = out.buffer
enc = encoding or getattr(out, 'encoding', None) or preferredencoding()
- out.write(s.encode(enc, 'ignore') if enc else s)
+ buffer.write(s.encode(enc, 'ignore') if enc else s)
out.flush()
return compat_struct_pack('%dB' % len(xs), *xs)
-class LockingUnsupportedError(IOError):
- msg = 'File locking is not supported on this platform'
+class LockingUnsupportedError(OSError):
+ msg = 'File locking is not supported'
def __init__(self):
super().__init__(self.msg)
if not LockFileEx(msvcrt.get_osfhandle(f.fileno()),
(0x2 if exclusive else 0x0) | (0x0 if block else 0x1),
0, whole_low, whole_high, f._lock_file_overlapped_p):
- raise BlockingIOError('Locking file failed: %r' % ctypes.FormatError())
+ # NB: No argument form of "ctypes.FormatError" does not work on PyPy
+ raise BlockingIOError(f'Locking file failed: {ctypes.FormatError(ctypes.GetLastError())!r}')
def _unlock_file(f):
assert f._lock_file_overlapped_p
try:
self.f.truncate()
except OSError as e:
- if e.errno != 29: # Illegal seek, expected when self.f is a FIFO
- raise e
+ if e.errno not in (
+ errno.ESPIPE, # Illegal seek - expected for FIFO
+ errno.EINVAL, # Invalid argument - expected for /dev/null
+ ):
+ raise
return self
def unlock(self):
""" Checks if the given binary is installed somewhere in PATH, and returns its name.
args can be a list of arguments for a short output (like -version) """
try:
- Popen([exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate_or_kill()
+ Popen.run([exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except OSError:
return False
return exe
# STDIN should be redirected too. On UNIX-like systems, ffmpeg triggers
# SIGTTOU if yt-dlp is run in the background.
# See https://github.com/ytdl-org/youtube-dl/issues/955#issuecomment-209789656
- out, _ = Popen(
- [encodeArgument(exe)] + args, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate_or_kill()
+ stdout, _, _ = Popen.run([encodeArgument(exe)] + args, text=True,
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except OSError:
return False
- if isinstance(out, bytes): # Python 2.x
- out = out.decode('ascii', 'ignore')
- return out
+ return stdout
def detect_exe_version(output, version_re=None, unrecognized='present'):
return detect_exe_version(out, version_re, unrecognized) if out else False
+def frange(start=0, stop=None, step=1):
+ """Float range"""
+ if stop is None:
+ start, stop = 0, start
+ sign = [-1, 1][step > 0] if step else 0
+ while sign * start < sign * stop:
+ yield start
+ start += step
+
+
class LazyList(collections.abc.Sequence):
"""Lazy immutable list from an iterable
Note that slices of a LazyList are lists and not LazyList"""
yield from page_results
+class PlaylistEntries:
+ MissingEntry = object()
+ is_exhausted = False
+
+ def __init__(self, ydl, info_dict):
+ self.ydl = ydl
+
+ # _entries must be assigned now since infodict can change during iteration
+ entries = info_dict.get('entries')
+ if entries is None:
+ raise EntryNotInPlaylist('There are no entries')
+ elif isinstance(entries, list):
+ self.is_exhausted = True
+
+ requested_entries = info_dict.get('requested_entries')
+ self.is_incomplete = bool(requested_entries)
+ if self.is_incomplete:
+ assert self.is_exhausted
+ self._entries = [self.MissingEntry] * max(requested_entries)
+ for i, entry in zip(requested_entries, entries):
+ self._entries[i - 1] = entry
+ elif isinstance(entries, (list, PagedList, LazyList)):
+ self._entries = entries
+ else:
+ self._entries = LazyList(entries)
+
+ PLAYLIST_ITEMS_RE = re.compile(r'''(?x)
+ (?P<start>[+-]?\d+)?
+ (?P<range>[:-]
+ (?P<end>[+-]?\d+|inf(?:inite)?)?
+ (?::(?P<step>[+-]?\d+))?
+ )?''')
+
+ @classmethod
+ def parse_playlist_items(cls, string):
+ for segment in string.split(','):
+ if not segment:
+ raise ValueError('There is two or more consecutive commas')
+ mobj = cls.PLAYLIST_ITEMS_RE.fullmatch(segment)
+ if not mobj:
+ raise ValueError(f'{segment!r} is not a valid specification')
+ start, end, step, has_range = mobj.group('start', 'end', 'step', 'range')
+ if int_or_none(step) == 0:
+ raise ValueError(f'Step in {segment!r} cannot be zero')
+ yield slice(int_or_none(start), float_or_none(end), int_or_none(step)) if has_range else int(start)
+
+ def get_requested_items(self):
+ playlist_items = self.ydl.params.get('playlist_items')
+ playlist_start = self.ydl.params.get('playliststart', 1)
+ playlist_end = self.ydl.params.get('playlistend')
+ # For backwards compatibility, interpret -1 as whole list
+ if playlist_end in (-1, None):
+ playlist_end = ''
+ if not playlist_items:
+ playlist_items = f'{playlist_start}:{playlist_end}'
+ elif playlist_start != 1 or playlist_end:
+ self.ydl.report_warning('Ignoring playliststart and playlistend because playlistitems was given', only_once=True)
+
+ for index in self.parse_playlist_items(playlist_items):
+ for i, entry in self[index]:
+ yield i, entry
+ if not entry:
+ continue
+ try:
+ # TODO: Add auto-generated fields
+ self.ydl._match_entry(entry, incomplete=True, silent=True)
+ except (ExistingVideoReached, RejectedVideoReached):
+ return
+
+ def get_full_count(self):
+ if self.is_exhausted and not self.is_incomplete:
+ return len(self)
+ elif isinstance(self._entries, InAdvancePagedList):
+ if self._entries._pagesize == 1:
+ return self._entries._pagecount
+
+ @functools.cached_property
+ def _getter(self):
+ if isinstance(self._entries, list):
+ def get_entry(i):
+ try:
+ entry = self._entries[i]
+ except IndexError:
+ entry = self.MissingEntry
+ if not self.is_incomplete:
+ raise self.IndexError()
+ if entry is self.MissingEntry:
+ raise EntryNotInPlaylist(f'Entry {i} cannot be found')
+ return entry
+ else:
+ def get_entry(i):
+ try:
+ return type(self.ydl)._handle_extraction_exceptions(lambda _, i: self._entries[i])(self.ydl, i)
+ except (LazyList.IndexError, PagedList.IndexError):
+ raise self.IndexError()
+ return get_entry
+
+ def __getitem__(self, idx):
+ if isinstance(idx, int):
+ idx = slice(idx, idx)
+
+ # NB: PlaylistEntries[1:10] => (0, 1, ... 9)
+ step = 1 if idx.step is None else idx.step
+ if idx.start is None:
+ start = 0 if step > 0 else len(self) - 1
+ else:
+ start = idx.start - 1 if idx.start >= 0 else len(self) + idx.start
+
+ # NB: Do not call len(self) when idx == [:]
+ if idx.stop is None:
+ stop = 0 if step < 0 else float('inf')
+ else:
+ stop = idx.stop - 1 if idx.stop >= 0 else len(self) + idx.stop
+ stop += [-1, 1][step > 0]
+
+ for i in frange(start, stop, step):
+ if i < 0:
+ continue
+ try:
+ entry = self._getter(i)
+ except self.IndexError:
+ self.is_exhausted = True
+ if step > 0:
+ break
+ continue
+ yield i + 1, entry
+
+ def __len__(self):
+ return len(tuple(self[:]))
+
+ class IndexError(IndexError):
+ pass
+
+
def uppercase_escape(s):
unicode_escape = codecs.getdecoder('unicode_escape')
return re.sub(
return '"%s"' % v
+ def create_map(mobj):
+ return json.dumps(dict(json.loads(js_to_json(mobj.group(1) or '[]', vars=vars))))
+
code = re.sub(r'new Date\((".+")\)', r'\g<1>', code)
+ code = re.sub(r'new Map\((\[.*?\])?\)', create_map, code)
return re.sub(r'''(?sx)
"(?:[^"\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^"\\]*"|
return q
-POSTPROCESS_WHEN = ('pre_process', 'after_filter', 'before_dl', 'after_move', 'post_process', 'after_video', 'playlist')
+POSTPROCESS_WHEN = ('pre_process', 'after_filter', 'before_dl', 'post_process', 'after_move', 'after_video', 'playlist')
DEFAULT_OUTTMPL = {
else:
is_incomplete = lambda k: k in incomplete
- operator_rex = re.compile(r'''(?x)\s*
+ operator_rex = re.compile(r'''(?x)
(?P<key>[a-z_]+)
\s*(?P<negation>!\s*)?(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
(?:
(?P<quote>["\'])(?P<quotedstrval>.+?)(?P=quote)|
(?P<strval>.+?)
)
- \s*$
''' % '|'.join(map(re.escape, COMPARISON_OPERATORS.keys())))
- m = operator_rex.search(filter_part)
+ m = operator_rex.fullmatch(filter_part.strip())
if m:
m = m.groupdict()
unnegated_op = COMPARISON_OPERATORS[m['op']]
'': lambda v: (v is True) if isinstance(v, bool) else (v is not None),
'!': lambda v: (v is False) if isinstance(v, bool) else (v is None),
}
- operator_rex = re.compile(r'''(?x)\s*
+ operator_rex = re.compile(r'''(?x)
(?P<op>%s)\s*(?P<key>[a-z_]+)
- \s*$
''' % '|'.join(map(re.escape, UNARY_OPERATORS.keys())))
- m = operator_rex.search(filter_part)
+ m = operator_rex.fullmatch(filter_part.strip())
if m:
op = UNARY_OPERATORS[m.group('op')]
actual_value = dct.get(m.group('key'))
return _match_func
+def download_range_func(chapters, ranges):
+ def inner(info_dict, ydl):
+ warning = ('There are no chapters matching the regex' if info_dict.get('chapters')
+ else 'Cannot match chapters since chapter information is unavailable')
+ for regex in chapters or []:
+ for i, chapter in enumerate(info_dict.get('chapters') or []):
+ if re.search(regex, chapter['title']):
+ warning = None
+ yield {**chapter, 'index': i}
+ if chapters and warning:
+ ydl.to_screen(f'[info] {info_dict["id"]}: {warning}')
+
+ yield from ({'start_time': start, 'end_time': end} for start, end in ranges or [])
+
+ return inner
+
+
def parse_dfxp_time_expr(time_expr):
if not time_expr:
return
return [0, 2] + pseudo_random + [0] + data
-def encode_base_n(num, n, table=None):
- FULL_TABLE = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
- if not table:
- table = FULL_TABLE[:n]
+def _base_n_table(n, table):
+ if not table and not n:
+ raise ValueError('Either table or n must be specified')
+ table = (table or '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ')[:n]
- if n > len(table):
- raise ValueError('base %d exceeds table length %d' % (n, len(table)))
+ if n != len(table):
+ raise ValueError(f'base {n} exceeds table length {len(table)}')
+ return table
- if num == 0:
+
+def encode_base_n(num, n=None, table=None):
+ """Convert given int to a base-n string"""
+ table = _base_n_table(n, table)
+ if not num:
return table[0]
- ret = ''
+ result, base = '', len(table)
while num:
- ret = table[num % n] + ret
- num = num // n
- return ret
+ result = table[num % base] + result
+ num = num // base
+ return result
+
+
+def decode_base_n(string, n=None, table=None):
+ """Convert given base-n string to int"""
+ table = {char: index for index, char in enumerate(_base_n_table(n, table))}
+ result, base = 0, len(table)
+ for char in string:
+ result = result * base + table[char]
+ return result
+
+
+def decode_base(value, digits):
+ write_string('DeprecationWarning: yt_dlp.utils.decode_base is deprecated '
+ 'and may be removed in a future version. Use yt_dlp.decode_base_n instead')
+ return decode_base_n(value, table=digits)
def decode_packed_codes(code):
value = value.decode()
try:
- p = Popen(
+ _, stderr, returncode = Popen.run(
[exe, '-w', key, value, path] if exe == 'xattr' else [exe, '-n', key, '-v', value, path],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
+ text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
except OSError as e:
raise XAttrMetadataError(e.errno, e.strerror)
- stderr = p.communicate_or_kill()[1].decode('utf-8', 'replace')
- if p.returncode:
- raise XAttrMetadataError(p.returncode, stderr)
+ if returncode:
+ raise XAttrMetadataError(returncode, stderr)
def random_birthday(year_field, month_field, day_field):
return path
-def format_field(obj, field=None, template='%s', ignore=(None, ''), default='', func=None):
+def format_field(obj, field=None, template='%s', ignore=NO_DEFAULT, default='', func=IDENTITY):
val = traverse_obj(obj, *variadic(field))
- if val in ignore:
+ if (not val and val != 0) if ignore is NO_DEFAULT else val in variadic(ignore):
return default
- return template % (func(val) if func else val)
+ return template % func(val)
def clean_podcast_url(url):
def get_executable_path():
- from .update import get_variant_and_executable_path
+ from .update import _get_variant_and_executable_path
- return os.path.abspath(get_variant_and_executable_path()[1])
+ return os.path.dirname(os.path.abspath(_get_variant_and_executable_path()[1]))
def load_plugins(name, suffix, namespace):
if isinstance(expected_type, type):
type_test = lambda val: val if isinstance(val, expected_type) else None
- elif expected_type is not None:
- type_test = expected_type
else:
- type_test = lambda val: val
+ type_test = expected_type or IDENTITY
for path in path_list:
depth = 0
return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
-def decode_base(value, digits):
- # This will convert given base-x string to scalar (long or int)
- table = {char: index for index, char in enumerate(digits)}
- result = 0
- base = len(digits)
- for chr in value:
- result *= base
- result += table[chr]
- return result
-
-
def time_seconds(**kwargs):
t = datetime.datetime.now(datetime.timezone(datetime.timedelta(**kwargs)))
return t.timestamp()
@functools.cache
def supports_terminal_sequences(stream):
if compat_os_name == 'nt':
- if not WINDOWS_VT_MODE or get_windows_version() < (10, 0, 10586):
+ if not WINDOWS_VT_MODE:
return False
elif not os.getenv('TERM'):
return False
def windows_enable_vt_mode(): # TODO: Do this the proper way https://bugs.python.org/issue30075
- if compat_os_name != 'nt':
+ if get_windows_version() < (10, 0, 10586):
return
global WINDOWS_VT_MODE
- startupinfo = subprocess.STARTUPINFO()
- startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
try:
- subprocess.Popen('', shell=True, startupinfo=startupinfo).wait()
+ Popen.run('', shell=True)
except Exception:
return
def join_nonempty(*values, delim='-', from_dict=None):
if from_dict is not None:
- values = map(from_dict.get, values)
+ values = (traverse_obj(from_dict, variadic(v)) for v in values)
return delim.join(map(str, filter(None, values)))
return int(crg.group(1)), int_or_none(crg.group(2)), int_or_none(crg.group(3))
+def read_stdin(what):
+ eof = 'Ctrl+Z' if compat_os_name == 'nt' else 'Ctrl+D'
+ write_string(f'Reading {what} from STDIN - EOF ({eof}) to end:\n')
+ return sys.stdin
+
+
class Config:
own_args = None
parsed_args = None
self.parsed_args, self.filename = args, filename
for location in opts.config_locations or []:
+ if location == '-':
+ self.append_config(shlex.split(read_stdin('options'), comments=True), label='stdin')
+ continue
location = os.path.join(directory, expand_path(location))
if os.path.isdir(location):
location = os.path.join(location, 'yt-dlp.conf')
# FIXME: https://github.com/ytdl-org/youtube-dl/commit/dfe5fa49aed02cf36ba9f743b11b0903554b5e56
contents = optionf.read()
res = shlex.split(contents, comments=True)
+ except Exception as err:
+ raise ValueError(f'Unable to parse "{filename}": {err}')
finally:
optionf.close()
return res
return self.func(cls)
-class Namespace:
+class Namespace(types.SimpleNamespace):
"""Immutable namespace"""
- def __init__(self, **kwargs):
- self._dict = kwargs
-
- def __getattr__(self, attr):
- return self._dict[attr]
-
- def __contains__(self, item):
- return item in self._dict.values()
-
def __iter__(self):
- return iter(self._dict.items())
+ return iter(self.__dict__.values())
- def __repr__(self):
- return f'{type(self).__name__}({", ".join(f"{k}={v}" for k, v in self)})'
+ @property
+ def items_(self):
+ return self.__dict__.items()
# Deprecated