import shutil
import subprocess
import sys
+import tempfile
import time
import tokenize
import traceback
STR_FORMAT_RE,
formatSeconds,
GeoRestrictedError,
+ HEADRequest,
int_or_none,
iri_to_uri,
ISO3166Utils,
preferredencoding,
prepend_extension,
process_communicate_or_kill,
- random_uuidv4,
register_socks_protocols,
RejectedVideoReached,
render_table,
if sys.version_info < (3, 6):
self.report_warning(
- 'Support for Python version %d.%d have been deprecated and will break in future versions of yt-dlp! '
- 'Update to Python 3.6 or above' % sys.version_info[:2])
+ 'Python version %d.%d is not supported! Please update to Python 3.6 or above' % sys.version_info[:2])
def check_deprecated(param, option, suggestion):
if self.params.get(param) is not None:
self.outtmpl_dict = self.parse_outtmpl()
+ # Creating format selector here allows us to catch syntax errors before the extraction
+ self.format_selector = (
+ None if self.params.get('format') is None
+ else self.build_format_selector(self.params['format']))
+
self._setup_opener()
"""Preload the archive, if any is specified"""
'Put from __future__ import unicode_literals at the top of your code file or consider switching to Python 3.x.')
return outtmpl_dict
+ def get_output_path(self, dir_type='', filename=None):
+     """Build an output path from the 'paths' param and pass it through sanitize_path.
+
+     The path is the 'home' entry, then the entry named by dir_type (skipped
+     when dir_type is empty), then filename (an empty component when not given).
+     """
+     path_config = self.params.get('paths', {})
+     assert isinstance(path_config, dict)
+     home_dir = expand_path(path_config.get('home', '').strip())
+     sub_dir = ''
+     if dir_type:
+         sub_dir = expand_path(path_config.get(dir_type, '').strip())
+     full_path = os.path.join(home_dir, sub_dir, filename or '')
+
+     # Temporary fix for #4787
+     # On python2 @ Windows, round-trip the path through preferredencoding
+     # to work around encoding issues with subprocess
+     needs_reencoding = sys.platform == 'win32' and sys.version_info < (3, 0)
+     if needs_reencoding:
+         full_path = encodeFilename(full_path, True).decode(preferredencoding())
+     return sanitize_path(full_path, force=self.params.get('windowsfilenames'))
+
@staticmethod
def validate_outtmpl(tmpl):
''' @return None or Exception object '''
'autonumber': self.params.get('autonumber_size') or 5,
}
- EXTERNAL_FORMAT_RE = STR_FORMAT_RE.format('[^)]*')
+ TMPL_DICT = {}
+ EXTERNAL_FORMAT_RE = re.compile(STR_FORMAT_RE.format('[^)]*'))
+ MATH_FUNCTIONS = {
+ '+': float.__add__,
+ '-': float.__sub__,
+ }
# Field is of the form key1.key2...
# where keys (except first) can be string, int or slice
- FIELD_RE = r'\w+(?:\.(?:\w+|[-\d]*(?::[-\d]*){0,2}))*'
+ FIELD_RE = r'\w+(?:\.(?:\w+|{num}|{num}?(?::{num}?){{1,2}}))*'.format(num=r'(?:-?\d+)')
+ # A maths operand is either a field or a (possibly negative) decimal number.
+ # The alternation is wrapped in a group so it stays a single unit when it is
+ # embedded right after an operator (otherwise 'op field | num' is parsed and
+ # operands such as '+-5' are rejected). The decimal point is escaped so that
+ # it only matches a literal '.' rather than any character.
+ MATH_FIELD_RE = r'''(?:{field}|{num})'''.format(field=FIELD_RE, num=r'-?\d+(?:\.\d+)?')
+ MATH_OPERATORS_RE = r'(?:%s)' % '|'.join(map(re.escape, MATH_FUNCTIONS.keys()))
INTERNAL_FORMAT_RE = re.compile(r'''(?x)
(?P<negate>-)?
- (?P<fields>{0})
- (?P<maths>(?:[-+]-?(?:\d+(?:\.\d+)?|{0}))*)
+ (?P<fields>{field})
+ (?P<maths>(?:{math_op}{math_field})*)
(?:>(?P<strf_format>.+?))?
(?:\|(?P<default>.*?))?
- $'''.format(FIELD_RE))
- MATH_OPERATORS_RE = re.compile(r'(?<![-+])([-+])')
- MATH_FUNCTIONS = {
- '+': float.__add__,
- '-': float.__sub__,
- }
- tmpl_dict = {}
+ $'''.format(field=FIELD_RE, math_op=MATH_OPERATORS_RE, math_field=MATH_FIELD_RE))
get_key = lambda k: traverse_obj(
info_dict, k.split('.'), is_user_input=True, traverse_string=True)
if value is not None:
value *= -1
# Do maths
- if mdict['maths']:
+ offset_key = mdict['maths']
+ if offset_key:
value = float_or_none(value)
operator = None
- for item in MATH_OPERATORS_RE.split(mdict['maths'])[1:]:
- if item == '' or value is None:
- return None
- if operator:
- item, multiplier = (item[1:], -1) if item[0] == '-' else (item, 1)
- offset = float_or_none(item)
- if offset is None:
- offset = float_or_none(get_key(item))
- try:
- value = operator(value, multiplier * offset)
- except (TypeError, ZeroDivisionError):
- return None
- operator = None
- else:
+ while offset_key:
+ item = re.match(
+ MATH_FIELD_RE if operator else MATH_OPERATORS_RE,
+ offset_key).group(0)
+ offset_key = offset_key[len(item):]
+ if operator is None:
operator = MATH_FUNCTIONS[item]
+ continue
+ item, multiplier = (item[1:], -1) if item[0] == '-' else (item, 1)
+ offset = float_or_none(item)
+ if offset is None:
+ offset = float_or_none(get_key(item))
+ try:
+ value = operator(value, multiplier * offset)
+ except (TypeError, ZeroDivisionError):
+ return None
+ operator = None
# Datetime formatting
if mdict['strf_format']:
value = strftime_or_none(value, mdict['strf_format'])
# If value is an object, sanitize might convert it to a string
# So we convert it to repr first
value, fmt = repr(value), '%ss' % fmt[:-1]
- value = sanitize(key, value)
- tmpl_dict[key] = value
+ if fmt[-1] in 'csr':
+ value = sanitize(key, value)
+ TMPL_DICT[key] = value
return '%({key}){fmt}'.format(key=key, fmt=fmt)
- return re.sub(EXTERNAL_FORMAT_RE, create_key, outtmpl), tmpl_dict
+ return EXTERNAL_FORMAT_RE.sub(create_key, outtmpl), TMPL_DICT
def _prepare_filename(self, info_dict, tmpl_type='default'):
try:
def prepare_filename(self, info_dict, dir_type='', warn=False):
"""Generate the output filename."""
- paths = self.params.get('paths', {})
- assert isinstance(paths, dict)
+
filename = self._prepare_filename(info_dict, dir_type or 'default')
if warn and not self.__prepare_filename_warned:
- if not paths:
+ if not self.params.get('paths'):
pass
elif filename == '-':
self.report_warning('--paths is ignored when an outputting to stdout')
if filename == '-' or not filename:
return filename
- homepath = expand_path(paths.get('home', '').strip())
- assert isinstance(homepath, compat_str)
- subdir = expand_path(paths.get(dir_type, '').strip()) if dir_type else ''
- assert isinstance(subdir, compat_str)
- path = os.path.join(homepath, subdir, filename)
-
- # Temporary fix for #4787
- # 'Treat' all problem characters by passing filename through preferredencoding
- # to workaround encoding issues with subprocess on python2 @ Windows
- if sys.version_info < (3, 0) and sys.platform == 'win32':
- path = encodeFilename(path, True).decode(preferredencoding())
- return sanitize_path(path, force=self.params.get('windowsfilenames'))
+ return self.get_output_path(dir_type, filename)
def _match_entry(self, info_dict, incomplete=False, silent=False):
""" Returns None if the file should be downloaded """
'!=': operator.ne,
}
operator_rex = re.compile(r'''(?x)\s*
- (?P<key>width|height|tbr|abr|vbr|asr|filesize|filesize_approx|fps)
- \s*(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
- (?P<value>[0-9.]+(?:[kKmMgGtTpPeEzZyY]i?[Bb]?)?)
- $
+ (?P<key>width|height|tbr|abr|vbr|asr|filesize|filesize_approx|fps)\s*
+ (?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
+ (?P<value>[0-9.]+(?:[kKmMgGtTpPeEzZyY]i?[Bb]?)?)\s*
''' % '|'.join(map(re.escape, OPERATORS.keys())))
- m = operator_rex.search(filter_spec)
+ m = operator_rex.fullmatch(filter_spec)
if m:
try:
comparison_value = int(m.group('value'))
'$=': lambda attr, value: attr.endswith(value),
'*=': lambda attr, value: value in attr,
}
- str_operator_rex = re.compile(r'''(?x)
- \s*(?P<key>[a-zA-Z0-9._-]+)
- \s*(?P<negation>!\s*)?(?P<op>%s)(?P<none_inclusive>\s*\?)?
- \s*(?P<value>[a-zA-Z0-9._-]+)
- \s*$
+ str_operator_rex = re.compile(r'''(?x)\s*
+ (?P<key>[a-zA-Z0-9._-]+)\s*
+ (?P<negation>!\s*)?(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
+ (?P<value>[a-zA-Z0-9._-]+)\s*
''' % '|'.join(map(re.escape, STR_OPERATORS.keys())))
- m = str_operator_rex.search(filter_spec)
+ m = str_operator_rex.fullmatch(filter_spec)
if m:
comparison_value = m.group('value')
str_op = STR_OPERATORS[m.group('op')]
op = str_op
if not m:
- raise ValueError('Invalid filter specification %r' % filter_spec)
+ raise SyntaxError('Invalid filter specification %r' % filter_spec)
def _filter(f):
actual_value = f.get(m.group('key'))
def _check_formats(formats):
for f in formats:
self.to_screen('[info] Testing format %s' % f['format_id'])
- paths = self.params.get('paths', {})
- temp_file = os.path.join(
- expand_path(paths.get('home', '').strip()),
- expand_path(paths.get('temp', '').strip()),
- 'ytdl.%s.f%s.check-format' % (random_uuidv4(), f['format_id']))
+ temp_file = tempfile.NamedTemporaryFile(
+ suffix='.tmp', delete=False,
+ dir=self.get_output_path('temp') or None)
+ temp_file.close()
try:
- dl, _ = self.dl(temp_file, f, test=True)
+ dl, _ = self.dl(temp_file.name, f, test=True)
except (ExtractorError, IOError, OSError, ValueError) + network_exceptions:
dl = False
finally:
- if os.path.exists(temp_file):
- os.remove(temp_file)
+ if os.path.exists(temp_file.name):
+ try:
+ os.remove(temp_file.name)
+ except OSError:
+ self.report_warning('Unable to delete temporary file "%s"' % temp_file.name)
if dl:
yield f
else:
self.cookiejar.add_cookie_header(pr)
return pr.get_header('Cookie')
- @staticmethod
- def _sanitize_thumbnails(info_dict):
+ def _sanitize_thumbnails(self, info_dict):
thumbnails = info_dict.get('thumbnails')
if thumbnails is None:
thumbnail = info_dict.get('thumbnail')
t.get('height') if t.get('height') is not None else -1,
t.get('id') if t.get('id') is not None else '',
t.get('url')))
+
+ def test_thumbnail(t):
+     """Return False (after reporting it) when opening the thumbnail URL with a
+     HEAD request raises one of network_exceptions; return True otherwise."""
+     self.to_screen('[info] Testing thumbnail %s' % t['id'])
+     try:
+         self.urlopen(HEADRequest(t['url']))
+     except network_exceptions as err:
+         failure = (t['id'], t['url'], error_to_compat_str(err))
+         self.to_screen(
+             '[info] Unable to connect to thumbnail %s URL "%s" - %s. Skipping...' % failure)
+         reachable = False
+     else:
+         reachable = True
+     return reachable
+
for i, t in enumerate(thumbnails):
- t['url'] = sanitize_url(t['url'])
- if t.get('width') and t.get('height'):
- t['resolution'] = '%dx%d' % (t['width'], t['height'])
if t.get('id') is None:
t['id'] = '%d' % i
+ if t.get('width') and t.get('height'):
+ t['resolution'] = '%dx%d' % (t['width'], t['height'])
+ t['url'] = sanitize_url(t['url'])
+ if self.params.get('check_formats'):
+ info_dict['thumbnails'] = reversed(LazyList(filter(test_thumbnail, thumbnails[::-1])))
def process_video_result(self, info_dict, download=True):
assert info_dict.get('_type', 'video') == 'video'
self.list_formats(info_dict)
return
- req_format = self.params.get('format')
- if req_format is None:
+ format_selector = self.format_selector
+ if format_selector is None:
req_format = self._default_format_spec(info_dict, download=download)
self.write_debug('Default format spec: %s' % req_format)
-
- format_selector = self.build_format_selector(req_format)
+ format_selector = self.build_format_selector(req_format)
# While in format selection we may need to have an access to the original
# format set in order to calculate some metrics or do some processing.
info_dict['epoch'] = int(time.time())
reject = lambda k, v: k in remove_keys
filter_fn = lambda obj: (
- list(map(filter_fn, obj)) if isinstance(obj, (list, tuple, set))
+ list(map(filter_fn, obj)) if isinstance(obj, (LazyList, list, tuple, set))
else obj if not isinstance(obj, dict)
else dict((k, filter_fn(v)) for k, v in obj.items() if not reject(k, v)))
return filter_fn(info_dict)
hideEmpty=new_format)))
def list_thumbnails(self, info_dict):
- thumbnails = info_dict.get('thumbnails')
+ # 'thumbnails' may be missing or None; fall back to an empty list so that
+ # list() does not raise TypeError and the "no thumbnails" branch is reached
+ thumbnails = list(info_dict.get('thumbnails') or [])
if not thumbnails:
self.to_screen('[info] No thumbnails present for %s' % info_dict['id'])
return
if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(thumb_filename)):
ret.append(suffix + thumb_ext)
+ t['filepath'] = thumb_filename
self.to_screen('[%s] %s: Thumbnail %sis already present' %
(info_dict['extractor'], info_dict['id'], thumb_display_id))
else: