X-Git-Url: https://jfr.im/git/yt-dlp.git/blobdiff_plain/c365dba8430ee33abda85d31f95128605bf240eb..227bf1a33be7b89cd7d44ad046844c4ccba104f4:/yt_dlp/networking/_helper.py diff --git a/yt_dlp/networking/_helper.py b/yt_dlp/networking/_helper.py index 367f3f444..a43c57bb4 100644 --- a/yt_dlp/networking/_helper.py +++ b/yt_dlp/networking/_helper.py @@ -1,13 +1,22 @@ from __future__ import annotations import contextlib +import functools import ssl import sys +import typing import urllib.parse +import urllib.request +from .exceptions import RequestError, UnsupportedRequest from ..dependencies import certifi from ..socks import ProxyType -from ..utils import YoutubeDLError +from ..utils import format_field, traverse_obj + +if typing.TYPE_CHECKING: + from collections.abc import Iterable + + from ..utils.networking import HTTPHeaderDict def ssl_load_certs(context: ssl.SSLContext, use_certifi=True): @@ -23,11 +32,11 @@ def ssl_load_certs(context: ssl.SSLContext, use_certifi=True): # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151 if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'): for storename in ('CA', 'ROOT'): - _ssl_load_windows_store_certs(context, storename) + ssl_load_windows_store_certs(context, storename) context.set_default_verify_paths() -def _ssl_load_windows_store_certs(ssl_context, storename): +def ssl_load_windows_store_certs(ssl_context, storename): # Code adapted from _load_windows_store_certs in https://github.com/python/cpython/blob/main/Lib/ssl.py try: certs = [cert for cert, encoding, trust in ssl.enum_certificates(storename) @@ -44,10 +53,18 @@ def make_socks_proxy_opts(socks_proxy): url_components = urllib.parse.urlparse(socks_proxy) if url_components.scheme.lower() == 'socks5': socks_type = ProxyType.SOCKS5 - elif url_components.scheme.lower() in ('socks', 'socks4'): + rdns = False + elif url_components.scheme.lower() == 'socks5h': + socks_type = ProxyType.SOCKS5 + rdns = True + elif url_components.scheme.lower() == 'socks4': socks_type = ProxyType.SOCKS4 + rdns = False elif url_components.scheme.lower() == 'socks4a': socks_type = ProxyType.SOCKS4A + rdns = True + else: + raise ValueError(f'Unknown SOCKS proxy version: {url_components.scheme.lower()}') def unquote_if_non_empty(s): if not s: @@ -57,12 +74,25 @@ def unquote_if_non_empty(s): 'proxytype': socks_type, 'addr': url_components.hostname, 'port': url_components.port or 1080, - 'rdns': True, + 'rdns': rdns, 'username': unquote_if_non_empty(url_components.username), 'password': unquote_if_non_empty(url_components.password), } +def select_proxy(url, proxies): + """Unified proxy selector for all backends""" + url_components = urllib.parse.urlparse(url) + if 'no' in proxies: + hostport = url_components.hostname + format_field(url_components.port, None, ':%s') + if urllib.request.proxy_bypass_environment(hostport, {'no': proxies['no']}): + return + elif urllib.request.proxy_bypass(hostport): # check system settings + return + + return traverse_obj(proxies, url_components.scheme or 'http', 'all') + + def get_redirect_method(method, status): """Unified redirect method handling""" @@ -126,14 +156,53 @@ def make_ssl_context( client_certificate, keyfile=client_certificate_key, password=client_certificate_password) except ssl.SSLError: - raise YoutubeDLError('Unable to load client certificate') + raise RequestError('Unable to load client certificate') + if getattr(context, 'post_handshake_auth', None) is not None: + context.post_handshake_auth = True return context -def add_accept_encoding_header(headers, supported_encodings): - if supported_encodings and 'Accept-Encoding' not in headers: - headers['Accept-Encoding'] = ', '.join(supported_encodings) +class InstanceStoreMixin: + def __init__(self, **kwargs): + self.__instances = [] + super().__init__(**kwargs) # So that both MRO works + + @staticmethod + def _create_instance(**kwargs): + raise NotImplementedError - elif 'Accept-Encoding' not in headers: - headers['Accept-Encoding'] = 'identity' + def _get_instance(self, **kwargs): + for key, instance in self.__instances: + if key == kwargs: + return instance + + instance = self._create_instance(**kwargs) + self.__instances.append((kwargs, instance)) + return instance + + def _close_instance(self, instance): + if callable(getattr(instance, 'close', None)): + instance.close() + + def _clear_instances(self): + for _, instance in self.__instances: + self._close_instance(instance) + self.__instances.clear() + + +def add_accept_encoding_header(headers: HTTPHeaderDict, supported_encodings: Iterable[str]): + if 'Accept-Encoding' not in headers: + headers['Accept-Encoding'] = ', '.join(supported_encodings) or 'identity' + + +def wrap_request_errors(func): + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + except UnsupportedRequest as e: + if e.handler is None: + e.handler = self + raise + return wrapper