import xml.etree.ElementTree
import zlib
import mimetypes
+import urllib.parse
+import shlex
from .compat import (
compat_HTMLParseError,
compat_HTMLParser,
compat_HTTPError,
- compat_basestring,
compat_brotli,
compat_chr,
compat_cookiejar,
- compat_ctypes_WINFUNCTYPE,
compat_etree_fromstring,
compat_expanduser,
compat_html_entities,
compat_html_entities_html5,
compat_http_client,
- compat_integer_types,
- compat_numeric_types,
- compat_kwargs,
compat_os_name,
compat_parse_qs,
- compat_shlex_split,
compat_shlex_quote,
compat_str,
compat_struct_pack,
compat_struct_unpack,
compat_urllib_error,
- compat_urllib_parse,
compat_urllib_parse_urlencode,
compat_urllib_parse_urlparse,
- compat_urllib_parse_urlunparse,
- compat_urllib_parse_quote,
- compat_urllib_parse_quote_plus,
compat_urllib_parse_unquote_plus,
compat_urllib_request,
compat_urlparse,
compat_websockets,
- compat_xpath,
)
from .socks import (
std_headers = {
'User-Agent': random_user_agent(),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
- 'Accept-Encoding': ', '.join(SUPPORTED_ENCODINGS),
'Accept-Language': 'en-us,en;q=0.5',
'Sec-Fetch-Mode': 'navigate',
}
def write_json_file(obj, fn):
""" Encode obj as JSON and write it to fn, atomically if possible """
- fn = encodeFilename(fn)
- if sys.version_info < (3, 0) and sys.platform != 'win32':
- encoding = get_filesystem_encoding()
- # os.path.basename returns a bytes object, but NamedTemporaryFile
- # will fail if the filename contains non ascii characters unless we
- # use a unicode object
- path_basename = lambda f: os.path.basename(fn).decode(encoding)
- # the same for os.path.dirname
- path_dirname = lambda f: os.path.dirname(fn).decode(encoding)
- else:
- path_basename = os.path.basename
- path_dirname = os.path.dirname
-
- args = {
- 'suffix': '.tmp',
- 'prefix': path_basename(fn) + '.',
- 'dir': path_dirname(fn),
- 'delete': False,
- }
-
- # In Python 2.x, json.dump expects a bytestream.
- # In Python 3.x, it writes to a character stream
- if sys.version_info < (3, 0):
- args['mode'] = 'wb'
- else:
- args.update({
- 'mode': 'w',
- 'encoding': 'utf-8',
- })
-
- tf = tempfile.NamedTemporaryFile(**compat_kwargs(args))
+ tf = tempfile.NamedTemporaryFile(
+ prefix=f'{os.path.basename(fn)}.', dir=os.path.dirname(fn),
+ suffix='.tmp', delete=False, mode='w', encoding='utf-8')
try:
with tf:
raise
def find_xpath_attr(node, xpath, key, val=None):
    """Find the first element matching xpath[@key] or xpath[@key='val'].

    @param node   Object whose find()/findall() are used for the lookup
    @param xpath  Path expression selecting the candidate elements
    @param key    Attribute name; only letters, "_" and "-" are accepted
    @param val    Required attribute value. If None, the attribute only
                  has to be present
    @returns      The first matching element, or None if there is none
    """
    assert re.match(r'^[a-zA-Z_-]+$', key)
    if val is None:
        return node.find(f'{xpath}[@{key}]')

    val = str(val)
    # The predicate syntax has no escape sequence for quotes, so pick a
    # quoting style that does not collide with the value itself
    if "'" not in val:
        return node.find(f"{xpath}[@{key}='{val}']")
    if '"' not in val:
        return node.find(f'{xpath}[@{key}="{val}"]')
    # Both kinds of quotes are present: compare the attribute manually
    return next(
        (element for element in node.findall(f'{xpath}[@{key}]') if element.get(key) == val),
        None)
# On python2.6 the xml.etree.ElementTree.Element methods don't support
# the namespace parameter
def xpath_element(node, xpath, name=None, fatal=False, default=NO_DEFAULT):
def _find_xpath(xpath):
- return node.find(compat_xpath(xpath))
+ return node.find(xpath)
if isinstance(xpath, (str, compat_str)):
n = _find_xpath(xpath)
'empty': '', 'noval': None, 'entity': '&',
'sq': '"', 'dq': '\''
}.
- NB HTMLParser is stricter in Python 2.6 & 3.2 than in later versions,
- but the cases in the unit test will work for all of 2.6, 2.7, 3.2-3.5.
"""
parser = HTMLAttributeParser()
try:
It returns the tuple (stream, definitive_file_name).
"""
- try:
- if filename == '-':
- if sys.platform == 'win32':
- import msvcrt
- msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
- return (sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout, filename)
- stream = locked_file(filename, open_mode, block=False).open()
- return (stream, filename)
- except (IOError, OSError) as err:
- if err.errno in (errno.EACCES,):
- raise
+ if filename == '-':
+ if sys.platform == 'win32':
+ import msvcrt
+ msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
+ return (sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout, filename)
- # In case of error, try to remove win32 forbidden chars
- alt_filename = sanitize_path(filename)
- if alt_filename == filename:
- raise
- else:
- # An exception here should be caught in the caller
- stream = locked_file(filename, open_mode, block=False).open()
- return (stream, alt_filename)
+ for attempt in range(2):
+ try:
+ try:
+ if sys.platform == 'win32':
+ # FIXME: An exclusive lock also locks the file from being read.
+ # Since windows locks are mandatory, don't lock the file on windows (for now).
+ # Ref: https://github.com/yt-dlp/yt-dlp/issues/3124
+ raise LockingUnsupportedError()
+ stream = locked_file(filename, open_mode, block=False).__enter__()
+ except LockingUnsupportedError:
+ stream = open(filename, open_mode)
+ return (stream, filename)
+ except (IOError, OSError) as err:
+ if attempt or err.errno in (errno.EACCES,):
+ raise
+ old_filename, filename = filename, sanitize_path(filename)
+ if old_filename == filename:
+ raise
def timeconvert(timestr):
return timestamp
-def sanitize_filename(s, restricted=False, is_id=False):
+def sanitize_filename(s, restricted=False, is_id=NO_DEFAULT):
"""Sanitizes a string so it could be used as part of a filename.
- If restricted is set, use a stricter subset of allowed characters.
- Set is_id if this is not an arbitrary string, but an ID that should be kept
- if possible.
+ @param restricted Use a stricter subset of allowed characters
+ @param is_id Whether this is an ID that should be kept unchanged if possible.
+ If unset, yt-dlp's new sanitization rules are in effect
"""
+ if s == '':
+ return ''
+
def replace_insane(char):
if restricted and char in ACCENT_CHARS:
return ACCENT_CHARS[char]
elif not restricted and char == '\n':
- return ' '
+ return '\0 '
elif char == '?' or ord(char) < 32 or ord(char) == 127:
return ''
elif char == '"':
return '' if restricted else '\''
elif char == ':':
- return '_-' if restricted else ' -'
+ return '\0_\0-' if restricted else '\0 \0-'
elif char in '\\/|*<>':
- return '_'
- if restricted and (char in '!&\'()[]{}$;`^,#' or char.isspace()):
- return '_'
- if restricted and ord(char) > 127:
- return '_'
+ return '\0_'
+ if restricted and (char in '!&\'()[]{}$;`^,#' or char.isspace() or ord(char) > 127):
+ return '\0_'
return char
- if s == '':
- return ''
- # Handle timestamps
- s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s)
+ s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s) # Handle timestamps
result = ''.join(map(replace_insane, s))
+ if is_id is NO_DEFAULT:
+ result = re.sub('(\0.)(?:(?=\\1)..)+', r'\1', result) # Remove repeated substitute chars
+ STRIP_RE = '(?:\0.|[ _-])*'
+ result = re.sub(f'^\0.{STRIP_RE}|{STRIP_RE}\0.$', '', result) # Remove substitute chars from start/end
+ result = result.replace('\0', '') or '_'
+
if not is_id:
while '__' in result:
result = result.replace('__', '_')
if sys.platform == 'win32':
force = False
drive_or_unc, _ = os.path.splitdrive(s)
- if sys.version_info < (2, 7) and not drive_or_unc:
- drive_or_unc, _ = os.path.splitunc(s)
elif force:
drive_or_unc = ''
else:
for path_part in norm_path]
if drive_or_unc:
sanitized_path.insert(0, drive_or_unc + os.path.sep)
- elif force and s[0] == os.path.sep:
+ elif force and s and s[0] == os.path.sep:
sanitized_path.insert(0, os.path.sep)
return os.path.join(*sanitized_path)
def encodeFilename(s, for_subprocess=False):
    """Return the filename unchanged; kept so existing callers keep working.

    @param s               The name of the file; must be exactly a str
    @param for_subprocess  Ignored
    """
    assert type(s) == str
    return s
