from .compat import (
asyncio,
- compat_brotli,
compat_chr,
compat_cookiejar,
compat_etree_fromstring,
compat_urllib_parse_urlparse,
compat_urllib_request,
compat_urlparse,
- compat_websockets,
)
+from .dependencies import brotli, certifi, websockets
from .socks import ProxyType, sockssocket
-try:
- import certifi
-
- # The certificate may not be bundled in executable
- has_certifi = os.path.exists(certifi.where())
-except ImportError:
- has_certifi = False
-
def register_socks_protocols():
# "Register" SOCKS protocols
SUPPORTED_ENCODINGS = [
'gzip', 'deflate'
]
-if compat_brotli:
+if brotli:
SUPPORTED_ENCODINGS.append('br')
std_headers = {
PACKED_CODES_RE = r"}\('(.+)',(\d+),(\d+),'([^']+)'\.split\('\|'\)"
JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>(?P<json_ld>.+?)</script>'
+NUMBER_RE = r'\d+(?:\.\d+)?'
+
def preferredencoding():
"""Get preferred encoding.
def brotli(data):
if not data:
return data
- return compat_brotli.decompress(data)
+ return brotli.decompress(data)
def http_request(self, req):
# According to RFC 3986, URLs can not contain non-ASCII characters, however this is not
try:
cf.write(prepare_line(line))
except compat_cookiejar.LoadError as e:
+ if f'{line.strip()} '[0] in '[{"':
+ raise compat_cookiejar.LoadError(
+ 'Cookies file must be Netscape formatted, not JSON. See '
+ 'https://github.com/ytdl-org/youtube-dl#how-do-i-pass-cookies-to-youtube-dl')
write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
continue
cf.seek(0)
assert isinstance(s, str)
out = out or sys.stderr
+ from .compat import WINDOWS_VT_MODE # Must be imported locally
+ if WINDOWS_VT_MODE:
+ s = re.sub(r'([\r\n]+)', r' \1', s)
+
if 'b' in getattr(out, 'mode', ''):
byt = s.encode(encoding or preferredencoding(), 'ignore')
out.write(byt)
return q
-POSTPROCESS_WHEN = {'pre_process', 'after_filter', 'before_dl', 'after_move', 'post_process', 'after_video', 'playlist'}
+POSTPROCESS_WHEN = ('pre_process', 'after_filter', 'before_dl', 'after_move', 'post_process', 'after_video', 'playlist')
DEFAULT_OUTTMPL = {
return {}
split_codecs = list(filter(None, map(
str.strip, codecs_str.strip().strip(',').split(','))))
- vcodec, acodec, tcodec, hdr = None, None, None, None
+ vcodec, acodec, scodec, hdr = None, None, None, None
for full_codec in split_codecs:
parts = full_codec.split('.')
codec = parts[0].replace('0', '')
if not acodec:
acodec = full_codec
elif codec in ('stpp', 'wvtt',):
- if not tcodec:
- tcodec = full_codec
+ if not scodec:
+ scodec = full_codec
else:
write_string(f'WARNING: Unknown codec {full_codec}\n')
- if vcodec or acodec or tcodec:
+ if vcodec or acodec or scodec:
return {
'vcodec': vcodec or 'none',
'acodec': acodec or 'none',
'dynamic_range': hdr,
- **({'tcodec': tcodec} if tcodec is not None else {}),
+ **({'scodec': scodec} if scodec is not None else {}),
}
elif len(split_codecs) == 2:
return {
def match_filter_func(filters):
if not filters:
return None
- filters = variadic(filters)
+ filters = set(variadic(filters))
- def _match_func(info_dict, *args, **kwargs):
- if any(match_str(f, info_dict, *args, **kwargs) for f in filters):
- return None
+ interactive = '-' in filters
+ if interactive:
+ filters.remove('-')
+
+ def _match_func(info_dict, incomplete=False):
+ if not filters or any(match_str(f, info_dict, incomplete) for f in filters):
+ return NO_DEFAULT if interactive and not incomplete else None
else:
video_title = info_dict.get('title') or info_dict.get('id') or 'video'
filter_str = ') | ('.join(map(str.strip, filters))
if not time_expr:
return
- mobj = re.match(r'^(?P<time_offset>\d+(?:\.\d+)?)s?$', time_expr)
+ mobj = re.match(rf'^(?P<time_offset>{NUMBER_RE})s?$', time_expr)
if mobj:
return float(mobj.group('time_offset'))
def write_xattr(path, key, value):
- # This mess below finds the best xattr tool for the job
- try:
- # try the pyxattr module...
- import xattr
-
- if hasattr(xattr, 'set'): # pyxattr
- # Unicode arguments are not supported in python-pyxattr until
- # version 0.5.0
- # See https://github.com/ytdl-org/youtube-dl/issues/5498
- pyxattr_required_version = '0.5.0'
- if version_tuple(xattr.__version__) < version_tuple(pyxattr_required_version):
- # TODO: fallback to CLI tools
- raise XAttrUnavailableError(
- 'python-pyxattr is detected but is too old. '
- 'yt-dlp requires %s or above while your version is %s. '
- 'Falling back to other xattr implementations' % (
- pyxattr_required_version, xattr.__version__))
-
- setxattr = xattr.set
- else: # xattr
- setxattr = xattr.setxattr
+ # Windows: Write xattrs to NTFS Alternate Data Streams:
+ # http://en.wikipedia.org/wiki/NTFS#Alternate_data_streams_.28ADS.29
+ if compat_os_name == 'nt':
+ assert ':' not in key
+ assert os.path.exists(path)
try:
- setxattr(path, key, value)
+ with open(f'{path}:{key}', 'wb') as f:
+ f.write(value)
except OSError as e:
raise XAttrMetadataError(e.errno, e.strerror)
+ return
- except ImportError:
- if compat_os_name == 'nt':
- # Write xattrs to NTFS Alternate Data Streams:
- # http://en.wikipedia.org/wiki/NTFS#Alternate_data_streams_.28ADS.29
- assert ':' not in key
- assert os.path.exists(path)
-
- ads_fn = path + ':' + key
- try:
- with open(ads_fn, 'wb') as f:
- f.write(value)
- except OSError as e:
- raise XAttrMetadataError(e.errno, e.strerror)
- else:
- user_has_setfattr = check_executable('setfattr', ['--version'])
- user_has_xattr = check_executable('xattr', ['-h'])
-
- if user_has_setfattr or user_has_xattr:
+ # UNIX Method 1. Use the "xattr" or "pyxattr" Python modules
+ from .dependencies import xattr
- value = value.decode('utf-8')
- if user_has_setfattr:
- executable = 'setfattr'
- opts = ['-n', key, '-v', value]
- elif user_has_xattr:
- executable = 'xattr'
- opts = ['-w', key, value]
+ setxattr = None
+ if getattr(xattr, '_yt_dlp__identifier', None) == 'pyxattr':
+ # Unicode arguments are not supported in pyxattr until version 0.5.0
+ # See https://github.com/ytdl-org/youtube-dl/issues/5498
+ if version_tuple(xattr.__version__) >= (0, 5, 0):
+ setxattr = xattr.set
+ elif xattr:
+ setxattr = xattr.setxattr
- cmd = ([encodeFilename(executable, True)]
- + [encodeArgument(o) for o in opts]
- + [encodeFilename(path, True)])
+ if setxattr:
+ try:
+ setxattr(path, key, value)
+ except OSError as e:
+ raise XAttrMetadataError(e.errno, e.strerror)
+ return
- try:
- p = Popen(
- cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
- except OSError as e:
- raise XAttrMetadataError(e.errno, e.strerror)
- stdout, stderr = p.communicate_or_kill()
- stderr = stderr.decode('utf-8', 'replace')
- if p.returncode != 0:
- raise XAttrMetadataError(p.returncode, stderr)
+ # UNIX Method 2. Use setfattr/xattr executables
+ exe = ('setfattr' if check_executable('setfattr', ['--version'])
+ else 'xattr' if check_executable('xattr', ['-h']) else None)
+ if not exe:
+ raise XAttrUnavailableError(
+ 'Couldn\'t find a tool to set the xattrs. Install either the python "xattr" or "pyxattr" modules or the '
+ + ('"xattr" binary' if sys.platform != 'linux' else 'GNU "attr" package (which contains the "setfattr" tool)'))
- else:
- # On Unix, and can't find pyxattr, setfattr, or xattr.
- if sys.platform.startswith('linux'):
- raise XAttrUnavailableError(
- "Couldn't find a tool to set the xattrs. "
- "Install either the python 'pyxattr' or 'xattr' "
- "modules, or the GNU 'attr' package "
- "(which contains the 'setfattr' tool).")
- else:
- raise XAttrUnavailableError(
- "Couldn't find a tool to set the xattrs. "
- "Install either the python 'xattr' module, "
- "or the 'xattr' binary.")
+ value = value.decode('utf-8')
+ try:
+ p = Popen(
+ [exe, '-w', key, value, path] if exe == 'xattr' else [exe, '-n', key, '-v', value, path],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
+ except OSError as e:
+ raise XAttrMetadataError(e.errno, e.strerror)
+ stderr = p.communicate_or_kill()[1].decode('utf-8', 'replace')
+ if p.returncode:
+ raise XAttrMetadataError(p.returncode, stderr)
def random_birthday(year_field, month_field, day_field):
pool = None
def __init__(self, url, headers=None, connect=True):
- self.loop = asyncio.events.new_event_loop()
- self.conn = compat_websockets.connect(
+ self.loop = asyncio.new_event_loop()
+ # XXX: The "loop" argument of websockets.connect is deprecated
+ self.conn = websockets.connect(
url, extra_headers=headers, ping_interval=None,
close_timeout=float('inf'), loop=self.loop, ping_timeout=float('inf'))
if connect:
# for contributors: If there's any new library using asyncio needs to be run in non-async, move these function out of this class
@staticmethod
def run_with_loop(main, loop):
- if not asyncio.coroutines.iscoroutine(main):
+ if not asyncio.iscoroutine(main):
raise ValueError(f'a coroutine was expected, got {main!r}')
try:
@staticmethod
def _cancel_all_tasks(loop):
- to_cancel = asyncio.tasks.all_tasks(loop)
+ to_cancel = asyncio.all_tasks(loop)
if not to_cancel:
return
for task in to_cancel:
task.cancel()
+ # XXX: The "loop" argument of asyncio.gather was removed in Python 3.10
loop.run_until_complete(
- asyncio.tasks.gather(*to_cancel, loop=loop, return_exceptions=True))
+ asyncio.gather(*to_cancel, loop=loop, return_exceptions=True))
for task in to_cancel:
if task.cancelled():
})
-has_websockets = bool(compat_websockets)
-
-
def merge_headers(*dicts):
"""Merge dicts of http headers case insensitively, prioritizing the latter ones"""
return {k.title(): v for k, v in itertools.chain.from_iterable(map(dict.items, dicts))}
def Namespace(**kwargs):
return collections.namedtuple('Namespace', kwargs)(**kwargs)
+
+
+# Deprecated
+has_certifi = bool(certifi)
+has_websockets = bool(websockets)