import datetime
import errno
import fileinput
-import functools
import http.cookiejar
import io
import itertools
import unicodedata
from .cache import Cache
-from .compat import urllib # isort: split
-from .compat import compat_os_name, compat_shlex_quote
+from .compat import functools, urllib # isort: split
+from .compat import compat_os_name, compat_shlex_quote, urllib_req_to_req
from .cookies import LenientSimpleCookie, load_cookies
from .downloader import FFmpegFD, get_suitable_downloader, shorten_protocol_name
from .downloader.rtmp import rtmpdump_version
from .extractor.common import UnsupportedURLIE
from .extractor.openload import PhantomJSwrapper
from .minicurses import format_text
+from .networking import HEADRequest, Request, RequestDirector
+from .networking.common import _REQUEST_HANDLERS
+from .networking.exceptions import (
+ HTTPError,
+ NoSupportingHandlers,
+ RequestError,
+ SSLError,
+ _CompatHTTPError,
+ network_exceptions,
+)
from .plugins import directories as plugin_directories
from .postprocessor import _PLUGIN_CLASSES as plugin_pps
from .utils import (
ExtractorError,
FormatSorter,
GeoRestrictedError,
- HEADRequest,
ISO3166Utils,
LazyList,
MaxDownloadsReached,
Namespace,
PagedList,
- PerRequestProxyHandler,
PlaylistEntries,
Popen,
PostProcessingError,
SameFileError,
UnavailableVideoError,
UserNotLive,
- YoutubeDLCookieProcessor,
- YoutubeDLHandler,
- YoutubeDLRedirectHandler,
age_restricted,
args_to_str,
bug_reports_message,
error_to_compat_str,
escapeHTML,
expand_path,
+ extract_basic_auth,
filter_dict,
float_or_none,
format_bytes,
locked_file,
make_archive_id,
make_dir,
- make_HTTPS_handler,
- merge_headers,
- network_exceptions,
number_of_digits,
orderedSet,
orderedSet_from_options,
sanitize_filename,
sanitize_path,
sanitize_url,
- sanitized_Request,
- std_headers,
str_or_none,
strftime_or_none,
subtitles_filename,
write_json_file,
write_string,
)
-from .utils.networking import clean_headers
+from .utils._utils import _YDLLogger
+from .utils.networking import (
+ HTTPHeaderDict,
+ clean_headers,
+ clean_proxies,
+ std_headers,
+)
from .version import CHANNEL, RELEASE_GIT_HEAD, VARIANT, __version__
if compat_os_name == 'nt':
raise
self.params['compat_opts'] = set(self.params.get('compat_opts', ()))
- self.params['http_headers'] = merge_headers(std_headers, self.params.get('http_headers', {}))
+ self.params['http_headers'] = HTTPHeaderDict(std_headers, self.params.get('http_headers'))
+ self.__header_cookies = []
+ self._load_cookies(self.params['http_headers'].get('Cookie')) # compat
+ self.params['http_headers'].pop('Cookie', None)
+
+ self._request_director = self.build_request_director(
+ sorted(_REQUEST_HANDLERS.values(), key=lambda rh: rh.RH_NAME.lower()))
if auto_init and auto_init != 'no_verbose_header':
self.print_debug_header()
- self.__header_cookies = []
- self._load_cookies(traverse_obj(self.params.get('http_headers'), 'cookie', casesense=False)) # compat
-
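`HTTPHeaderDict` merges its mapping arguments case-insensitively, with later arguments taking priority; that is what lets user-supplied `http_headers` override `std_headers` above. A minimal sketch (values illustrative):

from yt_dlp.utils.networking import HTTPHeaderDict

std = HTTPHeaderDict({'User-Agent': 'default-UA', 'Accept': '*/*'})
merged = HTTPHeaderDict(std, {'user-agent': 'custom-UA'})  # later mapping wins
print(merged['User-Agent'])  # custom-UA; lookups are case-insensitive too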
def check_deprecated(param, option, suggestion):
if self.params.get(param) is not None:
self.report_warning(f'{option} is deprecated. Use {suggestion} instead')
get_postprocessor(pp_def.pop('key'))(self, **pp_def),
when=when)
- self._setup_opener()
-
def preload_download_archive(fn):
"""Preload the archive, if any is specified"""
archive = set()
def __exit__(self, *args):
self.restore_console_title()
+ self.close()
+
+ def close(self):
self.save_cookies()
+ self._request_director.close()
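`close()` is also what `__exit__` now delegates to, so the context-manager form tears down the request director deterministically. A usage sketch (placeholder URL):

import yt_dlp

with yt_dlp.YoutubeDL({'quiet': True}) as ydl:
    ydl.download(['https://example.com/video'])  # placeholder URL
# __exit__ has called close() here: cookies are saved and every request
# handler (and its pooled connections) is shut down.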
def trouble(self, message=None, tb=None, is_error=True):
"""Determine action to take when a download problem appears.
else:
break
- fmt = outer_mobj.group('format')
- if fmt == 's' and value is not None and last_field in field_size_compat_map.keys():
- fmt = f'0{field_size_compat_map[last_field]:d}d'
-
if None not in (value, replacement):
try:
value = replacement_formatter.format(replacement, value)
except ValueError:
value, default = None, na
+ fmt = outer_mobj.group('format')
+ if fmt == 's' and last_field in field_size_compat_map.keys() and isinstance(value, int):
+ fmt = f'0{field_size_compat_map[last_field]:d}d'
+
flags = outer_mobj.group('conversion') or ''
str_fmt = f'{fmt[:-1]}s'
if value is None:
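The relocated block above now applies zero-padding only after `replacement` handling, and only when the value is a genuine int, so NA fields fall through to the plain string path. A standalone sketch of the guard (map width illustrative):

field_size_compat_map = {'playlist_index': 5}  # illustrative width

def pad_fmt(last_field, fmt, value):
    # Same guard as above: rewrite '%(field)s' to a zero-padded integer
    # format only when the value really is an int
    if fmt == 's' and last_field in field_size_compat_map and isinstance(value, int):
        fmt = f'0{field_size_compat_map[last_field]:d}d'
    return fmt

for value in (3, None):
    fmt = pad_fmt('playlist_index', 's', value)
    print(('%(playlist_index)' + fmt) % {'playlist_index': value})  # 00003, then None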
return ret
if self.in_download_archive(info_dict):
- reason = '%s has already been recorded in the archive' % video_title
+ reason = ''.join((
+ format_field(info_dict, 'id', f'{self._format_screen("%s", self.Styles.ID)}: '),
+ format_field(info_dict, 'title', f'{self._format_screen("%s", self.Styles.EMPHASIS)} '),
+ 'has already been recorded in the archive'))
break_opt, break_err = 'break_on_existing', ExistingVideoReached
else:
try:
temp_id = ie.get_temp_id(url)
if temp_id is not None and self.in_download_archive({'id': temp_id, 'ie_key': key}):
- self.to_screen(f'[{key}] {temp_id}: has already been recorded in the archive')
+ self.to_screen(f'[download] {self._format_screen(temp_id, self.Styles.ID)}: '
+ 'has already been recorded in the archive')
if self.params.get('break_on_existing', False):
raise ExistingVideoReached()
break
self.to_screen('')
raise
- def _load_cookies(self, data, *, from_headers=True):
+ def _load_cookies(self, data, *, autoscope=True):
"""Loads cookies from a `Cookie` header
This tries to work around the security vulnerability of passing cookies to every domain.
See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj
- The unscoped cookies are saved for later to be stored in the jar with a limited scope.
@param data The Cookie header as string to load the cookies from
- @param from_headers If `False`, allows Set-Cookie syntax in the cookie string (at least a domain will be required)
+ @param autoscope If `False`, scope cookies using Set-Cookie syntax and error for cookies without domains
+ If `True`, save cookies for later to be stored in the jar with a limited scope
+ If a URL, save cookies in the jar with the domain of the URL
"""
for cookie in LenientSimpleCookie(data).values():
- if from_headers and any(cookie.values()):
+ if autoscope and any(cookie.values()):
raise ValueError('Invalid syntax in Cookie Header')
domain = cookie.get('domain') or ''
if domain:
self.cookiejar.set_cookie(prepared_cookie)
- elif from_headers:
+ elif autoscope is True:
self.deprecated_feature(
'Passing cookies as a header is a potential security risk; '
'they will be scoped to the domain of the downloaded urls. '
'Please consider loading cookies from a file or browser instead.')
self.__header_cookies.append(prepared_cookie)
+ elif autoscope:
+ self.report_warning(
+ 'The extractor result contains an unscoped cookie as an HTTP header. '
+ f'If you are using yt-dlp with an input URL{bug_reports_message(before=",")}',
+ only_once=True)
+ self._apply_header_cookies(autoscope, [prepared_cookie])
else:
self.report_error('Unscoped cookies are not allowed; please specify some sort of scoping',
tb=False, is_error=False)
- def _apply_header_cookies(self, url):
+ def _apply_header_cookies(self, url, cookies=None):
"""Applies stray header cookies to the provided url
This loads header cookies and scopes them to the domain provided in `url`.
if not parsed.hostname:
return
- for cookie in map(copy.copy, self.__header_cookies):
+ for cookie in map(copy.copy, cookies or self.__header_cookies):
cookie.domain = f'.{parsed.hostname}'
self.cookiejar.set_cookie(cookie)
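Together, these two methods quarantine header cookies until a concrete URL supplies a domain to scope them to. A behavioural sketch (private methods called purely for illustration):

from yt_dlp import YoutubeDL

ydl = YoutubeDL({'http_headers': {'Cookie': 'session=abc123'}})  # emits the deprecation notice
# The cookie sits in __header_cookies, not the jar, until a URL is seen:
ydl._apply_header_cookies('https://example.com/watch?v=1')
print({c.name: c.domain for c in ydl.cookiejar})  # {'session': '.example.com'}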
parsed_selector = _parse_format_selection(iter(TokenIterator(tokens)))
return _build_selector_function(parsed_selector)
- def _calc_headers(self, info_dict):
- res = merge_headers(self.params['http_headers'], info_dict.get('http_headers') or {})
+ def _calc_headers(self, info_dict, load_cookies=False):
+ res = HTTPHeaderDict(self.params['http_headers'], info_dict.get('http_headers'))
clean_headers(res)
+
+ if load_cookies: # For --load-info-json
+ self._load_cookies(res.get('Cookie'), autoscope=info_dict['url']) # compat
+ self._load_cookies(info_dict.get('cookies'), autoscope=False)
+ # The `Cookie` header is removed to prevent leaks and unscoped cookies.
+ # See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj
+ res.pop('Cookie', None)
cookies = self.cookiejar.get_cookies_for_url(info_dict['url'])
if cookies:
encoder = LenientSimpleCookie()
and info_dict.get('duration') and format.get('tbr')
and not format.get('filesize') and not format.get('filesize_approx')):
format['filesize_approx'] = int(info_dict['duration'] * format['tbr'] * (1024 / 8))
- format['http_headers'] = self._calc_headers(collections.ChainMap(format, info_dict))
+ format['http_headers'] = self._calc_headers(collections.ChainMap(format, info_dict), load_cookies=True)
+
+ # Safeguard against old/insecure infojson when using --load-info-json
+ if info_dict.get('http_headers'):
+ info_dict['http_headers'] = HTTPHeaderDict(info_dict['http_headers'])
+ info_dict['http_headers'].pop('Cookie', None)
# This is copied to http_headers by the above _calc_headers and can now be removed
if '__x_forwarded_for_ip' in info_dict:
infos = [self.sanitize_info(info, self.params.get('clean_infojson', True))
for info in variadic(json.loads('\n'.join(f)))]
for info in infos:
- self._load_cookies(info.get('cookies'), from_headers=False)
- self._load_cookies(traverse_obj(info.get('http_headers'), 'Cookie', casesense=False)) # compat
try:
self.__download_wrapper(self.process_ie_result)(info, download=True)
except (DownloadError, EntryNotInPlaylist, ReExtractInfo) as e:
join_nonempty(*get_package_info(m)) for m in available_dependencies.values()
})) or 'none'))
- self._setup_opener()
- proxy_map = {}
- for handler in self._opener.handlers:
- if hasattr(handler, 'proxies'):
- proxy_map.update(handler.proxies)
- write_debug(f'Proxy map: {proxy_map}')
-
+ write_debug(f'Proxy map: {self.proxies}')
+ write_debug(f'Request Handlers: {", ".join(rh.RH_NAME for rh in self._request_director.handlers.values())}')
for plugin_type, plugins in {'Extractor': plugin_ies, 'Post-Processor': plugin_pps}.items():
display_list = ['%s%s' % (
klass.__name__, '' if klass.__name__ == name else f' as {name}')
'See https://yt-dl.org/update if you need help updating.' %
latest_version)
- def _setup_opener(self):
- if hasattr(self, '_opener'):
- return
- timeout_val = self.params.get('socket_timeout')
- self._socket_timeout = 20 if timeout_val is None else float(timeout_val)
+ @functools.cached_property
+ def proxies(self):
+ """Global proxy configuration"""
opts_proxy = self.params.get('proxy')
-
- cookie_processor = YoutubeDLCookieProcessor(self.cookiejar)
if opts_proxy is not None:
if opts_proxy == '':
- proxies = {}
- else:
- proxies = {'http': opts_proxy, 'https': opts_proxy}
+ opts_proxy = '__noproxy__'
+ proxies = {'all': opts_proxy}
else:
proxies = urllib.request.getproxies()
- # Set HTTPS proxy to HTTP one if given (https://github.com/ytdl-org/youtube-dl/issues/805)
+ # compat. Set HTTPS_PROXY to __noproxy__ to revert
if 'http' in proxies and 'https' not in proxies:
proxies['https'] = proxies['http']
- proxy_handler = PerRequestProxyHandler(proxies)
-
- debuglevel = 1 if self.params.get('debug_printtraffic') else 0
- https_handler = make_HTTPS_handler(self.params, debuglevel=debuglevel)
- ydlh = YoutubeDLHandler(self.params, debuglevel=debuglevel)
- redirect_handler = YoutubeDLRedirectHandler()
- data_handler = urllib.request.DataHandler()
-
- # When passing our own FileHandler instance, build_opener won't add the
- # default FileHandler and allows us to disable the file protocol, which
- # can be used for malicious purposes (see
- # https://github.com/ytdl-org/youtube-dl/issues/8227)
- file_handler = urllib.request.FileHandler()
-
- if not self.params.get('enable_file_urls'):
- def file_open(*args, **kwargs):
- raise urllib.error.URLError(
- 'file:// URLs are explicitly disabled in yt-dlp for security reasons. '
- 'Use --enable-file-urls to enable at your own risk.')
- file_handler.file_open = file_open
-
- opener = urllib.request.build_opener(
- proxy_handler, https_handler, cookie_processor, ydlh, redirect_handler, data_handler, file_handler)
-
- # Delete the default user-agent header, which would otherwise apply in
- # cases where our custom HTTP handler doesn't come into play
- # (See https://github.com/ytdl-org/youtube-dl/issues/1309 for details)
- opener.addheaders = []
- self._opener = opener
+
+ return proxies
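The property folds `--proxy` and the environment into a single scheme-keyed mapping, using `'all'` and the `'__noproxy__'` sentinel that the request handlers understand. A sketch of the outcomes (addresses hypothetical):

from yt_dlp import YoutubeDL

print(YoutubeDL({'proxy': 'socks5://127.0.0.1:1080'}).proxies)  # {'all': 'socks5://127.0.0.1:1080'}
print(YoutubeDL({'proxy': ''}).proxies)                         # {'all': '__noproxy__'}, i.e. proxies disabled
# With no --proxy at all, urllib.request.getproxies() is consulted and an
# HTTP-only environment proxy is mirrored onto 'https' for compatibility.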
@functools.cached_property
def cookiejar(self):
return load_cookies(
self.params.get('cookiefile'), self.params.get('cookiesfrombrowser'), self)
+ @property
+ def _opener(self):
+ """
+ Get a urllib OpenerDirector from the Urllib handler (deprecated).
+ """
+ self.deprecation_warning('YoutubeDL._opener is deprecated, use YoutubeDL.urlopen()')
+ handler = self._request_director.handlers['Urllib']
+ return handler._get_instance(cookiejar=self.cookiejar, proxies=self.proxies)
+
def urlopen(self, req):
""" Start an HTTP download """
if isinstance(req, str):
- req = sanitized_Request(req)
- return self._opener.open(req, timeout=self._socket_timeout)
+ req = Request(req)
+ elif isinstance(req, urllib.request.Request):
+ self.deprecation_warning(
+ 'Passing a urllib.request.Request object to YoutubeDL.urlopen() is deprecated. '
+ 'Use yt_dlp.networking.common.Request instead.')
+ req = urllib_req_to_req(req)
+ assert isinstance(req, Request)
+
+ # compat: Assume user:pass in the URL is basic auth
+ url, basic_auth_header = extract_basic_auth(req.url)
+ if basic_auth_header:
+ req.headers['Authorization'] = basic_auth_header
+ req.url = sanitize_url(url)
+
+ clean_proxies(proxies=req.proxies, headers=req.headers)
+ clean_headers(req.headers)
+
+ try:
+ return self._request_director.send(req)
+ except NoSupportingHandlers as e:
+ for ue in e.unsupported_errors:
+ if not (ue.handler and ue.msg):
+ continue
+ if ue.handler.RH_KEY == 'Urllib' and 'unsupported url scheme: "file"' in ue.msg.lower():
+ raise RequestError(
+ 'file:// URLs are disabled by default in yt-dlp for security reasons. '
+ 'Use --enable-file-urls to enable at your own risk.', cause=ue) from ue
+ raise
+ except SSLError as e:
+ if 'UNSAFE_LEGACY_RENEGOTIATION_DISABLED' in str(e):
+ raise RequestError('UNSAFE_LEGACY_RENEGOTIATION_DISABLED: Try using --legacy-server-connect', cause=e) from e
+ elif 'SSLV3_ALERT_HANDSHAKE_FAILURE' in str(e):
+ raise RequestError(
+ 'SSLV3_ALERT_HANDSHAKE_FAILURE: The server may not support the current cipher list. '
+ 'Try using --legacy-server-connect', cause=e) from e
+ raise
+ except HTTPError as e: # TODO: Remove in a future release
+ raise _CompatHTTPError(e) from e
+
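A usage sketch for the reworked entry point (placeholder URL): strings and legacy `urllib.request.Request` objects are still accepted, but the native `Request` class is the preferred interface:

from yt_dlp import YoutubeDL
from yt_dlp.networking import Request

with YoutubeDL({'quiet': True}) as ydl:
    response = ydl.urlopen(Request('https://example.com', headers={'Accept': 'text/html'}))
    print(response.status, response.headers.get('Content-Type'))
    payload = response.read()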
+ def build_request_director(self, handlers):
+ logger = _YDLLogger(self)
+ headers = self.params['http_headers'].copy()
+ proxies = self.proxies.copy()
+ clean_headers(headers)
+ clean_proxies(proxies, headers)
+
+ director = RequestDirector(logger=logger, verbose=self.params.get('debug_printtraffic'))
+ for handler in handlers:
+ director.add_handler(handler(
+ logger=logger,
+ headers=headers,
+ cookiejar=self.cookiejar,
+ proxies=proxies,
+ prefer_system_certs='no-certifi' in self.params['compat_opts'],
+ verify=not self.params.get('nocheckcertificate'),
+ **traverse_obj(self.params, {
+ 'verbose': 'debug_printtraffic',
+ 'source_address': 'source_address',
+ 'timeout': 'socket_timeout',
+ 'legacy_ssl_support': 'legacyserverconnect',
+ 'enable_file_urls': 'enable_file_urls',
+ 'client_cert': {
+ 'client_certificate': 'client_certificate',
+ 'client_certificate_key': 'client_certificate_key',
+ 'client_certificate_password': 'client_certificate_password',
+ },
+ }),
+ ))
+ return director
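Since the handler list is an ordinary parameter, a director restricted to a subset of backends can be assembled from the registry. A sketch using internal API, for illustration only:

from yt_dlp import YoutubeDL
from yt_dlp.networking.common import _REQUEST_HANDLERS

ydl = YoutubeDL({'socket_timeout': 10})
# 'Urllib' is the RH_KEY of the urllib-based handler in the registry
director = ydl.build_request_director([_REQUEST_HANDLERS['Urllib']])
print([rh.RH_NAME for rh in director.handlers.values()])  # ['urllib']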
def encode(self, s):
if isinstance(s, bytes):
else:
self.to_screen(f'[info] Downloading {thumb_display_id} ...')
try:
- uf = self.urlopen(sanitized_Request(t['url'], headers=t.get('http_headers', {})))
+ uf = self.urlopen(Request(t['url'], headers=t.get('http_headers', {})))
self.to_screen(f'[info] Writing {thumb_display_id} to: {thumb_filename}')
with open(encodeFilename(thumb_filename), 'wb') as thumbf:
shutil.copyfileobj(uf, thumbf)
ret.append((thumb_filename, thumb_filename_final))
t['filepath'] = thumb_filename
except network_exceptions as err:
- if isinstance(err, urllib.error.HTTPError) and err.code == 404:
+ if isinstance(err, HTTPError) and err.status == 404:
self.to_screen(f'[info] {thumb_display_id.title()} does not exist')
else:
self.report_warning(f'Unable to download {thumb_display_id}: {err}')