def decodeFilename(b, for_subprocess=False):
    """Return the filename unchanged; kept so existing callers keep working.

    @param b               The name of the file
    @param for_subprocess  Ignored
    """
    return b
def encodeArgument(s):
    """Return a command-line argument as str.

    Legacy callers may still pass byte strings; those are decoded as ASCII.
    """
    return s if isinstance(s, str) else s.decode('ascii')
def decodeArgument(b):
    """Return the argument unchanged; kept so existing callers keep working."""
    return b
def decodeOption(optval):
def bug_reports_message(before=';'):
- msg = ('please report this issue on https://github.com/yt-dlp/yt-dlp , '
+ msg = ('please report this issue on https://github.com/yt-dlp/yt-dlp/issues?q= , '
'filling out the appropriate issue template. '
'Confirm you are on the latest version using yt-dlp -U')
def _create_http_connection(ydl_handler, http_class, is_https, *args, **kwargs):
- # Working around python 2 bug (see http://bugs.python.org/issue17849) by limiting
- # expected HTTP responses to meet HTTP/1.0 or later (see also
- # https://github.com/ytdl-org/youtube-dl/issues/6727)
- if sys.version_info < (3, 0):
- kwargs['strict'] = True
- hc = http_class(*args, **compat_kwargs(kwargs))
+ hc = http_class(*args, **kwargs)
source_address = ydl_handler._params.get('source_address')
if source_address is not None:
raise socket.error('getaddrinfo returns an empty list')
if hasattr(hc, '_create_connection'):
hc._create_connection = _create_connection
- sa = (source_address, 0)
- if hasattr(hc, 'source_address'): # Python 2.7+
- hc.source_address = sa
- else: # Python 2.6
- def _hc_connect(self, *args, **kwargs):
- sock = _create_connection(
- (self.host, self.port), self.timeout, sa)
- if is_https:
- self.sock = ssl.wrap_socket(
- sock, self.key_file, self.cert_file,
- ssl_version=ssl.PROTOCOL_TLSv1)
- else:
- self.sock = sock
- hc.connect = functools.partial(_hc_connect, hc)
+ hc.source_address = (source_address, 0)
return hc
if h.capitalize() not in req.headers:
req.add_header(h, v)
- req.headers = handle_youtubedl_headers(req.headers)
+ if 'Accept-encoding' not in req.headers:
+ req.add_header('Accept-encoding', ', '.join(SUPPORTED_ENCODINGS))
- if sys.version_info < (2, 7) and '#' in req.get_full_url():
- # Python 2.6 is brain-dead when it comes to fragments
- req._Request__original = req._Request__original.partition('#')[0]
- req._Request__r_type = req._Request__r_type.partition('#')[0]
+ req.headers = handle_youtubedl_headers(req.headers)
return req
location = resp.headers.get('Location')
if location:
# As of RFC 2616 default charset is iso-8859-1 that is respected by python 3
- if sys.version_info >= (3, 0):
- location = location.encode('iso-8859-1').decode('utf-8')
- else:
- location = location.decode('utf-8')
+ location = location.encode('iso-8859-1').decode('utf-8')
location_escaped = escape_url(location)
if location != location_escaped:
del resp.headers['Location']
- if sys.version_info < (3, 0):
- location_escaped = location_escaped.encode('utf-8')
resp.headers['Location'] = location_escaped
return resp
compat_urllib_request.HTTPCookieProcessor.__init__(self, cookiejar)
def http_response(self, request, response):
- # Python 2 will choke on next HTTP request in row if there are non-ASCII
- # characters in Set-Cookie HTTP header of last response (see
- # https://github.com/ytdl-org/youtube-dl/issues/6769).
- # In order to at least prevent crashing we will percent encode Set-Cookie
- # header before HTTPCookieProcessor starts processing it.
- # if sys.version_info < (3, 0) and response.headers:
- # for set_cookie_header in ('Set-Cookie', 'Set-Cookie2'):
- # set_cookie = response.headers.get(set_cookie_header)
- # if set_cookie:
- # set_cookie_escaped = compat_urllib_parse.quote(set_cookie, b"%/;:@&=+$,!~*'()?#[] ")
- # if set_cookie != set_cookie_escaped:
- # del response.headers[set_cookie_header]
- # response.headers[set_cookie_header] = set_cookie_escaped
return compat_urllib_request.HTTPCookieProcessor.http_response(self, request, response)
https_request = compat_urllib_request.HTTPCookieProcessor.http_request
# essentially all clients do redirect in this case, so we do
# the same.
- # On python 2 urlh.geturl() may sometimes return redirect URL
- # as byte string instead of unicode. This workaround allows
- # to force it always return unicode.
- if sys.version_info[0] < 3:
- newurl = compat_str(newurl)
-
# Be conciliant with URIs containing a space. This is mainly
# redundant with the more complete encoding done in http_error_302(),
# but it is kept for compatibility with other callers.
return None
-def _windows_write_string(s, out):
- """ Returns True if the string was written using special methods,
- False if it has yet to be written out."""
- # Adapted from http://stackoverflow.com/a/3259271/35070
-
- import ctypes.wintypes
-
- WIN_OUTPUT_IDS = {
- 1: -11,
- 2: -12,
- }
-
- try:
- fileno = out.fileno()
- except AttributeError:
- # If the output stream doesn't have a fileno, it's virtual
- return False
- except io.UnsupportedOperation:
- # Some strange Windows pseudo files?
- return False
- if fileno not in WIN_OUTPUT_IDS:
- return False
-
- GetStdHandle = compat_ctypes_WINFUNCTYPE(
- ctypes.wintypes.HANDLE, ctypes.wintypes.DWORD)(
- ('GetStdHandle', ctypes.windll.kernel32))
- h = GetStdHandle(WIN_OUTPUT_IDS[fileno])
-
- WriteConsoleW = compat_ctypes_WINFUNCTYPE(
- ctypes.wintypes.BOOL, ctypes.wintypes.HANDLE, ctypes.wintypes.LPWSTR,
- ctypes.wintypes.DWORD, ctypes.POINTER(ctypes.wintypes.DWORD),
- ctypes.wintypes.LPVOID)(('WriteConsoleW', ctypes.windll.kernel32))
- written = ctypes.wintypes.DWORD(0)
-
- GetFileType = compat_ctypes_WINFUNCTYPE(ctypes.wintypes.DWORD, ctypes.wintypes.DWORD)(('GetFileType', ctypes.windll.kernel32))
- FILE_TYPE_CHAR = 0x0002
- FILE_TYPE_REMOTE = 0x8000
- GetConsoleMode = compat_ctypes_WINFUNCTYPE(
- ctypes.wintypes.BOOL, ctypes.wintypes.HANDLE,
- ctypes.POINTER(ctypes.wintypes.DWORD))(
- ('GetConsoleMode', ctypes.windll.kernel32))
- INVALID_HANDLE_VALUE = ctypes.wintypes.DWORD(-1).value
-
- def not_a_console(handle):
- if handle == INVALID_HANDLE_VALUE or handle is None:
- return True
- return ((GetFileType(handle) & ~FILE_TYPE_REMOTE) != FILE_TYPE_CHAR
- or GetConsoleMode(handle, ctypes.byref(ctypes.wintypes.DWORD())) == 0)
-
- if not_a_console(h):
- return False
-
- def next_nonbmp_pos(s):
- try:
- return next(i for i, c in enumerate(s) if ord(c) > 0xffff)
- except StopIteration:
- return len(s)
-
- while s:
- count = min(next_nonbmp_pos(s), 1024)
-
- ret = WriteConsoleW(
- h, s, count if count else 2, ctypes.byref(written), None)
- if ret == 0:
- raise OSError('Failed to write string')
- if not count: # We just wrote a non-BMP character
- assert written.value == 2
- s = s[1:]
- else:
- assert written.value > 0
- s = s[written.value:]
- return True
-
-
def write_string(s, out=None, encoding=None):
if out is None:
out = sys.stderr
assert type(s) == compat_str
- if sys.platform == 'win32' and encoding is None and hasattr(out, 'fileno'):
- if _windows_write_string(s, out):
- return
-
- if ('b' in getattr(out, 'mode', '')
- or sys.version_info[0] < 3): # Python 2 lies about mode of sys.stderr
+ if 'b' in getattr(out, 'mode', ''):
byt = s.encode(encoding or preferredencoding(), 'ignore')
out.write(byt)
elif hasattr(out, 'buffer'):
return compat_struct_pack('%dB' % len(xs), *xs)
class LockingUnsupportedError(IOError):
    """Raised when file locking is not available on the current platform."""
    msg = 'File locking is not supported on this platform'

    def __init__(self):
        super().__init__(self.msg)
+
+
# Cross-platform file locking
if sys.platform == 'win32':
import ctypes.wintypes
import fcntl
def _lock_file(f, exclusive, block):
+ flags = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
+ if not block:
+ flags |= fcntl.LOCK_NB
try:
- fcntl.flock(f,
- fcntl.LOCK_SH if not exclusive
- else fcntl.LOCK_EX if block
- else fcntl.LOCK_EX | fcntl.LOCK_NB)
+ fcntl.flock(f, flags)
except BlockingIOError:
raise
except OSError: # AOSP does not have flock()
- fcntl.lockf(f,
- fcntl.LOCK_SH if not exclusive
- else fcntl.LOCK_EX if block
- else fcntl.LOCK_EX | fcntl.LOCK_NB)
+ fcntl.lockf(f, flags)
def _unlock_file(f):
try:
fcntl.lockf(f, fcntl.LOCK_UN)
except ImportError:
- UNSUPPORTED_MSG = 'file locking is not supported on this platform'
def _lock_file(f, exclusive, block):
- raise IOError(UNSUPPORTED_MSG)
+ raise LockingUnsupportedError()
def _unlock_file(f):
- raise IOError(UNSUPPORTED_MSG)
+ raise LockingUnsupportedError()
class locked_file(object):
    """File object wrapper that holds a lock on the file while it is in use.

    The file is opened in the constructor, but the lock is only taken in
    __enter__()/open(). Attributes that are not defined here are looked up
    on the underlying file object.
    """
    locked = False

    def __init__(self, filename, mode, block=True, encoding=None):
        """
        @param filename  Path of the file to open
        @param mode      One of 'r', 'rb', 'a', 'ab', 'w', 'wb'
        @param block     Passed to the locking primitive when the lock is taken
        @param encoding  Passed to os.fdopen
        @raises NotImplementedError  If the mode is not one of the above
        """
        if mode not in {'r', 'rb', 'a', 'ab', 'w', 'wb'}:
            raise NotImplementedError(mode)
        self.mode, self.block = mode, block

        writable = any(f in mode for f in 'wax+')
        readable = any(f in mode for f in 'r+')
        flags = functools.reduce(operator.ior, (
            getattr(os, 'O_CLOEXEC', 0),  # UNIX only
            getattr(os, 'O_BINARY', 0),  # Windows only
            getattr(os, 'O_NOINHERIT', 0),  # Windows only
            os.O_CREAT if writable else 0,  # O_TRUNC only after locking
            os.O_APPEND if 'a' in mode else 0,
            os.O_EXCL if 'x' in mode else 0,
            os.O_RDONLY if not writable else os.O_RDWR if readable else os.O_WRONLY,
        ))

        self.f = os.fdopen(os.open(filename, flags, 0o666), mode, encoding=encoding)

    def __enter__(self):
        """Take the lock (shared for read modes, exclusive otherwise).

        The file is closed again if the lock cannot be taken. In 'w' modes
        the file is truncated only once the lock is held.
        """
        exclusive = 'r' not in self.mode
        try:
            _lock_file(self.f, exclusive, self.block)
            self.locked = True
        except IOError:
            self.f.close()
            raise
        if 'w' in self.mode:
            try:
                self.f.truncate()
            except OSError as err:
                # Targets that cannot be truncated typically fail with
                # ESPIPE (e.g. a FIFO) or EINVAL (e.g. a character device);
                # there is nothing to truncate there, so carry on
                if err.errno not in (errno.ESPIPE, errno.EINVAL):
                    raise
        return self

    def unlock(self):
        """Release the lock if it is held; the file stays open."""
        if not self.locked:
            return
        try:
            _unlock_file(self.f)
        finally:
            self.locked = False

    def __exit__(self, *_):
        """Release the lock and always close the file."""
        try:
            self.unlock()
        finally:
            self.f.close()

    # File-like aliases so the object can be used without a `with` statement
    open = __enter__
    close = __exit__

    def __getattr__(self, attr):
        # Only called for attributes not found on this object
        return getattr(self.f, attr)

    def __iter__(self):
        return iter(self.f)
def get_filesystem_encoding():
return str_to_int(mobj.group(1))
-def parse_resolution(s):
+def parse_resolution(s, *, lenient=False):
if s is None:
return {}
- mobj = re.search(r'(?<![a-zA-Z0-9])(?P<w>\d+)\s*[xX×,]\s*(?P<h>\d+)(?![a-zA-Z0-9])', s)
+ if lenient:
+ mobj = re.search(r'(?P<w>\d+)\s*[xX×,]\s*(?P<h>\d+)', s)
+ else:
+ mobj = re.search(r'(?<![a-zA-Z0-9])(?P<w>\d+)\s*[xX×,]\s*(?P<h>\d+)(?![a-zA-Z0-9])', s)
if mobj:
return {
'width': int(mobj.group('w')),
def str_to_int(int_str):
""" A more relaxed version of int_or_none """
- if isinstance(int_str, compat_integer_types):
+ if isinstance(int_str, int):
return int_str
elif isinstance(int_str, compat_str):
int_str = re.sub(r'[,\.\+]', '', int_str)
def strftime_or_none(timestamp, date_format, default=None):
datetime_object = None
try:
- if isinstance(timestamp, compat_numeric_types): # unix timestamp
+ if isinstance(timestamp, (int, float)): # unix timestamp
datetime_object = datetime.datetime.utcfromtimestamp(timestamp)
elif isinstance(timestamp, compat_str): # assume YYYYMMDD
datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
def parse_duration(s):
- if not isinstance(s, compat_basestring):
+ if not isinstance(s, str):
return None
s = s.strip()
if not s:
m = re.match(
r'''(?ix)(?:P?
(?:
- [0-9]+\s*y(?:ears?)?\s*
+ [0-9]+\s*y(?:ears?)?,?\s*
)?
(?:
- [0-9]+\s*m(?:onths?)?\s*
+ [0-9]+\s*m(?:onths?)?,?\s*
)?
(?:
- [0-9]+\s*w(?:eeks?)?\s*
+ [0-9]+\s*w(?:eeks?)?,?\s*
)?
(?:
- (?P<days>[0-9]+)\s*d(?:ays?)?\s*
+ (?P<days>[0-9]+)\s*d(?:ays?)?,?\s*
)?
T)?
(?:
- (?P<hours>[0-9]+)\s*h(?:ours?)?\s*
+ (?P<hours>[0-9]+)\s*h(?:ours?)?,?\s*
)?
(?:
- (?P<mins>[0-9]+)\s*m(?:in(?:ute)?s?)?\s*
+ (?P<mins>[0-9]+)\s*m(?:in(?:ute)?s?)?,?\s*
)?
(?:
(?P<secs>[0-9]+)(?P<ms>\.[0-9]+)?\s*s(?:ec(?:ond)?s?)?\s*
return exe
-def _get_exe_version_output(exe, args):
+def _get_exe_version_output(exe, args, *, to_screen=None):
+ if to_screen:
+ to_screen(f'Checking exe version: {shell_quote([exe] + args)}')
try:
# STDIN should be redirected too. On UNIX-like systems, ffmpeg triggers
# SIGTTOU if yt-dlp is run in the background.
class OnDemandPagedList(PagedList):
+ """Download pages until a page with less than maximum results"""
def _getslice(self, start, end):
for pagenum in itertools.count(start // self._pagesize):
firstid = pagenum * self._pagesize
class InAdvancePagedList(PagedList):
+ """PagedList with total number of pages known in advance"""
def __init__(self, pagefunc, pagecount, pagesize):
PagedList.__init__(self, pagefunc, pagesize, True)
self._pagecount = pagecount
def escape_rfc3986(s):
    """Escape non-ASCII characters as suggested by RFC 3986"""
    # Characters in the safe list (including "%") are passed through
    # untouched, so existing percent-escapes are not escaped again
    return urllib.parse.quote(s, b"%/;:@&=+$,!~*'()?#[]")
def escape_url(url):
def dict_get(d, key_or_keys, default=None, skip_false_values=True):
    """Return the first usable value found in d under any of the given keys.

    @param d                  Mapping to look the keys up in
    @param key_or_keys        A single key or a collection of keys, tried in order
    @param default            Returned when no key yields a usable value
    @param skip_false_values  If set, falsy values are skipped as well;
                              None is always skipped
    """
    for val in map(d.get, variadic(key_or_keys)):
        if val is None:
            continue
        if val or not skip_false_values:
            return val
    return default
def try_call(*funcs, expected_type=None, args=(), kwargs=None):
    """Call each function in turn and return the first acceptable result.

    @param funcs          Callables to try, in order
    @param expected_type  If given, a result is only accepted when it is an
                          instance of this type (or tuple of types)
    @param args           Positional arguments passed to every callable
    @param kwargs         Keyword arguments passed to every callable
    @returns              The first accepted result, or None if every callable
                          failed or returned an unaccepted value

    AttributeError, KeyError, TypeError, IndexError and ZeroDivisionError
    raised by a callable are swallowed; anything else propagates.
    """
    kwargs = kwargs or {}
    for f in funcs:
        try:
            val = f(*args, **kwargs)
        except (AttributeError, KeyError, TypeError, IndexError, ZeroDivisionError):
            continue
        if expected_type is None or isinstance(val, expected_type):
            return val
    return None
+
+
def try_get(src, getter, expected_type=None):
    """Apply one or more getters to src and return the first acceptable result.

    @param src            Object passed as the only argument to each getter
    @param getter         A callable or a collection of callables, tried in order
    @param expected_type  If given, only results of this type are accepted
    """
    return try_call(*variadic(getter), args=(src,), expected_type=expected_type)
+
+
def filter_dict(dct, cndn=lambda _, v: v is not None):
    """Return a new dict holding only the items for which cndn(key, value) is true.

    By default, items whose value is None are dropped.
    """
    return {k: v for k, v in dct.items() if cndn(k, v)}
def merge_dicts(*dicts):
    """Merge dicts, giving priority to the earlier ones.

    A key keeps the first non-None value seen for it, except that an empty
    string already stored is replaced by a later string value.
    """
    merged = {}
    for a_dict in dicts:
        for k, v in a_dict.items():
            is_new = v is not None and k not in merged
            # Short-circuits before merged[k] when the key is new, so the
            # lookup below can never raise KeyError
            if is_new or (isinstance(v, str) and merged[k] == ''):
                merged[k] = v
    return merged
def parse_age_limit(s):
if type(s) == int:
return s if 0 <= s <= 21 else None
- if not isinstance(s, compat_basestring):
+ if not isinstance(s, str):
return None
m = re.match(r'^(?P<age>\d{1,2})\+?$', s)
if m:
def error_to_compat_str(err):
    """Return the message of an error as a str."""
    return str(err)
+
+
def error_to_str(err):
    """Return an error formatted as "<exception class name>: <message>"."""
    return f'{type(err).__name__}: {err}'
def mimetype2ext(mt):
'=': operator.eq,
}
+ if isinstance(incomplete, bool):
+ is_incomplete = lambda _: incomplete
+ else:
+ is_incomplete = lambda k: k in incomplete
+
operator_rex = re.compile(r'''(?x)\s*
(?P<key>[a-z_]+)
\s*(?P<negation>!\s*)?(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
comparison_value = comparison_value.replace(r'\%s' % m['quote'], m['quote'])
actual_value = dct.get(m['key'])
numeric_comparison = None
- if isinstance(actual_value, compat_numeric_types):
+ if isinstance(actual_value, (int, float)):
# If the original field is a string and matching comparisonvalue is
# a number we should respect the origin of the original field
# and process comparison value as a string (see
if numeric_comparison is not None and m['op'] in STRING_OPERATORS:
raise ValueError('Operator %s only supports string values!' % m['op'])
if actual_value is None:
- return incomplete or m['none_inclusive']
+ return is_incomplete(m['key']) or m['none_inclusive']
return op(actual_value, comparison_value if numeric_comparison is None else numeric_comparison)
UNARY_OPERATORS = {
if m:
op = UNARY_OPERATORS[m.group('op')]
actual_value = dct.get(m.group('key'))
- if incomplete and actual_value is None:
+ if is_incomplete(m.group('key')) and actual_value is None:
return True
return op(actual_value)
def match_str(filter_str, dct, incomplete=False):
    """ Filter a dictionary with a simple string syntax.
    @returns           Whether the filter passes
    @param filter_str  Conditions joined by "&"; a literal "&" inside a
                       condition is written as "\\&"
    @param dct         The dictionary the conditions are checked against
    @param incomplete  Set of keys that is expected to be missing from dct.
                       Can be True/False to indicate all/none of the keys may be missing.
                       All conditions on incomplete keys pass if the key is missing
    """
    # Split on unescaped "&" only, then restore the escaped ones in each part
    filter_parts = re.split(r'(?<!\\)&', filter_str)
    return all(
        _match_one(filter_part.replace(r'\&', '&'), dct, incomplete)
        for filter_part in filter_parts)
def match_filter_func(filters):
    """Build a match-filter callable from one or more filter strings.

    @param filters  A filter string or a collection of them. An entry passes
                    if at least one of the filters matches
    @returns        None if no filters are given. Otherwise a function that
                    takes an info dict (extra arguments are forwarded to
                    match_str) and returns None when the entry passes, or a
                    message explaining why it is skipped
    """
    if not filters:
        return None
    filters = variadic(filters)

    def _match_func(info_dict, *args, **kwargs):
        if any(match_str(f, info_dict, *args, **kwargs) for f in filters):
            return None
        video_title = info_dict.get('title') or info_dict.get('id') or 'video'
        filter_str = ') | ('.join(map(str.strip, filters))
        return f'{video_title} does not pass filter ({filter_str}), skipping ..'
    return _match_func
net_location = ''
if iri_parts.username:
- net_location += compat_urllib_parse_quote(iri_parts.username, safe=r"!$%&'()*+,~")
+ net_location += urllib.parse.quote(iri_parts.username, safe=r"!$%&'()*+,~")
if iri_parts.password is not None:
- net_location += ':' + compat_urllib_parse_quote(iri_parts.password, safe=r"!$%&'()*+,~")
+ net_location += ':' + urllib.parse.quote(iri_parts.password, safe=r"!$%&'()*+,~")
net_location += '@'
net_location += iri_parts.hostname.encode('idna').decode('utf-8') # Punycode for Unicode hostnames.
if iri_parts.port is not None and iri_parts.port != 80:
net_location += ':' + str(iri_parts.port)
- return compat_urllib_parse_urlunparse(
+ return urllib.parse.urlunparse(
(iri_parts.scheme,
net_location,
- compat_urllib_parse_quote_plus(iri_parts.path, safe=r"!$%&'()*+,/:;=@|~"),
+ urllib.parse.quote_plus(iri_parts.path, safe=r"!$%&'()*+,/:;=@|~"),
# Unsure about the `safe` argument, since this is a legacy way of handling parameters.
- compat_urllib_parse_quote_plus(iri_parts.params, safe=r"!$%&'()*+,/:;=@|~"),
+ urllib.parse.quote_plus(iri_parts.params, safe=r"!$%&'()*+,/:;=@|~"),
# Not totally sure about the `safe` argument, since the source does not explicitly mention the query URI component.
- compat_urllib_parse_quote_plus(iri_parts.query, safe=r"!$%&'()*+,/:;=?@{|}~"),
+ urllib.parse.quote_plus(iri_parts.query, safe=r"!$%&'()*+,/:;=?@{|}~"),
- compat_urllib_parse_quote_plus(iri_parts.fragment, safe=r"!#$%&'()*+,/:;=?@{|}~")))
+ urllib.parse.quote_plus(iri_parts.fragment, safe=r"!#$%&'()*+,/:;=?@{|}~")))
# Source for `safe` arguments: https://url.spec.whatwg.org/#percent-encoded-bytes.
from zipimport import zipimporter
if hasattr(sys, 'frozen'): # Running from PyInstaller
path = os.path.dirname(sys.executable)
- elif isinstance(globals().get('__loader__'), zipimporter): # Running from ZIP
+ elif isinstance(__loader__, zipimporter): # Running from ZIP
path = os.path.join(os.path.dirname(__file__), '../..')
else:
path = os.path.join(os.path.dirname(__file__), '..')
@param path_list A list of paths which are checked one by one.
Each path is a list of keys where each key is a string,
a function, a tuple of strings/None or "...".
- When a fuction is given, it takes the key as argument and
- returns whether the key matches or not. When a tuple is given,
+ When a function is given, it takes the key and value as arguments
+ and returns whether the key matches or not. When a tuple is given,
all the keys given in the tuple are traversed, and
"..." traverses all the keys in the object
"None" returns the object without traversal
obj = str(obj)
_current_depth += 1
depth = max(depth, _current_depth)
- return [_traverse_obj(v, path[i + 1:], _current_depth) for k, v in obj if key(k)]
+ return [_traverse_obj(v, path[i + 1:], _current_depth) for k, v in obj if try_call(key, args=(k, v))]
elif isinstance(obj, dict) and not (is_user_input and key == ':'):
obj = (obj.get(key) if casesense or (key in obj)
else next((v for k, v in obj.items() if _lower(k) == key), None))
try:
# FIXME: https://github.com/ytdl-org/youtube-dl/commit/dfe5fa49aed02cf36ba9f743b11b0903554b5e56
contents = optionf.read()
- if sys.version_info < (3,):
- contents = contents.decode(preferredencoding())
- res = compat_shlex_split(contents, comments=True)
+ res = shlex.split(contents, comments=True)
finally:
optionf.close()
return res
class WebSocketsWrapper():
"""Wraps websockets module to use in non-async scopes"""
- def __init__(self, url, headers=None):
+ def __init__(self, url, headers=None, connect=True):
self.loop = asyncio.events.new_event_loop()
self.conn = compat_websockets.connect(
url, extra_headers=headers, ping_interval=None,
close_timeout=float('inf'), loop=self.loop, ping_timeout=float('inf'))
+ if connect:
+ self.__enter__()
atexit.register(self.__exit__, None, None, None)
def __enter__(self):
- self.pool = self.run_with_loop(self.conn.__aenter__(), self.loop)
+ if not self.pool:
+ self.pool = self.run_with_loop(self.conn.__aenter__(), self.loop)
return self
def send(self, *args):
def merge_headers(*dicts):
    """Merge dicts of http headers case insensitively, prioritizing the latter ones"""
    merged = {}
    for name, value in itertools.chain.from_iterable(map(dict.items, dicts)):
        # Names are normalized with str.title(), so differently-cased
        # spellings of the same header collapse into one entry
        merged[name.title()] = value
    return merged
+
+
class classproperty:
    """Read-only property computed from the class instead of the instance.

    The decorated function receives the owner class, whether the attribute
    is accessed on the class itself or on one of its instances.
    """

    def __init__(self, f):
        self.f = f

    def __get__(self, _, cls):
        return self.f(cls)