legacyserverconnect: Explicitly allow HTTPS connection to servers that do not
support RFC 5746 secure renegotiation
nocheckcertificate: Do not verify SSL certificates
+ client_certificate: Path to client certificate file in PEM format. May include the private key
+ client_certificate_key: Path to private key file for client certificate
+ client_certificate_password: Password for client certificate private key, if encrypted.
+ If not provided and the key is encrypted, yt-dlp will ask interactively
prefer_insecure: Use HTTP instead of HTTPS to retrieve information.
At the moment, this is only supported by YouTube.
http_headers: A dictionary of custom headers to be used for all requests
assert hasattr(self, '_output_process')
assert isinstance(message, compat_str)
line_count = message.count('\n') + 1
- self._output_process.stdin.write((message + '\n').encode('utf-8'))
+ self._output_process.stdin.write((message + '\n').encode())
self._output_process.stdin.flush()
- res = ''.join(self._output_channel.readline().decode('utf-8')
+ res = ''.join(self._output_channel.readline().decode()
for _ in range(line_count))
return res[:-len('\n')]
value = map(str, variadic(value) if '#' in flags else [value])
value, fmt = ' '.join(map(compat_shlex_quote, value)), str_fmt
elif fmt[-1] == 'B': # bytes
- value = f'%{str_fmt}'.encode() % str(value).encode('utf-8')
+ value = f'%{str_fmt}'.encode() % str(value).encode()
value, fmt = value.decode('utf-8', 'ignore'), 's'
elif fmt[-1] == 'U': # unicode normalized
value, fmt = unicodedata.normalize(
return selector_function(ctx_copy)
return final_selector
- stream = io.BytesIO(format_spec.encode('utf-8'))
+ stream = io.BytesIO(format_spec.encode())
try:
tokens = list(_remove_unused_ops(tokenize.tokenize(stream.readline)))
except tokenize.TokenError:
# TODO: move sanitization here
if is_video:
# playlists are allowed to lack "title"
- info_dict['fulltitle'] = info_dict.get('title')
- if 'title' not in info_dict:
+ title = info_dict.get('title', NO_DEFAULT)
+ if title is NO_DEFAULT:
raise ExtractorError('Missing "title" field in extractor result',
video_id=info_dict['id'], ie=info_dict['extractor'])
- elif not info_dict.get('title'):
- self.report_warning('Extractor failed to obtain "title". Creating a generic title instead')
+ info_dict['fulltitle'] = title
+ if not title:
+ if title == '':
+ self.write_debug('Extractor gave empty title. Creating a generic title')
+ else:
+ self.report_warning('Extractor failed to obtain "title". Creating a generic title instead')
info_dict['title'] = f'{info_dict["extractor"].replace(":", "-")} video #{info_dict["id"]}'
if info_dict.get('duration') is not None:
if fixup_policy in ('ignore', 'never'):
return
elif fixup_policy == 'warn':
- do_fixup = False
+ do_fixup = 'warn'
elif fixup_policy != 'force':
assert fixup_policy in ('detect_or_warn', None)
if not info_dict.get('__real_download'):
do_fixup = False
def ffmpeg_fixup(cndn, msg, cls):
- if not cndn:
+ if not (do_fixup and cndn):
return
- if not do_fixup:
+ elif do_fixup == 'warn':
self.report_warning(f'{vid}: {msg}')
return
pp = cls(self)
downloader = downloader.__name__ if downloader else None
if info_dict.get('requested_formats') is None: # Not necessary if doing merger
- ffmpeg_fixup(downloader == 'HlsFD',
+ fixup_live = info_dict.get('is_live') and self.params.get('hls_use_mpegts') is None
+ ffmpeg_fixup(downloader == 'HlsFD' or fixup_live,
'Possible MPEG-TS in MP4 container or malformed AAC timestamps',
FFmpegFixupM3u8PP)
ffmpeg_fixup(info_dict.get('is_live') and downloader == 'DashSegmentsFD',
''' Alias of sanitize_info for backward compatibility '''
return YoutubeDL.sanitize_info(info_dict, actually_filter)
+    def _delete_downloaded_files(self, *files_to_delete, info=None, msg=None):
+        """Delete the given files from disk and drop them from `info`.
+
+        @param files_to_delete  Paths to remove. Falsy entries are skipped and
+                                duplicate paths are processed only once
+        @param info             Optional dict. Each processed path found in
+                                info['__files_to_move'] is removed from that
+                                mapping, whether or not the deletion succeeded
+        @param msg              Optional %-format string taking the filename;
+                                when given, it is printed before each deletion
+
+        An OSError from os.remove is reported as a warning, not raised.
+        """
+        # Use a None sentinel instead of a shared mutable default dict
+        if info is None:
+            info = {}
+        for filename in set(filter(None, files_to_delete)):
+            if msg:
+                self.to_screen(msg % filename)
+            try:
+                os.remove(filename)
+            except OSError:
+                self.report_warning(f'Unable to delete file {filename}')
+            if filename in info.get('__files_to_move', []):  # NB: Delete even if None
+                del info['__files_to_move'][filename]
+
@staticmethod
def post_extract(info_dict):
def actual_post_extract(info_dict):
for f in files_to_delete:
infodict['__files_to_move'].setdefault(f, '')
else:
- for old_filename in set(files_to_delete):
- self.to_screen('Deleting original file %s (pass -k to keep)' % old_filename)
- try:
- os.remove(encodeFilename(old_filename))
- except OSError:
- self.report_warning('Unable to remove downloaded original file')
- if old_filename in infodict['__files_to_move']:
- del infodict['__files_to_move'][old_filename]
+ self._delete_downloaded_files(
+ *files_to_delete, info=infodict, msg='Deleting original file %s (pass -k to keep)')
return infodict
def run_all_pps(self, key, info, *, additional_pps=None):
# Not implemented
if False and self.params.get('call_home'):
- ipaddr = self.urlopen('https://yt-dl.org/ip').read().decode('utf-8')
+ ipaddr = self.urlopen('https://yt-dl.org/ip').read().decode()
write_debug('Public IP address: %s' % ipaddr)
latest_version = self.urlopen(
- 'https://yt-dl.org/latest/version').read().decode('utf-8')
+ 'https://yt-dl.org/latest/version').read().decode()
if version_tuple(latest_version) > version_tuple(__version__):
self.report_warning(
'You are using an outdated version (newest version: %s)! '