from __future__ import annotations
import contextlib
+import functools
import ssl
import sys
+import typing
import urllib.parse
+import urllib.request
+from .exceptions import RequestError, UnsupportedRequest
from ..dependencies import certifi
from ..socks import ProxyType
-from ..utils import YoutubeDLError
+from ..utils import format_field, traverse_obj
+
+if typing.TYPE_CHECKING:
+ from collections.abc import Iterable
+
+ from ..utils.networking import HTTPHeaderDict
def ssl_load_certs(context: ssl.SSLContext, use_certifi=True):
# enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
for storename in ('CA', 'ROOT'):
- _ssl_load_windows_store_certs(context, storename)
+ ssl_load_windows_store_certs(context, storename)
context.set_default_verify_paths()
-def _ssl_load_windows_store_certs(ssl_context, storename):
+def ssl_load_windows_store_certs(ssl_context, storename):
# Code adapted from _load_windows_store_certs in https://github.com/python/cpython/blob/main/Lib/ssl.py
try:
certs = [cert for cert, encoding, trust in ssl.enum_certificates(storename)
url_components = urllib.parse.urlparse(socks_proxy)
if url_components.scheme.lower() == 'socks5':
socks_type = ProxyType.SOCKS5
- elif url_components.scheme.lower() in ('socks', 'socks4'):
+ rdns = False
+ elif url_components.scheme.lower() == 'socks5h':
+ socks_type = ProxyType.SOCKS5
+ rdns = True
+ elif url_components.scheme.lower() == 'socks4':
socks_type = ProxyType.SOCKS4
+ rdns = False
elif url_components.scheme.lower() == 'socks4a':
socks_type = ProxyType.SOCKS4A
+ rdns = True
+ else:
+ raise ValueError(f'Unknown SOCKS proxy version: {url_components.scheme.lower()}')
def unquote_if_non_empty(s):
if not s:
'proxytype': socks_type,
'addr': url_components.hostname,
'port': url_components.port or 1080,
- 'rdns': True,
+ 'rdns': rdns,
'username': unquote_if_non_empty(url_components.username),
'password': unquote_if_non_empty(url_components.password),
}
+def select_proxy(url, proxies):
+ """Unified proxy selector for all backends"""
+ url_components = urllib.parse.urlparse(url)
+ if 'no' in proxies:
+ hostport = url_components.hostname + format_field(url_components.port, None, ':%s')
+ if urllib.request.proxy_bypass_environment(hostport, {'no': proxies['no']}):
+ return
+ elif urllib.request.proxy_bypass(hostport): # check system settings
+ return
+
+ return traverse_obj(proxies, url_components.scheme or 'http', 'all')
+
+
def get_redirect_method(method, status):
"""Unified redirect method handling"""
client_certificate, keyfile=client_certificate_key,
password=client_certificate_password)
except ssl.SSLError:
- raise YoutubeDLError('Unable to load client certificate')
+ raise RequestError('Unable to load client certificate')
+ if getattr(context, 'post_handshake_auth', None) is not None:
+ context.post_handshake_auth = True
return context
-def add_accept_encoding_header(headers, supported_encodings):
- if supported_encodings and 'Accept-Encoding' not in headers:
- headers['Accept-Encoding'] = ', '.join(supported_encodings)
+class InstanceStoreMixin:
+ def __init__(self, **kwargs):
+ self.__instances = []
+ super().__init__(**kwargs) # So that both MRO works
+
+ @staticmethod
+ def _create_instance(**kwargs):
+ raise NotImplementedError
- elif 'Accept-Encoding' not in headers:
- headers['Accept-Encoding'] = 'identity'
+ def _get_instance(self, **kwargs):
+ for key, instance in self.__instances:
+ if key == kwargs:
+ return instance
+
+ instance = self._create_instance(**kwargs)
+ self.__instances.append((kwargs, instance))
+ return instance
+
+ def _close_instance(self, instance):
+ if callable(getattr(instance, 'close', None)):
+ instance.close()
+
+ def _clear_instances(self):
+ for _, instance in self.__instances:
+ self._close_instance(instance)
+ self.__instances.clear()
+
+
+def add_accept_encoding_header(headers: HTTPHeaderDict, supported_encodings: Iterable[str]):
+ if 'Accept-Encoding' not in headers:
+ headers['Accept-Encoding'] = ', '.join(supported_encodings) or 'identity'
+
+
+def wrap_request_errors(func):
+ @functools.wraps(func)
+ def wrapper(self, *args, **kwargs):
+ try:
+ return func(self, *args, **kwargs)
+ except UnsupportedRequest as e:
+ if e.handler is None:
+ e.handler = self
+ raise
+ return wrapper