#!/usr/bin/env python3
-# coding: utf-8
-
-from __future__ import unicode_literals
-
-import asyncio
import atexit
import base64
import binascii
import contextlib
import ctypes
import datetime
-import email.utils
import email.header
+import email.utils
import errno
-import functools
import gzip
import hashlib
import hmac
import json
import locale
import math
+import mimetypes
import operator
import os
import platform
import random
import re
+import shlex
import socket
import ssl
import subprocess
import tempfile
import time
import traceback
+import urllib.parse
import xml.etree.ElementTree
import zlib
-import mimetypes
-import urllib.parse
-import shlex
+from .compat import asyncio, functools # isort: split
from .compat import (
- compat_HTMLParseError,
- compat_HTMLParser,
- compat_HTTPError,
- compat_brotli,
compat_chr,
compat_cookiejar,
compat_etree_fromstring,
compat_expanduser,
compat_html_entities,
compat_html_entities_html5,
+ compat_HTMLParseError,
+ compat_HTMLParser,
compat_http_client,
+ compat_HTTPError,
compat_os_name,
compat_parse_qs,
compat_shlex_quote,
compat_struct_pack,
compat_struct_unpack,
compat_urllib_error,
+ compat_urllib_parse_unquote_plus,
compat_urllib_parse_urlencode,
compat_urllib_parse_urlparse,
- compat_urllib_parse_unquote_plus,
compat_urllib_request,
compat_urlparse,
- compat_websockets,
-)
-
-from .socks import (
- ProxyType,
- sockssocket,
)
-
-try:
- import certifi
- has_certifi = True
-except ImportError:
- has_certifi = False
+from .dependencies import brotli, certifi, websockets
+from .socks import ProxyType, sockssocket
def register_socks_protocols():
SUPPORTED_ENCODINGS = [
'gzip', 'deflate'
]
-if compat_brotli:
+if brotli:
SUPPORTED_ENCODINGS.append('br')
std_headers = {
PACKED_CODES_RE = r"}\('(.+)',(\d+),(\d+),'([^']+)'\.split\('\|'\)"
JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>(?P<json_ld>.+?)</script>'
+NUMBER_RE = r'\d+(?:\.\d+)?'
+
+@functools.cache
def preferredencoding():
"""Get preferred encoding.
if sys.platform == 'win32':
# Need to remove existing file on Windows, else os.rename raises
# WindowsError or FileExistsError.
- try:
+ with contextlib.suppress(OSError):
os.unlink(fn)
- except OSError:
- pass
- try:
+ with contextlib.suppress(OSError):
mask = os.umask(0)
os.umask(mask)
os.chmod(tf.name, 0o666 & ~mask)
- except OSError:
- pass
os.rename(tf.name, fn)
except Exception:
- try:
+ with contextlib.suppress(OSError):
os.remove(tf.name)
- except OSError:
- pass
raise
def find_xpath_attr(node, xpath, key, val=None):
    """Find the first element matching xpath[@key] or xpath[@key='val']

    @param node   Object whose ``find`` method performs the lookup
    @param xpath  Path expression to which the attribute predicate is appended
    @param key    Attribute name; only letters, "_" and "-" are accepted
    @param val    Required attribute value. When None, only the presence
                  of the attribute is required
    @returns      Whatever ``node.find`` returns for the built expression
    """
    assert re.match(r'^[a-zA-Z_-]+$', key)
    predicate = f'[@{key}]' if val is None else f"[@{key}='{val}']"
    return node.find(xpath + predicate)
# On python2.6 the xml.etree.ElementTree.Element methods don't support
if default is not NO_DEFAULT:
return default
elif fatal:
- name = '%s[@%s]' % (xpath, key) if name is None else name
+ name = f'{xpath}[@{key}]' if name is None else name
raise ExtractorError('Could not find XML attribute %s' % name)
else:
return None
return n.attrib[key]
def get_element_by_id(id, html, **kwargs):
    """Return the content of the tag with the specified ID in the passed HTML document

    Extra keyword arguments are forwarded unchanged to get_element_by_attribute.
    """
    return get_element_by_attribute('id', id, html, **kwargs)
def get_element_html_by_id(id, html, **kwargs):
    """Return the html of the tag with the specified ID in the passed HTML document

    Extra keyword arguments are forwarded unchanged to get_element_html_by_attribute.
    """
    return get_element_html_by_attribute('id', id, html, **kwargs)
def get_element_by_class(class_name, html):
return retval[0] if retval else None
def get_element_by_attribute(attribute, value, html, **kwargs):
    """Return the first result of get_elements_by_attribute, or None if there is none

    Extra keyword arguments are forwarded unchanged to get_elements_by_attribute.
    """
    retval = get_elements_by_attribute(attribute, value, html, **kwargs)
    return retval[0] if retval else None
def get_element_html_by_attribute(attribute, value, html, **kwargs):
    """Return the first result of get_elements_html_by_attribute, or None if there is none

    Extra keyword arguments are forwarded unchanged to get_elements_html_by_attribute.
    """
    retval = get_elements_html_by_attribute(attribute, value, html, **kwargs)
    return retval[0] if retval else None
-def get_elements_by_class(class_name, html):
+def get_elements_by_class(class_name, html, **kargs):
"""Return the content of all tags with the specified class in the passed HTML document as a list"""
return get_elements_by_attribute(
'class', r'[^\'"]*\b%s\b[^\'"]*' % re.escape(class_name),
attribute in the passed HTML document
"""
- value_quote_optional = '' if re.match(r'''[\s"'`=<>]''', value) else '?'
+ quote = '' if re.match(r'''[\s"'`=<>]''', value) else '?'
value = re.escape(value) if escape_value else value
- partial_element_re = r'''(?x)
+ partial_element_re = rf'''(?x)
<(?P<tag>[a-zA-Z0-9:._-]+)
(?:\s(?:[^>"']|"[^"]*"|'[^']*')*)?
- \s%(attribute)s\s*=\s*(?P<_q>['"]%(vqo)s)(?-x:%(value)s)(?P=_q)
- ''' % {'attribute': re.escape(attribute), 'value': value, 'vqo': value_quote_optional}
+ \s{re.escape(attribute)}\s*=\s*(?P<_q>['"]{quote})(?-x:{value})(?P=_q)
+ '''
for m in re.finditer(partial_element_re, html):
content, whole = get_element_text_and_html_by_tag(m.group('tag'), html[m.start():])
}.
"""
parser = HTMLAttributeParser()
- try:
+ with contextlib.suppress(compat_HTMLParseError):
parser.feed(html_element)
parser.close()
- # Older Python may throw HTMLParseError in case of malformed HTML
- except compat_HTMLParseError:
- pass
return parser.attrs
except LockingUnsupportedError:
stream = open(filename, open_mode)
return (stream, filename)
- except (IOError, OSError) as err:
+ except OSError as err:
if attempt or err.errno in (errno.EACCES,):
raise
old_filename, filename = filename, sanitize_path(filename)
def sanitize_url(url):
# Prepend protocol-less URLs with `http:` scheme in order to mitigate
# the number of unwanted failures due to missing protocol
- if url.startswith('//'):
+ if url is None:
+ return
+ elif url.startswith('//'):
return 'http:%s' % url
# Fix some common typos seen so far
COMMON_TYPOS = (
parts.hostname if parts.port is None
else '%s:%d' % (parts.hostname, parts.port))))
auth_payload = base64.b64encode(
- ('%s:%s' % (parts.username, parts.password or '')).encode('utf-8'))
- return url, 'Basic ' + auth_payload.decode('utf-8')
+ ('%s:%s' % (parts.username, parts.password or '')).encode())
+ return url, f'Basic {auth_payload.decode()}'
def sanitized_Request(url, *args, **kwargs):
else:
base = 10
# See https://github.com/ytdl-org/youtube-dl/issues/7518
- try:
+ with contextlib.suppress(ValueError):
return compat_chr(int(numstr, base))
- except ValueError:
- pass
# Unknown entity in name, return its literal representation
return '&%s;' % entity
def unescapeHTML(s):
    """Replace every ``&...;`` run in s with the result of _htmlentity_transform

    @param s  Text to process (must be a str), or None
    @returns  None when s is None, otherwise the substituted string
    """
    if s is None:
        return None
    assert isinstance(s, str)

    return re.sub(
        r'&([^&;]+;)', lambda m: _htmlentity_transform(m.group(1)), s)
_startupinfo = None
def __init__(self, *args, **kwargs):
- super(Popen, self).__init__(*args, **kwargs, startupinfo=self._startupinfo)
+ super().__init__(*args, **kwargs, startupinfo=self._startupinfo)
def communicate_or_kill(self, *args, **kwargs):
return process_communicate_or_kill(self, *args, **kwargs)
def encodeFilename(s, for_subprocess=False):
    """Return the filename s unchanged

    @param s               Filename; must be a str
    @param for_subprocess  Accepted but not used by this implementation
    """
    assert isinstance(s, str)
    return s
except PermissionError:
return
for cert in certs:
- try:
+ with contextlib.suppress(ssl.SSLError):
ssl_context.load_verify_locations(cadata=cert)
- except ssl.SSLError:
- pass
def make_HTTPS_handler(params, **kwargs):
context.check_hostname = opts_check_certificate
if params.get('legacyserverconnect'):
context.options |= 4 # SSL_OP_LEGACY_SERVER_CONNECT
+ # Allow use of weaker ciphers in Python 3.10+. See https://bugs.python.org/issue43998
+ context.set_ciphers('DEFAULT')
context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
if opts_check_certificate:
if has_certifi and 'no-certifi' not in params.get('compat_opts', []):
except ssl.SSLError:
# enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
- # Create a new context to discard any certificates that were already loaded
- context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- context.check_hostname, context.verify_mode = True, ssl.CERT_REQUIRED
for storename in ('CA', 'ROOT'):
_ssl_load_windows_store_certs(context, storename)
context.set_default_verify_paths()
+ client_certfile = params.get('client_certificate')
+ if client_certfile:
+ try:
+ context.load_cert_chain(
+ client_certfile, keyfile=params.get('client_certificate_key'),
+ password=params.get('client_certificate_password'))
+ except ssl.SSLError:
+ raise YoutubeDLError('Unable to load client certificate')
return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
self.ie = ie
self.exc_info = sys.exc_info() # preserve original exception
- super(ExtractorError, self).__init__(''.join((
+ super().__init__(''.join((
format_field(ie, template='[%s] '),
format_field(video_id, template='%s: '),
msg,
class UnsupportedError(ExtractorError):
def __init__(self, url):
- super(UnsupportedError, self).__init__(
+ super().__init__(
'Unsupported URL: %s' % url, expected=True)
self.url = url
def __init__(self, msg, countries=None, **kwargs):
kwargs['expected'] = True
- super(GeoRestrictedError, self).__init__(msg, **kwargs)
+ super().__init__(msg, **kwargs)
self.countries = countries
def __init__(self, msg, exc_info=None):
""" exc_info, if given, is the original exception that caused the trouble (as returned by sys.exc_info()). """
- super(DownloadError, self).__init__(msg)
+ super().__init__(msg)
self.exc_info = exc_info
"""
def __init__(self, downloaded, expected):
- super(ContentTooShortError, self).__init__(
- 'Downloaded {0} bytes, expected {1} bytes'.format(downloaded, expected)
- )
+ super().__init__(f'Downloaded {downloaded} bytes, expected {expected} bytes')
# Both in bytes
self.downloaded = downloaded
self.expected = expected
class XAttrMetadataError(YoutubeDLError):
def __init__(self, code=None, msg='Unknown error'):
- super(XAttrMetadataError, self).__init__(msg)
+ super().__init__(msg)
self.code = code
self.msg = msg
ip_addrs = [addr for addr in addrs if addr[0] == af]
if addrs and not ip_addrs:
ip_version = 'v4' if af == socket.AF_INET else 'v6'
- raise socket.error(
+ raise OSError(
"No remote IP%s addresses available for connect, can't use '%s' as source address"
% (ip_version, source_address[0]))
for res in ip_addrs:
sock.connect(sa)
err = None # Explicitly break reference cycle
return sock
- except socket.error as _:
+ except OSError as _:
err = _
if sock is not None:
sock.close()
if err is not None:
raise err
else:
- raise socket.error('getaddrinfo returns an empty list')
+ raise OSError('getaddrinfo returns an empty list')
if hasattr(hc, '_create_connection'):
hc._create_connection = _create_connection
hc.source_address = (source_address, 0)
filtered_headers = headers
if 'Youtubedl-no-compression' in filtered_headers:
- filtered_headers = dict((k, v) for k, v in filtered_headers.items() if k.lower() != 'accept-encoding')
+ filtered_headers = {k: v for k, v in filtered_headers.items() if k.lower() != 'accept-encoding'}
del filtered_headers['Youtubedl-no-compression']
return filtered_headers
def brotli(data):
if not data:
return data
- return compat_brotli.decompress(data)
+ return brotli.decompress(data)
def http_request(self, req):
# According to RFC 3986, URLs can not contain non-ASCII characters, however this is not
gz = gzip.GzipFile(fileobj=io.BytesIO(content), mode='rb')
try:
uncompressed = io.BytesIO(gz.read())
- except IOError as original_ioerror:
+ except OSError as original_ioerror:
# There may be junk at the end of the file
# See http://stackoverflow.com/q/4928560/35070 for details
for i in range(1, 1024):
try:
gz = gzip.GzipFile(fileobj=io.BytesIO(content[:-i]), mode='rb')
uncompressed = io.BytesIO(gz.read())
- except IOError:
+ except OSError:
continue
break
else:
location = resp.headers.get('Location')
if location:
# As of RFC 2616 default charset is iso-8859-1 that is respected by python 3
- location = location.encode('iso-8859-1').decode('utf-8')
+ location = location.encode('iso-8859-1').decode()
location_escaped = escape_url(location)
if location != location_escaped:
del resp.headers['Location']
def connect(self):
self.sock = sockssocket()
self.sock.setproxy(*proxy_args)
- if type(self.timeout) in (int, float):
+ if isinstance(self.timeout, (int, float)):
self.sock.settimeout(self.timeout)
self.sock.connect((self.host, self.port))
conn_class = make_socks_conn_class(conn_class, socks_proxy)
del req.headers['Ytdl-socks-proxy']
- return self.do_open(functools.partial(
- _create_http_connection, self, conn_class, True),
- req, **kwargs)
+ try:
+ return self.do_open(
+ functools.partial(_create_http_connection, self, conn_class, True), req, **kwargs)
+ except urllib.error.URLError as e:
+ if (isinstance(e.reason, ssl.SSLError)
+ and getattr(e.reason, 'reason', None) == 'SSLV3_ALERT_HANDSHAKE_FAILURE'):
+ raise YoutubeDLError('SSLV3_ALERT_HANDSHAKE_FAILURE: Try using --legacy-server-connect')
+ raise
class YoutubeDLCookieJar(compat_cookiejar.MozillaCookieJar):
'CookieFileEntry',
('domain_name', 'include_subdomains', 'path', 'https_only', 'expires_at', 'name', 'value'))
- def save(self, filename=None, ignore_discard=False, ignore_expires=False):
+ def __init__(self, filename=None, *args, **kwargs):
+ super().__init__(None, *args, **kwargs)
+ if self.is_path(filename):
+ filename = os.fspath(filename)
+ self.filename = filename
+
+ @staticmethod
+ def _true_or_false(cndn):
+ return 'TRUE' if cndn else 'FALSE'
+
+ @staticmethod
+ def is_path(file):
+ return isinstance(file, (str, bytes, os.PathLike))
+
+ @contextlib.contextmanager
+ def open(self, file, *, write=False):
+ if self.is_path(file):
+ with open(file, 'w' if write else 'r', encoding='utf-8') as f:
+ yield f
+ else:
+ if write:
+ file.truncate(0)
+ yield file
+
+ def _really_save(self, f, ignore_discard=False, ignore_expires=False):
+ now = time.time()
+ for cookie in self:
+ if (not ignore_discard and cookie.discard
+ or not ignore_expires and cookie.is_expired(now)):
+ continue
+ name, value = cookie.name, cookie.value
+ if value is None:
+ # cookies.txt regards 'Set-Cookie: foo' as a cookie
+ # with no name, whereas http.cookiejar regards it as a
+ # cookie with no value.
+ name, value = '', name
+ f.write('%s\n' % '\t'.join((
+ cookie.domain,
+ self._true_or_false(cookie.domain.startswith('.')),
+ cookie.path,
+ self._true_or_false(cookie.secure),
+ str_or_none(cookie.expires, default=''),
+ name, value
+ )))
+
+ def save(self, filename=None, *args, **kwargs):
"""
Save cookies to a file.
+ Code is taken from CPython 3.6
+ https://github.com/python/cpython/blob/8d999cbf4adea053be6dbb612b9844635c4dfb8e/Lib/http/cookiejar.py#L2091-L2117 """
- Most of the code is taken from CPython 3.8 and slightly adapted
- to support cookie files with UTF-8 in both python 2 and 3.
- """
if filename is None:
if self.filename is not None:
filename = self.filename
else:
raise ValueError(compat_cookiejar.MISSING_FILENAME_TEXT)
- # Store session cookies with `expires` set to 0 instead of an empty
- # string
+ # Store session cookies with `expires` set to 0 instead of an empty string
for cookie in self:
if cookie.expires is None:
cookie.expires = 0
- with io.open(filename, 'w', encoding='utf-8') as f:
+ with self.open(filename, write=True) as f:
f.write(self._HEADER)
- now = time.time()
- for cookie in self:
- if not ignore_discard and cookie.discard:
- continue
- if not ignore_expires and cookie.is_expired(now):
- continue
- if cookie.secure:
- secure = 'TRUE'
- else:
- secure = 'FALSE'
- if cookie.domain.startswith('.'):
- initial_dot = 'TRUE'
- else:
- initial_dot = 'FALSE'
- if cookie.expires is not None:
- expires = compat_str(cookie.expires)
- else:
- expires = ''
- if cookie.value is None:
- # cookies.txt regards 'Set-Cookie: foo' as a cookie
- # with no name, whereas http.cookiejar regards it as a
- # cookie with no value.
- name = ''
- value = cookie.name
- else:
- name = cookie.name
- value = cookie.value
- f.write(
- '\t'.join([cookie.domain, initial_dot, cookie.path,
- secure, expires, name, value]) + '\n')
+ self._really_save(f, *args, **kwargs)
def load(self, filename=None, ignore_discard=False, ignore_expires=False):
"""Load cookies from a file."""
return line
cf = io.StringIO()
- with io.open(filename, encoding='utf-8') as f:
+ with self.open(filename) as f:
for line in f:
try:
cf.write(prepare_line(line))
except compat_cookiejar.LoadError as e:
- write_string(
- 'WARNING: skipping cookie file entry due to %s: %r\n'
- % (e, line), sys.stderr)
+ if f'{line.strip()} '[0] in '[{"':
+ raise compat_cookiejar.LoadError(
+ 'Cookies file must be Netscape formatted, not JSON. See '
+ 'https://github.com/ytdl-org/youtube-dl#how-do-i-pass-cookies-to-youtube-dl')
+ write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
continue
cf.seek(0)
self._really_load(cf, filename, ignore_discard, ignore_expires)
CONTENT_HEADERS = ("content-length", "content-type")
# Do not carry the CONTENT_HEADERS over to the redirected request
- newheaders = dict((k, v) for k, v in req.headers.items()
- if k.lower() not in CONTENT_HEADERS)
+ newheaders = {k: v for k, v in req.headers.items() if k.lower() not in CONTENT_HEADERS}
+
+ # A 303 must either use GET or HEAD for subsequent request
+ # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.4
+ if code == 303 and m != 'HEAD':
+ m = 'GET'
+ # 301 and 302 redirects are commonly turned into a GET from a POST
+ # for subsequent requests by browsers, so we'll do the same.
+ # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.2
+ # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.3
+ if code in (301, 302) and m == 'POST':
+ m = 'GET'
+
return compat_urllib_request.Request(
newurl, headers=newheaders, origin_req_host=req.origin_req_host,
- unverifiable=True)
+ unverifiable=True, method=m)
def extract_timezone(date_str):
if timezone is None:
timezone, date_str = extract_timezone(date_str)
- try:
- date_format = '%Y-%m-%d{0}%H:%M:%S'.format(delimiter)
+ with contextlib.suppress(ValueError):
+ date_format = f'%Y-%m-%d{delimiter}%H:%M:%S'
dt = datetime.datetime.strptime(date_str, date_format) - timezone
return calendar.timegm(dt.timetuple())
- except ValueError:
- pass
def date_formats(day_first=True):
_, date_str = extract_timezone(date_str)
for expression in date_formats(day_first):
- try:
+ with contextlib.suppress(ValueError):
upload_date = datetime.datetime.strptime(date_str, expression).strftime('%Y%m%d')
- except ValueError:
- pass
if upload_date is None:
timetuple = email.utils.parsedate_tz(date_str)
if timetuple:
- try:
+ with contextlib.suppress(ValueError):
upload_date = datetime.datetime(*timetuple[:6]).strftime('%Y%m%d')
- except ValueError:
- pass
if upload_date is not None:
return compat_str(upload_date)
date_str = m.group(1)
for expression in date_formats(day_first):
- try:
+ with contextlib.suppress(ValueError):
dt = datetime.datetime.strptime(date_str, expression) - timezone + datetime.timedelta(hours=pm_delta)
return calendar.timegm(dt.timetuple())
- except ValueError:
- pass
timetuple = email.utils.parsedate_tz(date_str)
if timetuple:
return calendar.timegm(timetuple) + pm_delta * 3600
def datetime_from_str(date_str, precision='auto', format='%Y%m%d'):
- """
- Return a datetime object from a string in the format YYYYMMDD or
- (now|today|yesterday|date)[+-][0-9](microsecond|second|minute|hour|day|week|month|year)(s)?
-
- format: string date format used to return datetime object from
- precision: round the time portion of a datetime object.
- auto|microsecond|second|minute|hour|day.
- auto: round to the unit provided in date_str (if applicable).
+ R"""
+ Return a datetime object from a string.
+ Supported format:
+ (now|today|yesterday|DATE)([+-]\d+(microsecond|second|minute|hour|day|week|month|year)s?)?
+
+ @param format strftime format of DATE
+ @param precision Round the datetime object: auto|microsecond|second|minute|hour|day
+ auto: round to the unit provided in date_str (if applicable).
"""
auto_precision = False
if precision == 'auto':
if date_str == 'yesterday':
return today - datetime.timedelta(days=1)
match = re.match(
- r'(?P<start>.+)(?P<sign>[+-])(?P<time>\d+)(?P<unit>microsecond|second|minute|hour|day|week|month|year)(s)?',
+ r'(?P<start>.+)(?P<sign>[+-])(?P<time>\d+)(?P<unit>microsecond|second|minute|hour|day|week|month|year)s?',
date_str)
if match is not None:
start_time = datetime_from_str(match.group('start'), precision, format)
def date_from_str(date_str, format='%Y%m%d', strict=False):
    r"""
    Return a date object from a string using datetime_from_str

    @param format   Passed through to datetime_from_str as its ``format``
    @param strict   Restrict allowed patterns to "YYYYMMDD" and
                    (now|today|yesterday)(-\d+(day|week|month|year)s?)?
    @raises ValueError  if strict is set and date_str does not fully match
    """
    if strict and not re.fullmatch(r'\d{8}|(now|today|yesterday)(-\d+(day|week|month|year)s?)?', date_str):
        raise ValueError(f'Invalid date format "{date_str}"')
    return datetime_from_str(date_str, precision='microsecond', format=format).date()
return date_str
-class DateRange(object):
+class DateRange:
"""Represents a time interval between two dates"""
def __init__(self, start=None, end=None):
return self.start <= date <= self.end
def __str__(self):
- return '%s - %s' % (self.start.isoformat(), self.end.isoformat())
+ return f'{self.start.isoformat()} - {self.end.isoformat()}'
def platform_name():
return res
+@functools.cache
def get_windows_version():
''' Get Windows version. None if it's not running on Windows '''
if compat_os_name == 'nt':
def write_string(s, out=None, encoding=None):
    """Write the string s to out and flush it

    @param s         Text to write; must be a str
    @param out       Output stream; sys.stderr when not given. Streams whose
                     ``mode`` contains "b" and streams exposing ``buffer``
                     receive encoded bytes; any other stream receives s as-is
    @param encoding  Encoding for byte output. Falls back to the stream's own
                     ``encoding`` (only for streams exposing ``buffer``) and
                     then to preferredencoding()
    """
    assert isinstance(s, str)
    out = out or sys.stderr
    if compat_os_name == 'nt' and supports_terminal_sequences(out):
        s = re.sub(r'([\r\n]+)', r' \1', s)

    # Keep `out` bound to the original stream: `encoding` has to be read from
    # the stream itself rather than from its underlying `buffer`, and the
    # final flush has to go through the stream as well
    enc, target = None, out
    if 'b' in getattr(out, 'mode', ''):
        enc = encoding or preferredencoding()
    elif hasattr(out, 'buffer'):
        target = out.buffer
        enc = encoding or getattr(out, 'encoding', None) or preferredencoding()

    target.write(s.encode(enc, 'ignore') if enc else s)
    out.flush()
raise LockingUnsupportedError()
-class locked_file(object):
+class locked_file:
locked = False
def __init__(self, filename, mode, block=True, encoding=None):
try:
_lock_file(self.f, exclusive, self.block)
self.locked = True
- except IOError:
+ except OSError:
self.f.close()
raise
if 'w' in self.mode:
- self.f.truncate()
+ try:
+ self.f.truncate()
+ except OSError as e:
+ if e.errno != 29: # Illegal seek, expected when self.f is a FIFO
+ raise e
return self
def unlock(self):
return iter(self.f)
@functools.cache
def get_filesystem_encoding():
    """Return sys.getfilesystemencoding(), or 'utf-8' when that is None (the result is cached)"""
    fs_encoding = sys.getfilesystemencoding()
    return 'utf-8' if fs_encoding is None else fs_encoding
# a bytestring, but since unicode_literals turns
# every string into a unicode string, it fails.
return
- title_bytes = title.encode('utf-8')
+ title_bytes = title.encode()
buf = ctypes.create_string_buffer(len(title_bytes))
buf.value = title_bytes
try:
def urljoin(base, path):
if isinstance(path, bytes):
- path = path.decode('utf-8')
+ path = path.decode()
if not isinstance(path, compat_str) or not path:
return None
if re.match(r'^(?:[a-zA-Z][a-zA-Z0-9+-.]*:)?//', path):
return path
if isinstance(base, bytes):
- base = base.decode('utf-8')
+ base = base.decode()
if not isinstance(base, compat_str) or not re.match(
r'^(?:https?:)?//', base):
return None
else:
return None
- duration = 0
- if secs:
- duration += float(secs)
- if mins:
- duration += float(mins) * 60
- if hours:
- duration += float(hours) * 60 * 60
- if days:
- duration += float(days) * 24 * 60 * 60
if ms:
- duration += float(ms.replace(':', '.'))
- return duration
+ ms = ms.replace(':', '.')
+ return sum(float(part or 0) * mult for part, mult in (
+ (days, 86400), (hours, 3600), (mins, 60), (secs, 1), (ms, 1)))
def prepend_extension(filename, ext, expected_real_ext=None):
    """Insert ext in front of the existing extension of filename

    @param filename           Name to modify
    @param ext                Extension to insert (without leading dot)
    @param expected_real_ext  If given and the current extension differs from
                              it, ext is appended to the whole filename instead
    """
    name, real_ext = os.path.splitext(filename)
    if not expected_real_ext or real_ext[1:] == expected_real_ext:
        return f'{name}.{ext}{real_ext}'
    return f'{filename}.{ext}'
def replace_extension(filename, ext, expected_real_ext=None):
    """Replace the extension of filename with ext

    @param filename           Name to modify
    @param ext                New extension (without leading dot)
    @param expected_real_ext  If given and the current extension differs from
                              it, ext is appended to the whole filename instead
    """
    name, real_ext = os.path.splitext(filename)
    stem = name if not expected_real_ext or real_ext[1:] == expected_real_ext else filename
    return f'{stem}.{ext}'
class LazyList(collections.abc.Sequence):
- ''' Lazy immutable list from an iterable
- Note that slices of a LazyList are lists and not LazyList'''
+ """Lazy immutable list from an iterable
+ Note that slices of a LazyList are lists and not LazyList"""
class IndexError(IndexError):
pass
def __init__(self, iterable, *, reverse=False, _cache=None):
- self.__iterable = iter(iterable)
- self.__cache = [] if _cache is None else _cache
- self.__reversed = reverse
+ self._iterable = iter(iterable)
+ self._cache = [] if _cache is None else _cache
+ self._reversed = reverse
def __iter__(self):
- if self.__reversed:
+ if self._reversed:
# We need to consume the entire iterable to iterate in reverse
yield from self.exhaust()
return
- yield from self.__cache
- for item in self.__iterable:
- self.__cache.append(item)
+ yield from self._cache
+ for item in self._iterable:
+ self._cache.append(item)
yield item
- def __exhaust(self):
- self.__cache.extend(self.__iterable)
- # Discard the emptied iterable to make it pickle-able
- self.__iterable = []
- return self.__cache
+ def _exhaust(self):
+ self._cache.extend(self._iterable)
+ self._iterable = [] # Discard the emptied iterable to make it pickle-able
+ return self._cache
def exhaust(self):
- ''' Evaluate the entire iterable '''
- return self.__exhaust()[::-1 if self.__reversed else 1]
+ """Evaluate the entire iterable"""
+ return self._exhaust()[::-1 if self._reversed else 1]
@staticmethod
- def __reverse_index(x):
+ def _reverse_index(x):
return None if x is None else -(x + 1)
def __getitem__(self, idx):
if isinstance(idx, slice):
- if self.__reversed:
- idx = slice(self.__reverse_index(idx.start), self.__reverse_index(idx.stop), -(idx.step or 1))
+ if self._reversed:
+ idx = slice(self._reverse_index(idx.start), self._reverse_index(idx.stop), -(idx.step or 1))
start, stop, step = idx.start, idx.stop, idx.step or 1
elif isinstance(idx, int):
- if self.__reversed:
- idx = self.__reverse_index(idx)
+ if self._reversed:
+ idx = self._reverse_index(idx)
start, stop, step = idx, idx, 0
else:
raise TypeError('indices must be integers or slices')
or (stop is None and step > 0)):
# We need to consume the entire iterable to be able to slice from the end
# Obviously, never use this with infinite iterables
- self.__exhaust()
+ self._exhaust()
try:
- return self.__cache[idx]
+ return self._cache[idx]
except IndexError as e:
raise self.IndexError(e) from e
- n = max(start or 0, stop or 0) - len(self.__cache) + 1
+ n = max(start or 0, stop or 0) - len(self._cache) + 1
if n > 0:
- self.__cache.extend(itertools.islice(self.__iterable, n))
+ self._cache.extend(itertools.islice(self._iterable, n))
try:
- return self.__cache[idx]
+ return self._cache[idx]
except IndexError as e:
raise self.IndexError(e) from e
def __bool__(self):
try:
- self[-1] if self.__reversed else self[0]
+ self[-1] if self._reversed else self[0]
except self.IndexError:
return False
return True
def __len__(self):
- self.__exhaust()
- return len(self.__cache)
+ self._exhaust()
+ return len(self._cache)
def __reversed__(self):
- return type(self)(self.__iterable, reverse=not self.__reversed, _cache=self.__cache)
+ return type(self)(self._iterable, reverse=not self._reversed, _cache=self._cache)
def __copy__(self):
- return type(self)(self.__iterable, reverse=self.__reversed, _cache=self.__cache)
+ return type(self)(self._iterable, reverse=self._reversed, _cache=self._cache)
def __repr__(self):
# repr and str should mimic a list. So we exhaust the iterable
class OnDemandPagedList(PagedList):
"""Download pages until a page with less than maximum results"""
+
def _getslice(self, start, end):
for pagenum in itertools.count(start // self._pagesize):
firstid = pagenum * self._pagesize
class InAdvancePagedList(PagedList):
"""PagedList with total number of pages known in advance"""
+
def __init__(self, pagefunc, pagecount, pagesize):
PagedList.__init__(self, pagefunc, pagesize, True)
self._pagecount = pagecount
for k, v in data.items():
out += b'--' + boundary.encode('ascii') + b'\r\n'
if isinstance(k, compat_str):
- k = k.encode('utf-8')
+ k = k.encode()
if isinstance(v, compat_str):
- v = v.encode('utf-8')
+ v = v.encode()
# RFC 2047 requires non-ASCII field names to be encoded, while RFC 7578
# suggests sending UTF-8 directly. Firefox sends UTF-8, too
content = b'Content-Disposition: form-data; name="' + k + b'"\r\n\r\n' + v + b'\r\n'
def parse_age_limit(s):
- if type(s) == int:
+ # isinstance(False, int) is True. So type() must be used instead
+ if type(s) is int: # noqa: E721
return s if 0 <= s <= 21 else None
- if not isinstance(s, str):
+ elif not isinstance(s, str):
return None
m = re.match(r'^(?P<age>\d{1,2})\+?$', s)
if m:
def js_to_json(code, vars={}):
# vars is a dict of var, val pairs to substitute
COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
- SKIP_RE = r'\s*(?:{comment})?\s*'.format(comment=COMMENT_RE)
+ SKIP_RE = fr'\s*(?:{COMMENT_RE})?\s*'
INTEGER_TABLE = (
- (r'(?s)^(0[xX][0-9a-fA-F]+){skip}:?$'.format(skip=SKIP_RE), 16),
- (r'(?s)^(0+[0-7]+){skip}:?$'.format(skip=SKIP_RE), 8),
+ (fr'(?s)^(0[xX][0-9a-fA-F]+){SKIP_RE}:?$', 16),
+ (fr'(?s)^(0+[0-7]+){SKIP_RE}:?$', 8),
)
def fix_kv(m):
return q
-POSTPROCESS_WHEN = {'pre_process', 'after_filter', 'before_dl', 'after_move', 'post_process', 'after_video', 'playlist'}
+POSTPROCESS_WHEN = ('pre_process', 'after_filter', 'before_dl', 'after_move', 'post_process', 'after_video', 'playlist')
DEFAULT_OUTTMPL = {
return {}
split_codecs = list(filter(None, map(
str.strip, codecs_str.strip().strip(',').split(','))))
- vcodec, acodec, tcodec, hdr = None, None, None, None
+ vcodec, acodec, scodec, hdr = None, None, None, None
for full_codec in split_codecs:
parts = full_codec.split('.')
codec = parts[0].replace('0', '')
if not acodec:
acodec = full_codec
elif codec in ('stpp', 'wvtt',):
- if not tcodec:
- tcodec = full_codec
+ if not scodec:
+ scodec = full_codec
else:
- write_string('WARNING: Unknown codec %s\n' % full_codec, sys.stderr)
- if vcodec or acodec or tcodec:
+ write_string(f'WARNING: Unknown codec {full_codec}\n')
+ if vcodec or acodec or scodec:
return {
'vcodec': vcodec or 'none',
'acodec': acodec or 'none',
'dynamic_range': hdr,
- **({'tcodec': tcodec} if tcodec is not None else {}),
+ **({'scodec': scodec} if scodec is not None else {}),
}
elif len(split_codecs) == 2:
return {
(b'\xff\xfe', 'utf-16-le'),
(b'\xfe\xff', 'utf-16-be'),
]
+
+ encoding = 'utf-8'
for bom, enc in BOMS:
- if first_bytes.startswith(bom):
- s = first_bytes[len(bom):].decode(enc, 'replace')
- break
- else:
- s = first_bytes.decode('utf-8', 'replace')
+ while first_bytes.startswith(bom):
+ encoding, first_bytes = enc, first_bytes[len(bom):]
- return re.match(r'^\s*<', s)
+ return re.match(r'^\s*<', first_bytes.decode(encoding, 'replace'))
def determine_protocol(info_dict):
def match_filter_func(filters):
if not filters:
return None
- filters = variadic(filters)
+ filters = set(variadic(filters))
- def _match_func(info_dict, *args, **kwargs):
- if any(match_str(f, info_dict, *args, **kwargs) for f in filters):
- return None
+ interactive = '-' in filters
+ if interactive:
+ filters.remove('-')
+
+ def _match_func(info_dict, incomplete=False):
+ if not filters or any(match_str(f, info_dict, incomplete) for f in filters):
+ return NO_DEFAULT if interactive and not incomplete else None
else:
video_title = info_dict.get('title') or info_dict.get('id') or 'video'
filter_str = ') | ('.join(map(str.strip, filters))
if not time_expr:
return
- mobj = re.match(r'^(?P<time_offset>\d+(?:\.\d+)?)s?$', time_expr)
+ mobj = re.match(rf'^(?P<time_offset>{NUMBER_RE})s?$', time_expr)
if mobj:
return float(mobj.group('time_offset'))
styles = {}
default_style = {}
- class TTMLPElementParser(object):
+ class TTMLPElementParser:
_out = ''
_unclosed_elements = []
_applied_styles = []
return ''.join(out)
-def cli_option(params, command_option, param):
+def cli_option(params, command_option, param, separator=None):
+    """Build the argument list for a single command-line option.
+
+    The value is looked up with params.get(param). A value of None produces no
+    arguments. Otherwise the result is [command_option, str(value)], or, when
+    a separator is given, the single fused argument
+    command_option + separator + value.
+    """
param = params.get(param)
- if param:
- param = compat_str(param)
- return [command_option, param] if param is not None else []
+    if param is None:
+        return []
+    if separator is None:
+        return [command_option, str(param)]
+    return [f'{command_option}{separator}{param}']
def cli_bool_option(params, command_option, param, true_value='true', false_value='false', separator=None):
+    """Build the argument list for a boolean command-line option.
+
+    The looked-up value must be True, False or None. True/False are rendered
+    as true_value/false_value through cli_option(); None yields no arguments.
+    """
param = params.get(param)
- if param is None:
- return []
- assert isinstance(param, bool)
- if separator:
- return [command_option + separator + (true_value if param else false_value)]
- return [command_option, true_value if param else false_value]
+    assert param in (True, False, None)
+    rendered = {True: true_value, False: false_value}
+    return cli_option(rendered, command_option, param, separator)
def cli_valueless_option(params, command_option, param, expected_value=True):
+    """Return [command_option] if params.get(param) equals expected_value, else []."""
- param = params.get(param)
- return [command_option] if param == expected_value else []
+    if params.get(param) == expected_value:
+        return [command_option]
+    return []
def cli_configuration_args(argdict, keys, default=[], use_compat=True):
return cli_configuration_args(argdict, keys, default, use_compat)
-class ISO639Utils(object):
+class ISO639Utils:
# See http://www.loc.gov/standards/iso639-2/ISO-639-2_utf-8.txt
_lang_map = {
'aa': 'aar',
return short_name
-class ISO3166Utils(object):
+class ISO3166Utils:
# From http://data.okfn.org/data/core/country-list
_country_map = {
'AF': 'Afghanistan',
'YE': 'Yemen',
'ZM': 'Zambia',
'ZW': 'Zimbabwe',
+ # Not ISO 3166 codes, but used for IP blocks
+ 'AP': 'Asia/Pacific Region',
+ 'EU': 'Europe',
}
@classmethod
return cls._country_map.get(code.upper())
-class GeoUtils(object):
+class GeoUtils:
# Major IPv4 address blocks per country
_country_ip_map = {
'AD': '46.172.224.0/19',
header = png_data[8:]
if png_data[:8] != b'\x89PNG\x0d\x0a\x1a\x0a' or header[4:8] != b'IHDR':
- raise IOError('Not a valid PNG file.')
+ raise OSError('Not a valid PNG file.')
int_map = {1: '>B', 2: '>H', 4: '>I'}
unpack_integer = lambda x: compat_struct_unpack(int_map[len(x)], x)[0]
idat += chunk['data']
if not idat:
- raise IOError('Unable to read PNG data.')
+ raise OSError('Unable to read PNG data.')
decompressed_data = bytearray(zlib.decompress(idat))
def write_xattr(path, key, value):
+ """Set the extended attribute `key` to `value` (bytes) on the file at `path`.
+
+ Methods are tried in this order: an NTFS alternate data stream on Windows,
+ the python xattr/pyxattr module, then the "setfattr" or "xattr" executable.
+
+ Raises XAttrMetadataError when the selected method fails and
+ XAttrUnavailableError when no usable method is found.
+ """
- # This mess below finds the best xattr tool for the job
- try:
- # try the pyxattr module...
- import xattr
-
- if hasattr(xattr, 'set'): # pyxattr
- # Unicode arguments are not supported in python-pyxattr until
- # version 0.5.0
- # See https://github.com/ytdl-org/youtube-dl/issues/5498
- pyxattr_required_version = '0.5.0'
- if version_tuple(xattr.__version__) < version_tuple(pyxattr_required_version):
- # TODO: fallback to CLI tools
- raise XAttrUnavailableError(
- 'python-pyxattr is detected but is too old. '
- 'yt-dlp requires %s or above while your version is %s. '
- 'Falling back to other xattr implementations' % (
- pyxattr_required_version, xattr.__version__))
-
- setxattr = xattr.set
- else: # xattr
- setxattr = xattr.setxattr
+ # Windows: Write xattrs to NTFS Alternate Data Streams:
+ # http://en.wikipedia.org/wiki/NTFS#Alternate_data_streams_.28ADS.29
+ if compat_os_name == 'nt':
+ assert ':' not in key
+ assert os.path.exists(path)
try:
- setxattr(path, key, value)
- except EnvironmentError as e:
+ with open(f'{path}:{key}', 'wb') as f:
+ f.write(value)
+ except OSError as e:
raise XAttrMetadataError(e.errno, e.strerror)
+ return
- except ImportError:
- if compat_os_name == 'nt':
- # Write xattrs to NTFS Alternate Data Streams:
- # http://en.wikipedia.org/wiki/NTFS#Alternate_data_streams_.28ADS.29
- assert ':' not in key
- assert os.path.exists(path)
-
- ads_fn = path + ':' + key
- try:
- with open(ads_fn, 'wb') as f:
- f.write(value)
- except EnvironmentError as e:
- raise XAttrMetadataError(e.errno, e.strerror)
- else:
- user_has_setfattr = check_executable('setfattr', ['--version'])
- user_has_xattr = check_executable('xattr', ['-h'])
-
- if user_has_setfattr or user_has_xattr:
+ # UNIX Method 1. Use the xattr/pyxattr modules
+ from .dependencies import xattr
- value = value.decode('utf-8')
- if user_has_setfattr:
- executable = 'setfattr'
- opts = ['-n', key, '-v', value]
- elif user_has_xattr:
- executable = 'xattr'
- opts = ['-w', key, value]
+ # A pyxattr older than 0.5.0 leaves setxattr as None, so Method 2 below is tried instead
+ setxattr = None
+ if getattr(xattr, '_yt_dlp__identifier', None) == 'pyxattr':
+ # Unicode arguments are not supported in pyxattr until version 0.5.0
+ # See https://github.com/ytdl-org/youtube-dl/issues/5498
+ if version_tuple(xattr.__version__) >= (0, 5, 0):
+ setxattr = xattr.set
+ elif xattr:
+ setxattr = xattr.setxattr
- cmd = ([encodeFilename(executable, True)]
- + [encodeArgument(o) for o in opts]
- + [encodeFilename(path, True)])
+ if setxattr:
+ try:
+ setxattr(path, key, value)
+ except OSError as e:
+ raise XAttrMetadataError(e.errno, e.strerror)
+ return
- try:
- p = Popen(
- cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
- except EnvironmentError as e:
- raise XAttrMetadataError(e.errno, e.strerror)
- stdout, stderr = p.communicate_or_kill()
- stderr = stderr.decode('utf-8', 'replace')
- if p.returncode != 0:
- raise XAttrMetadataError(p.returncode, stderr)
+ # UNIX Method 2. Use setfattr/xattr executables
+ exe = ('setfattr' if check_executable('setfattr', ['--version'])
+ else 'xattr' if check_executable('xattr', ['-h']) else None)
+ if not exe:
+ raise XAttrUnavailableError(
+ 'Couldn\'t find a tool to set the xattrs. Install either the python "xattr" or "pyxattr" modules or the '
+ + ('"xattr" binary' if sys.platform != 'linux' else 'GNU "attr" package (which contains the "setfattr" tool)'))
- else:
- # On Unix, and can't find pyxattr, setfattr, or xattr.
- if sys.platform.startswith('linux'):
- raise XAttrUnavailableError(
- "Couldn't find a tool to set the xattrs. "
- "Install either the python 'pyxattr' or 'xattr' "
- "modules, or the GNU 'attr' package "
- "(which contains the 'setfattr' tool).")
- else:
- raise XAttrUnavailableError(
- "Couldn't find a tool to set the xattrs. "
- "Install either the python 'xattr' module, "
- "or the 'xattr' binary.")
+ value = value.decode()
+ try:
+ p = Popen(
+ [exe, '-w', key, value, path] if exe == 'xattr' else [exe, '-n', key, '-v', value, path],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
+ except OSError as e:
+ raise XAttrMetadataError(e.errno, e.strerror)
+ stderr = p.communicate_or_kill()[1].decode('utf-8', 'replace')
+ if p.returncode:
+ raise XAttrMetadataError(p.returncode, stderr)
def random_birthday(year_field, month_field, day_field):
# Templates for internet shortcut files, which are plain text files.
-DOT_URL_LINK_TEMPLATE = '''
+DOT_URL_LINK_TEMPLATE = '''\
[InternetShortcut]
URL=%(url)s
-'''.lstrip()
+'''
-DOT_WEBLOC_LINK_TEMPLATE = '''
+DOT_WEBLOC_LINK_TEMPLATE = '''\
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
\t<string>%(url)s</string>
</dict>
</plist>
-'''.lstrip()
+'''
-DOT_DESKTOP_LINK_TEMPLATE = '''
+DOT_DESKTOP_LINK_TEMPLATE = '''\
[Desktop Entry]
Encoding=UTF-8
Name=%(filename)s
Type=Link
URL=%(url)s
Icon=text-html
-'''.lstrip()
+'''
LINK_TEMPLATES = {
'url': DOT_URL_LINK_TEMPLATE,
net_location += ':' + urllib.parse.quote(iri_parts.password, safe=r"!$%&'()*+,~")
net_location += '@'
- net_location += iri_parts.hostname.encode('idna').decode('utf-8') # Punycode for Unicode hostnames.
+ net_location += iri_parts.hostname.encode('idna').decode() # Punycode for Unicode hostnames.
# The 'idna' encoding produces ASCII text.
if iri_parts.port is not None and iri_parts.port != 80:
net_location += ':' + str(iri_parts.port)
def to_high_limit_path(path):
if sys.platform in ['win32', 'cygwin']:
# Work around MAX_PATH limitation on Windows. The maximum allowed length for the individual path segments may still be quite limited.
- return r'\\?\ '.rstrip() + os.path.abspath(path)
+ return '\\\\?\\' + os.path.abspath(path)
return path
if dn and not os.path.exists(dn):
os.makedirs(dn)
return True
- except (OSError, IOError) as err:
+ except OSError as err:
if callable(to_screen) is not None:
to_screen('unable to create directory ' + error_to_compat_str(err))
return False
def get_executable_path():
- from zipimport import zipimporter
- if hasattr(sys, 'frozen'): # Running from PyInstaller
- path = os.path.dirname(sys.executable)
- elif isinstance(__loader__, zipimporter): # Running from ZIP
- path = os.path.join(os.path.dirname(__file__), '../..')
- else:
- path = os.path.join(os.path.dirname(__file__), '..')
- return os.path.abspath(path)
+    """Return the absolute directory of the path reported by the update module.
+
+    Uses the second item returned by update._get_variant_and_executable_path().
+    """
+    # NOTE(review): imported inside the function, presumably to avoid a circular import - confirm
+    from .update import _get_variant_and_executable_path
+
+    executable = _get_variant_and_executable_path()[1]
+    return os.path.dirname(os.path.abspath(executable))
def load_plugins(name, suffix, namespace):
classes = {}
- try:
+ with contextlib.suppress(FileNotFoundError):
plugins_spec = importlib.util.spec_from_file_location(
name, os.path.join(get_executable_path(), 'ytdlp_plugins', name, '__init__.py'))
plugins = importlib.util.module_from_spec(plugins_spec)
continue
klass = getattr(plugins, name)
classes[name] = namespace[name] = klass
- except FileNotFoundError:
- pass
return classes
casesense=True, is_user_input=False, traverse_string=False):
''' Traverse nested list/dict/tuple
@param path_list A list of paths which are checked one by one.
- Each path is a list of keys where each key is a string,
- a function, a tuple of strings/None or "...".
- When a fuction is given, it takes the key and value as arguments
- and returns whether the key matches or not. When a tuple is given,
- all the keys given in the tuple are traversed, and
- "..." traverses all the keys in the object
- "None" returns the object without traversal
+ Each path is a list of keys where each key is a:
+ - None: Do nothing
+ - string: A dictionary key
+ - int: An index into a list
+ - tuple: A list of keys all of which will be traversed
+ - Ellipsis: Fetch all values in the object
+ - Function: Takes the key and value as arguments
+ and returns whether the key matches or not
@param default Default value to return
@param expected_type Only accept final value of this type (Can also be any callable)
@param get_all Return all the values obtained from a path or only the first one
}
if headers:
header_data.update(headers)
- header_b64 = base64.b64encode(json.dumps(header_data).encode('utf-8'))
- payload_b64 = base64.b64encode(json.dumps(payload_data).encode('utf-8'))
- h = hmac.new(key.encode('utf-8'), header_b64 + b'.' + payload_b64, hashlib.sha256)
+ header_b64 = base64.b64encode(json.dumps(header_data).encode())
+ payload_b64 = base64.b64encode(json.dumps(payload_data).encode())
+ h = hmac.new(key.encode(), header_b64 + b'.' + payload_b64, hashlib.sha256)
signature_b64 = base64.b64encode(h.digest())
token = header_b64 + b'.' + payload_b64 + b'.' + signature_b64
return token
return payload_data
+# False on Windows until windows_enable_vt_mode() sets it to True; None on every other OS
+WINDOWS_VT_MODE = False if compat_os_name == 'nt' else None
+
+
+@functools.cache
def supports_terminal_sequences(stream):
if compat_os_name == 'nt':
- from .compat import WINDOWS_VT_MODE # Must be imported locally
if not WINDOWS_VT_MODE or get_windows_version() < (10, 0, 10586):
return False
elif not os.getenv('TERM'):
return False
+def windows_enable_vt_mode():  # TODO: Do this the proper way https://bugs.python.org/issue30075
+    """On Windows, try to switch the console to VT mode and record the result.
+
+    Does nothing on other systems. On success WINDOWS_VT_MODE becomes True and
+    the cached results of supports_terminal_sequences() are discarded; any
+    failure is ignored and leaves the flag unchanged.
+    """
+    global WINDOWS_VT_MODE
+    if compat_os_name != 'nt':
+        return
+
+    # NOTE(review): running an empty shell command looks like a workaround that
+    # leaves VT processing enabled on the console - see the issue linked above
+    info = subprocess.STARTUPINFO()
+    info.dwFlags |= subprocess.STARTF_USESHOWWINDOW
+    try:
+        subprocess.Popen('', shell=True, startupinfo=info).wait()
+    except Exception:
+        return
+    else:
+        WINDOWS_VT_MODE = True
+        supports_terminal_sequences.cache_clear()
+
+
_terminal_sequences_re = re.compile('\033\\[[^m]+m')
"""
_keys = ('width', 'height')
max_dimensions = max(
- [tuple(format.get(k) or 0 for k in _keys) for format in formats],
+ (tuple(format.get(k) or 0 for k in _keys) for format in formats),
default=(0, 0))
if not max_dimensions[0]:
return thumbnails
return int(crg.group(1)), int_or_none(crg.group(2)), int_or_none(crg.group(3))
+def read_stdin(what):
+    """Announce that `what` is being read from STDIN and return sys.stdin.
+
+    The message names the end-of-file key for the platform: Ctrl+Z when
+    compat_os_name is 'nt', Ctrl+D otherwise. Nothing is read here; the
+    caller consumes the returned stream.
+    """
+    eof_key = 'Ctrl+D' if compat_os_name != 'nt' else 'Ctrl+Z'
+    write_string(f'Reading {what} from STDIN - EOF ({eof_key}) to end:\n')
+    return sys.stdin
+
+
class Config:
own_args = None
+ parsed_args = None
filename = None
__initialized = False
def __init__(self, parser, label=None):
- self._parser, self.label = parser, label
+ self.parser, self.label = parser, label
self._loaded_paths, self.configs = set(), []
def init(self, args=None, filename=None):
return False
self._loaded_paths.add(location)
- self.__initialized = True
- self.own_args, self.filename = args, filename
- for location in self._parser.parse_args(args)[0].config_locations or []:
+ self.own_args, self.__initialized = args, True
+ opts, _ = self.parser.parse_known_args(args)
+ self.parsed_args, self.filename = args, filename
+
+ for location in opts.config_locations or []:
+ if location == '-':
+ self.append_config(shlex.split(read_stdin('options'), comments=True), label='stdin')
+ continue
location = os.path.join(directory, expand_path(location))
if os.path.isdir(location):
location = os.path.join(location, 'yt-dlp.conf')
if not os.path.exists(location):
- self._parser.error(f'config location {location} does not exist')
+ self.parser.error(f'config location {location} does not exist')
self.append_config(self.read_file(location), location)
return True
def read_file(filename, default=[]):
try:
optionf = open(filename)
- except IOError:
+ except OSError:
return default # silently skip if file is not present
try:
# FIXME: https://github.com/ytdl-org/youtube-dl/commit/dfe5fa49aed02cf36ba9f743b11b0903554b5e56
@staticmethod
def hide_login_info(opts):
- PRIVATE_OPTS = set(['-p', '--password', '-u', '--username', '--video-password', '--ap-password', '--ap-username'])
+ PRIVATE_OPTS = {'-p', '--password', '-u', '--username', '--video-password', '--ap-password', '--ap-username'}
eqre = re.compile('^(?P<key>' + ('|'.join(re.escape(po) for po in PRIVATE_OPTS)) + ')=.+$')
def _scrub_eq(o):
return opts
def append_config(self, *args, label=None):
- config = type(self)(self._parser, label)
+ config = type(self)(self.parser, label)
config._loaded_paths = self._loaded_paths
if config.init(*args):
self.configs.append(config)
def all_args(self):
for config in reversed(self.configs):
yield from config.all_args
- yield from self.own_args or []
+ yield from self.parsed_args or []
+
+ def parse_known_args(self, **kwargs):
+ return self.parser.parse_known_args(self.all_args, **kwargs)
def parse_args(self):
- return self._parser.parse_args(list(self.all_args))
+ return self.parser.parse_args(self.all_args)
class WebSocketsWrapper():
"""Wraps websockets module to use in non-async scopes"""
+ pool = None
def __init__(self, url, headers=None, connect=True):
- self.loop = asyncio.events.new_event_loop()
- self.conn = compat_websockets.connect(
+ self.loop = asyncio.new_event_loop()
+ # XXX: "loop" is deprecated
+ self.conn = websockets.connect(
url, extra_headers=headers, ping_interval=None,
close_timeout=float('inf'), loop=self.loop, ping_timeout=float('inf'))
if connect:
# for contributors: If there's any new library using asyncio needs to be run in non-async, move these function out of this class
@staticmethod
def run_with_loop(main, loop):
- if not asyncio.coroutines.iscoroutine(main):
+ if not asyncio.iscoroutine(main):
raise ValueError(f'a coroutine was expected, got {main!r}')
try:
@staticmethod
def _cancel_all_tasks(loop):
- to_cancel = asyncio.tasks.all_tasks(loop)
+ to_cancel = asyncio.all_tasks(loop)
if not to_cancel:
return
for task in to_cancel:
task.cancel()
+ # XXX: "loop" is removed in python 3.10+
loop.run_until_complete(
- asyncio.tasks.gather(*to_cancel, loop=loop, return_exceptions=True))
+ asyncio.gather(*to_cancel, loop=loop, return_exceptions=True))
for task in to_cancel:
if task.cancelled():
})
-has_websockets = bool(compat_websockets)
-
-
def merge_headers(*dicts):
"""Merge dicts of http headers case insensitively, prioritizing the latter ones"""
return {k.title(): v for k, v in itertools.chain.from_iterable(map(dict.items, dicts))}
class classproperty:
- def __init__(self, f):
- self.f = f
+ """Read-only property computed from the class: classmethod(property(func)) that works in py < 3.9
+
+ Being a non-data descriptor (only __get__ is defined), the wrapped function
+ is called with the owner class on every attribute access.
+ """
+
+ def __init__(self, func):
+ # update_wrapper runs first, so the explicit self.func assignment below
+ # wins over any "func" entry copied in from func.__dict__
+ functools.update_wrapper(self, func)
+ self.func = func
def __get__(self, _, cls):
- return self.f(cls)
+ # The instance argument is ignored; only the owner class is passed on
+ return self.func(cls)
+
+
+class Namespace:
+    """Namespace of named values that is meant to be treated as immutable.
+
+    Keyword arguments given to the constructor become attributes. Iteration
+    yields (name, value) pairs, while `in` tests against the stored *values*,
+    not the names. Immutability is by convention only: nothing here blocks
+    attribute assignment.
+    """
+
+    def __init__(self, **kwargs):
+        self._dict = kwargs
+
+    def __getattr__(self, attr):
+        # Only called when normal lookup fails. Going through vars() means a
+        # missing "_dict" (e.g. an instance rebuilt by copy/pickle without
+        # __init__) cannot recurse back into __getattr__. Raising
+        # AttributeError instead of KeyError keeps hasattr() and
+        # getattr(obj, name, default) working
+        try:
+            return vars(self)['_dict'][attr]
+        except KeyError:
+            raise AttributeError(
+                f'{type(self).__name__!r} object has no attribute {attr!r}') from None
+
+    def __contains__(self, item):
+        return item in self._dict.values()
+
+    def __iter__(self):
+        return iter(self._dict.items())
+
+    def __repr__(self):
+        return f'{type(self).__name__}({", ".join(f"{k}={v}" for k, v in self)})'
+
+
+# Deprecated: boolean aliases for the truthiness of the optional `certifi` and
+# `websockets` objects imported from .dependencies
+has_certifi = bool(certifi)
+has_websockets = bool(websockets)