)
from ..dependencies import xattr
-__name__ = __name__.rsplit('.', 1)[0] # Pretend to be the parent module
+__name__ = __name__.rsplit('.', 1)[0] # noqa: A001: Pretend to be the parent module
# This is not clearly defined otherwise
compiled_regex_type = type(re.compile(''))
'EST': -5, 'EDT': -4, # Eastern
'CST': -6, 'CDT': -5, # Central
'MST': -7, 'MDT': -6, # Mountain
- 'PST': -8, 'PDT': -7 # Pacific
+ 'PST': -8, 'PDT': -7, # Pacific
}
# needed for sanitizing filenames in restricted mode
def find_xpath_attr(node, xpath, key, val=None):
""" Find the xpath xpath[@key=val] """
assert re.match(r'^[a-zA-Z_-]+$', key)
- expr = xpath + ('[@%s]' % key if val is None else f"[@{key}='{val}']")
+ expr = xpath + (f'[@{key}]' if val is None else f"[@{key}='{val}']")
return node.find(expr)
# On python2.6 the xml.etree.ElementTree.Element methods don't support
replaced.append(c[0])
else:
ns, tag = c
- replaced.append('{%s}%s' % (ns_map[ns], tag))
+ replaced.append(f'{{{ns_map[ns]}}}{tag}')
return '/'.join(replaced)
return default
elif fatal:
name = xpath if name is None else name
- raise ExtractorError('Could not find XML element %s' % name)
+ raise ExtractorError(f'Could not find XML element {name}')
else:
return None
return n
return default
elif fatal:
name = xpath if name is None else name
- raise ExtractorError('Could not find XML element\'s text %s' % name)
+ raise ExtractorError(f'Could not find XML element\'s text {name}')
else:
return None
return n.text
return default
elif fatal:
name = f'{xpath}[@{key}]' if name is None else name
- raise ExtractorError('Could not find XML attribute %s' % name)
+ raise ExtractorError(f'Could not find XML attribute {name}')
else:
return None
return n.attrib[key]
def get_elements_by_class(class_name, html, **kargs):
"""Return the content of all tags with the specified class in the passed HTML document as a list"""
return get_elements_by_attribute(
- 'class', r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name),
+ 'class', rf'[^\'"]*(?<=[\'"\s]){re.escape(class_name)}(?=[\'"\s])[^\'"]*',
html, escape_value=False)
def get_elements_html_by_class(class_name, html):
"""Return the html of all tags with the specified class in the passed HTML document as a list"""
return get_elements_html_by_attribute(
- 'class', r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name),
+ 'class', rf'[^\'"]*(?<=[\'"\s]){re.escape(class_name)}(?=[\'"\s])[^\'"]*',
html, escape_value=False)
yield (
unescapeHTML(re.sub(r'^(?P<q>["\'])(?P<content>.*)(?P=q)$', r'\g<content>', content, flags=re.DOTALL)),
- whole
+ whole,
)
else:
raise compat_HTMLParseError(f'matching opening tag for closing {tag} tag not found')
if not self.tagstack:
- raise self.HTMLBreakOnClosingTagException()
+ raise self.HTMLBreakOnClosingTagException
# XXX: This should be far less strict
# FIXME: An exclusive lock also locks the file from being read.
# Since windows locks are mandatory, don't lock the file on windows (for now).
# Ref: https://github.com/yt-dlp/yt-dlp/issues/3124
- raise LockingUnsupportedError()
+ raise LockingUnsupportedError
stream = locked_file(filename, open_mode, block=False).__enter__()
except OSError:
stream = open(filename, open_mode)
return url, None
url = urllib.parse.urlunsplit(parts._replace(netloc=(
parts.hostname if parts.port is None
- else '%s:%d' % (parts.hostname, parts.port))))
+ else f'{parts.hostname}:{parts.port}')))
auth_payload = base64.b64encode(
- ('%s:%s' % (parts.username, parts.password or '')).encode())
+ ('{}:{}'.format(parts.username, parts.password or '')).encode())
return url, f'Basic {auth_payload.decode()}'
numstr = mobj.group(1)
if numstr.startswith('x'):
base = 16
- numstr = '0%s' % numstr
+ numstr = f'0{numstr}'
else:
base = 10
# See https://github.com/ytdl-org/youtube-dl/issues/7518
return chr(int(numstr, base))
# Unknown entity in name, return its literal representation
- return '&%s;' % entity
+ return f'&{entity};'
def unescapeHTML(s):
class UnsupportedError(ExtractorError):
def __init__(self, url):
super().__init__(
- 'Unsupported URL: %s' % url, expected=True)
+ f'Unsupported URL: {url}', expected=True)
self.url = url
else:
self.end = dt.datetime.max.date()
if self.start > self.end:
- raise ValueError('Date range: "%s" , the start date must be before the end date' % self)
+ raise ValueError(f'Date range: "{self}" , the start date must be before the end date')
@classmethod
def day(cls, day):
with contextlib.suppress(OSError): # We may not have access to the executable
libc_ver = platform.libc_ver()
- return 'Python %s (%s %s %s) - %s (%s%s)' % (
+ return 'Python {} ({} {} {}) - {} ({}{})'.format(
platform.python_version(),
python_implementation,
platform.machine(),
@functools.cache
def get_windows_version():
- ''' Get Windows version. returns () if it's not running on Windows '''
+ """ Get Windows version. returns () if it's not running on Windows """
if compat_os_name == 'nt':
return version_tuple(platform.win32_ver()[1])
else:
ctypes.wintypes.DWORD, # dwReserved
ctypes.wintypes.DWORD, # nNumberOfBytesToLockLow
ctypes.wintypes.DWORD, # nNumberOfBytesToLockHigh
- ctypes.POINTER(OVERLAPPED) # Overlapped
+ ctypes.POINTER(OVERLAPPED), # Overlapped
]
LockFileEx.restype = ctypes.wintypes.BOOL
UnlockFileEx = kernel32.UnlockFileEx
ctypes.wintypes.DWORD, # dwReserved
ctypes.wintypes.DWORD, # nNumberOfBytesToLockLow
ctypes.wintypes.DWORD, # nNumberOfBytesToLockHigh
- ctypes.POINTER(OVERLAPPED) # Overlapped
+ ctypes.POINTER(OVERLAPPED), # Overlapped
]
UnlockFileEx.restype = ctypes.wintypes.BOOL
whole_low = 0xffffffff
assert f._lock_file_overlapped_p
handle = msvcrt.get_osfhandle(f.fileno())
if not UnlockFileEx(handle, 0, whole_low, whole_high, f._lock_file_overlapped_p):
- raise OSError('Unlocking file failed: %r' % ctypes.FormatError())
+ raise OSError(f'Unlocking file failed: {ctypes.FormatError()!r}')
else:
try:
except ImportError:
def _lock_file(f, exclusive, block):
- raise LockingUnsupportedError()
+ raise LockingUnsupportedError
def _unlock_file(f):
- raise LockingUnsupportedError()
+ raise LockingUnsupportedError
class locked_file:
def remove_quotes(s):
if s is None or len(s) < 2:
return s
- for quote in ('"', "'", ):
+ for quote in ('"', "'"):
if s[0] == quote and s[-1] == quote:
return s[1:-1]
return s
def replace_extension(filename, ext, expected_real_ext=None):
name, real_ext = os.path.splitext(filename)
- return '{}.{}'.format(
- name if not expected_real_ext or real_ext[1:] == expected_real_ext else filename,
- ext)
+ return f'{name if not expected_real_ext or real_ext[1:] == expected_real_ext else filename}.{ext}'
def check_executable(exe, args=[]):
""" Checks if the given binary is installed somewhere in PATH, and returns its name.
args can be a list of arguments for a short output (like -version) """
try:
- Popen.run([exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ Popen.run([exe, *args], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except OSError:
return False
return exe
# STDIN should be redirected too. On UNIX-like systems, ffmpeg triggers
# SIGTTOU if yt-dlp is run in the background.
# See https://github.com/ytdl-org/youtube-dl/issues/955#issuecomment-209789656
- stdout, _, ret = Popen.run([encodeArgument(exe)] + args, text=True,
+ stdout, _, ret = Popen.run([encodeArgument(exe), *args], text=True,
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if ret:
return None
"""Lazy immutable list from an iterable
Note that slices of a LazyList are lists and not LazyList"""
- class IndexError(IndexError):
+ class IndexError(IndexError): # noqa: A001
pass
def __init__(self, iterable, *, reverse=False, _cache=None):
class PagedList:
- class IndexError(IndexError):
+ class IndexError(IndexError): # noqa: A001
pass
def __len__(self):
raise TypeError('indices must be non-negative integers')
entries = self.getslice(idx, idx + 1)
if not entries:
- raise self.IndexError()
+ raise self.IndexError
return entries[0]
def __bool__(self):
except IndexError:
entry = self.MissingEntry
if not self.is_incomplete:
- raise self.IndexError()
+ raise self.IndexError
if entry is self.MissingEntry:
raise EntryNotInPlaylist(f'Entry {i + 1} cannot be found')
return entry
try:
return type(self.ydl)._handle_extraction_exceptions(lambda _, i: self._entries[i])(self.ydl, i)
except (LazyList.IndexError, PagedList.IndexError):
- raise self.IndexError()
+ raise self.IndexError
return get_entry
def __getitem__(self, idx):
def __len__(self):
return len(tuple(self[:]))
- class IndexError(IndexError):
+ class IndexError(IndexError): # noqa: A001
pass
assert 'query' not in kwargs, 'query_update and query cannot be specified at the same time'
kwargs['query'] = urllib.parse.urlencode({
**urllib.parse.parse_qs(url.query),
- **query_update
+ **query_update,
}, True)
return urllib.parse.urlunparse(url._replace(**kwargs))
def _multipart_encode_impl(data, boundary):
- content_type = 'multipart/form-data; boundary=%s' % boundary
+ content_type = f'multipart/form-data; boundary={boundary}'
out = b''
for k, v in data.items():
def multipart_encode(data, boundary=None):
- '''
+ """
Encode a dict to RFC 7578-compliant form-data
data:
a random boundary is generated.
Reference: https://tools.ietf.org/html/rfc7578
- '''
+ """
has_specified_boundary = boundary is not None
while True:
s = s.upper()
if s in US_RATINGS:
return US_RATINGS[s]
- m = re.match(r'^TV[_-]?(%s)$' % '|'.join(k[3:] for k in TV_PARENTAL_GUIDELINES), s)
+ m = re.match(r'^TV[_-]?({})$'.format('|'.join(k[3:] for k in TV_PARENTAL_GUIDELINES)), s)
if m:
return TV_PARENTAL_GUIDELINES['TV-' + m.group(1)]
return None
return v
elif v in ('undefined', 'void 0'):
return 'null'
- elif v.startswith('/*') or v.startswith('//') or v.startswith('!') or v == ',':
+ elif v.startswith(('/*', '//', '!')) or v == ',':
return ''
if v[0] in STRING_QUOTES:
def encode_data_uri(data, mime_type):
- return 'data:%s;base64,%s' % (mime_type, base64.b64encode(data).decode('ascii'))
+ return 'data:{};base64,{}'.format(mime_type, base64.b64encode(data).decode('ascii'))
def age_restricted(content_limit, age_limit):
def get_max_lens(table):
return [max(width(str(v)) for v in col) for col in zip(*table)]
- def filter_using_list(row, filterArray):
- return [col for take, col in itertools.zip_longest(filterArray, row, fillvalue=True) if take]
+ def filter_using_list(row, filter_array):
+ return [col for take, col in itertools.zip_longest(filter_array, row, fillvalue=True) if take]
max_lens = get_max_lens(data) if hide_empty else []
header_row = filter_using_list(header_row, max_lens)
data = [filter_using_list(row, max_lens) for row in data]
- table = [header_row] + data
+ table = [header_row, *data]
max_lens = get_max_lens(table)
extra_gap += 1
if delim:
- table = [header_row, [delim * (ml + extra_gap) for ml in max_lens]] + data
+ table = [header_row, [delim * (ml + extra_gap) for ml in max_lens], *data]
table[1][-1] = table[1][-1][:-extra_gap * len(delim)] # Remove extra_gap from end of delimiter
for row in table:
for pos, text in enumerate(map(str, row)):
row[pos] = text.replace('\t', ' ' * (max_lens[pos] - width(text))) + ' ' * extra_gap
else:
row[pos] = text + ' ' * (max_lens[pos] - width(text) + extra_gap)
- ret = '\n'.join(''.join(row).rstrip() for row in table)
- return ret
+ return '\n'.join(''.join(row).rstrip() for row in table)
def _match_one(filter_part, dct, incomplete):
operator_rex = re.compile(r'''(?x)
(?P<key>[a-z_]+)
- \s*(?P<negation>!\s*)?(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
+ \s*(?P<negation>!\s*)?(?P<op>{})(?P<none_inclusive>\s*\?)?\s*
(?:
(?P<quote>["\'])(?P<quotedstrval>.+?)(?P=quote)|
(?P<strval>.+?)
)
- ''' % '|'.join(map(re.escape, COMPARISON_OPERATORS.keys())))
+ '''.format('|'.join(map(re.escape, COMPARISON_OPERATORS.keys()))))
m = operator_rex.fullmatch(filter_part.strip())
if m:
m = m.groupdict()
op = unnegated_op
comparison_value = m['quotedstrval'] or m['strval'] or m['intval']
if m['quote']:
- comparison_value = comparison_value.replace(r'\%s' % m['quote'], m['quote'])
+ comparison_value = comparison_value.replace(r'\{}'.format(m['quote']), m['quote'])
actual_value = dct.get(m['key'])
numeric_comparison = None
if isinstance(actual_value, (int, float)):
if numeric_comparison is None:
numeric_comparison = parse_duration(comparison_value)
if numeric_comparison is not None and m['op'] in STRING_OPERATORS:
- raise ValueError('Operator %s only supports string values!' % m['op'])
+ raise ValueError('Operator {} only supports string values!'.format(m['op']))
if actual_value is None:
return is_incomplete(m['key']) or m['none_inclusive']
return op(actual_value, comparison_value if numeric_comparison is None else numeric_comparison)
'!': lambda v: (v is False) if isinstance(v, bool) else (v is None),
}
operator_rex = re.compile(r'''(?x)
- (?P<op>%s)\s*(?P<key>[a-z_]+)
- ''' % '|'.join(map(re.escape, UNARY_OPERATORS.keys())))
+ (?P<op>{})\s*(?P<key>[a-z_]+)
+ '''.format('|'.join(map(re.escape, UNARY_OPERATORS.keys()))))
m = operator_rex.fullmatch(filter_part.strip())
if m:
op = UNARY_OPERATORS[m.group('op')]
return True
return op(actual_value)
- raise ValueError('Invalid filter part %r' % filter_part)
+ raise ValueError(f'Invalid filter part {filter_part!r}')
def match_str(filter_str, dct, incomplete=False):
def dfxp2srt(dfxp_data):
- '''
+ """
@param dfxp_data A bytes-like object containing DFXP data
@returns A unicode object containing converted SRT data
- '''
+ """
LEGACY_NAMESPACES = (
(b'http://www.w3.org/ns/ttml', [
b'http://www.w3.org/2004/11/ttaf1',
'fontSize',
'fontStyle',
'fontWeight',
- 'textDecoration'
+ 'textDecoration',
]
_x = functools.partial(xpath_with_ns, ns_map={
if self._applied_styles and self._applied_styles[-1].get(k) == v:
continue
if k == 'color':
- font += ' color="%s"' % v
+ font += f' color="{v}"'
elif k == 'fontSize':
- font += ' size="%s"' % v
+ font += f' size="{v}"'
elif k == 'fontFamily':
- font += ' face="%s"' % v
+ font += f' face="{v}"'
elif k == 'fontWeight' and v == 'bold':
self._out += '<b>'
unclosed_elements.append('b')
if tag not in (_x('ttml:br'), 'br'):
unclosed_elements = self._unclosed_elements.pop()
for element in reversed(unclosed_elements):
- self._out += '</%s>' % element
+ self._out += f'</{element}>'
if unclosed_elements and self._applied_styles:
self._applied_styles.pop()
def ohdave_rsa_encrypt(data, exponent, modulus):
- '''
+ """
Implement OHDave's RSA algorithm. See http://www.ohdave.com/rsa/
Input:
Output: hex string of encrypted data
Limitation: supports one block encryption only
- '''
+ """
payload = int(binascii.hexlify(data[::-1]), 16)
encrypted = pow(payload, exponent, modulus)
- return '%x' % encrypted
+ return f'{encrypted:x}'
def pkcs1pad(data, length):
raise ValueError('Input data too long for PKCS#1 padding')
pseudo_random = [random.randint(0, 254) for _ in range(length - len(data) - 3)]
- return [0, 2] + pseudo_random + [0] + data
+ return [0, 2, *pseudo_random, 0, *data]
def _base_n_table(n, table):
payload_b64 = base64.b64encode(json.dumps(payload_data).encode())
h = hmac.new(key.encode(), header_b64 + b'.' + payload_b64, hashlib.sha256)
signature_b64 = base64.b64encode(h.digest())
- token = header_b64 + b'.' + payload_b64 + b'.' + signature_b64
- return token
+ return header_b64 + b'.' + payload_b64 + b'.' + signature_b64
# can be extended in future to verify the signature and parse header and return the algorithm used if it's not HS256
def jwt_decode_hs256(jwt):
header_b64, payload_b64, signature_b64 = jwt.split('.')
# add trailing ='s that may have been stripped, superfluous ='s are ignored
- payload_data = json.loads(base64.urlsafe_b64decode(f'{payload_b64}==='))
- return payload_data
+ return json.loads(base64.urlsafe_b64decode(f'{payload_b64}==='))
WINDOWS_VT_MODE = False if compat_os_name == 'nt' else None
"""
_keys = ('width', 'height')
max_dimensions = max(
- (tuple(format.get(k) or 0 for k in _keys) for format in formats),
+ (tuple(fmt.get(k) or 0 for k in _keys) for fmt in formats),
default=(0, 0))
if not max_dimensions[0]:
return thumbnails
'function': lambda it: next(filter(None, it), None)},
'ext': {'type': 'combined', 'field': ('vext', 'aext')},
'res': {'type': 'multiple', 'field': ('height', 'width'),
- 'function': lambda it: (lambda l: min(l) if l else 0)(tuple(filter(None, it)))},
+ 'function': lambda it: min(filter(None, it), default=0)},
# Actual field names
'format_id': {'type': 'alias', 'field': 'id'},
self.ydl.deprecated_feature(f'Using arbitrary fields ({field}) for format sorting is '
'deprecated and may be removed in a future version')
self.settings[field] = {}
- propObj = self.settings[field]
- if key not in propObj:
- type = propObj.get('type')
+ prop_obj = self.settings[field]
+ if key not in prop_obj:
+ type_ = prop_obj.get('type')
if key == 'field':
- default = 'preference' if type == 'extractor' else (field,) if type in ('combined', 'multiple') else field
+ default = 'preference' if type_ == 'extractor' else (field,) if type_ in ('combined', 'multiple') else field
elif key == 'convert':
- default = 'order' if type == 'ordered' else 'float_string' if field else 'ignore'
+ default = 'order' if type_ == 'ordered' else 'float_string' if field else 'ignore'
else:
- default = {'type': 'field', 'visible': True, 'order': [], 'not_in_list': (None,)}.get(key, None)
- propObj[key] = default
- return propObj[key]
+ default = {'type': 'field', 'visible': True, 'order': [], 'not_in_list': (None,)}.get(key)
+ prop_obj[key] = default
+ return prop_obj[key]
- def _resolve_field_value(self, field, value, convertNone=False):
+ def _resolve_field_value(self, field, value, convert_none=False):
if value is None:
- if not convertNone:
+ if not convert_none:
return None
else:
value = value.lower()
for item in sort_list:
match = re.match(self.regex, item)
if match is None:
- raise ExtractorError('Invalid format sort string "%s" given by extractor' % item)
+ raise ExtractorError(f'Invalid format sort string "{item}" given by extractor')
field = match.group('field')
if field is None:
continue
def print_verbose_info(self, write_debug):
if self._sort_user:
- write_debug('Sort order given by user: %s' % ', '.join(self._sort_user))
+ write_debug('Sort order given by user: {}'.format(', '.join(self._sort_user)))
if self._sort_extractor:
- write_debug('Sort order given by extractor: %s' % ', '.join(self._sort_extractor))
- write_debug('Formats sorted by: %s' % ', '.join(['%s%s%s' % (
+ write_debug('Sort order given by extractor: {}'.format(', '.join(self._sort_extractor)))
+ write_debug('Formats sorted by: {}'.format(', '.join(['{}{}{}'.format(
'+' if self._get_field_setting(field, 'reverse') else '', field,
- '%s%s(%s)' % ('~' if self._get_field_setting(field, 'closest') else ':',
- self._get_field_setting(field, 'limit_text'),
- self._get_field_setting(field, 'limit'))
+ '{}{}({})'.format('~' if self._get_field_setting(field, 'closest') else ':',
+ self._get_field_setting(field, 'limit_text'),
+ self._get_field_setting(field, 'limit'))
if self._get_field_setting(field, 'limit_text') is not None else '')
- for field in self._order if self._get_field_setting(field, 'visible')]))
+ for field in self._order if self._get_field_setting(field, 'visible')])))
- def _calculate_field_preference_from_value(self, format, field, type, value):
+ def _calculate_field_preference_from_value(self, format_, field, type_, value):
reverse = self._get_field_setting(field, 'reverse')
closest = self._get_field_setting(field, 'closest')
limit = self._get_field_setting(field, 'limit')
- if type == 'extractor':
+ if type_ == 'extractor':
maximum = self._get_field_setting(field, 'max')
if value is None or (maximum is not None and value >= maximum):
value = -1
- elif type == 'boolean':
+ elif type_ == 'boolean':
in_list = self._get_field_setting(field, 'in_list')
not_in_list = self._get_field_setting(field, 'not_in_list')
value = 0 if ((in_list is None or value in in_list) and (not_in_list is None or value not in not_in_list)) else -1
- elif type == 'ordered':
+ elif type_ == 'ordered':
value = self._resolve_field_value(field, value, True)
# try to convert to number
else (0, -value, 0) if limit is None or (reverse and value == limit) or value > limit
else (-1, value, 0))
- def _calculate_field_preference(self, format, field):
- type = self._get_field_setting(field, 'type') # extractor, boolean, ordered, field, multiple
- get_value = lambda f: format.get(self._get_field_setting(f, 'field'))
- if type == 'multiple':
- type = 'field' # Only 'field' is allowed in multiple for now
+ def _calculate_field_preference(self, format_, field):
+ type_ = self._get_field_setting(field, 'type') # extractor, boolean, ordered, field, multiple
+ get_value = lambda f: format_.get(self._get_field_setting(f, 'field'))
+ if type_ == 'multiple':
+ type_ = 'field' # Only 'field' is allowed in multiple for now
actual_fields = self._get_field_setting(field, 'field')
value = self._get_field_setting(field, 'function')(get_value(f) for f in actual_fields)
else:
value = get_value(field)
- return self._calculate_field_preference_from_value(format, field, type, value)
+ return self._calculate_field_preference_from_value(format_, field, type_, value)
def calculate_preference(self, format):
# Determine missing protocol