from email.message import Message
from http.cookiejar import CookieJar
+from test.conftest import validate_and_send
from test.helper import FakeYDL, http_server_port, verify_address_availability
from yt_dlp.cookies import YoutubeDLCookieJar
from yt_dlp.dependencies import brotli, requests, urllib3
TransportError,
UnsupportedRequest,
)
+from yt_dlp.networking.impersonate import (
+ ImpersonateRequestHandler,
+ ImpersonateTarget,
+)
+from yt_dlp.utils import YoutubeDLError
from yt_dlp.utils._utils import _YDLLogger as FakeLogger
from yt_dlp.utils.networking import HTTPHeaderDict
-from test.conftest import validate_and_send
-
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
class FakeRH(RequestHandler):
def __init__(self, *args, **params):
    """Keep the raw keyword parameters on the instance, then construct the parent."""
    # Assigned before delegating, exactly as passed in by the caller.
    self.params = params
    super().__init__(*args, **params)
+
def _validate(self, request):
return
('', {'all': '__noproxy__'}),
(None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'}) # env, set https
])
def test_proxy(self, proxy, expected, monkeypatch):
    """Check that the `proxy` param is turned into the expected `ydl.proxies` map.

    HTTP_PROXY is set for the duration of the test so that each case is
    evaluated against an environment proxy. The `monkeypatch` fixture is
    used so the variable is always restored afterwards -- including the
    case where it was not set at all before the test, which a manual
    save/restore guarded by `if old_value:` leaves leaked into later tests.
    """
    # ensure that provided proxies override env
    monkeypatch.setenv('HTTP_PROXY', 'http://127.0.0.1:8081')
    with FakeYDL({'proxy': proxy}) as ydl:
        assert ydl.proxies == expected
def test_compat_request(self):
with FakeRHYDL() as ydl:
with pytest.raises(SSLError, match='testerror'):
ydl.urlopen('ssl://testerror')
def test_unsupported_impersonate_target(self):
    """A request carrying an impersonate extension must be rejected when the
    only registered handler is a plain RequestHandler."""

    # NOTE: the handler class name is kept as-is (it ends in "RH").
    class HTTPRH(RequestHandler):
        _SUPPORTED_URL_SCHEMES = ('http',)
        _SUPPORTED_PROXY_SCHEMES = None

        def _send(self, request: Request):
            pass

    class FakeImpersonationRHYDL(FakeYDL):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Replace the default director with one that only knows HTTPRH
            self._request_director = self.build_request_director([HTTPRH])

    requested_target = ImpersonateTarget('test', None, None, None)
    expected_message = r'Impersonate target "test" is not available'

    with FakeImpersonationRHYDL() as ydl:
        with pytest.raises(RequestError, match=expected_message):
            ydl.urlopen(Request('http://', extensions={'impersonate': requested_target}))
+
def test_unsupported_impersonate_extension(self):
    """A request for target "test" must be rejected when the only registered
    impersonating handler lists just the "abc" target."""

    # NOTE: the handler class name is kept as-is (it ends in "RH").
    class IRH(ImpersonateRequestHandler):
        _SUPPORTED_URL_SCHEMES = ('http',)
        _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc',): 'test'}
        _SUPPORTED_PROXY_SCHEMES = None

        def _send(self, request: Request):
            pass

    class FakeHTTPRHYDL(FakeYDL):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Replace the default director with one that only knows IRH
            self._request_director = self.build_request_director([IRH])

    requested_target = ImpersonateTarget('test', None, None, None)
    expected_message = r'Impersonate target "test" is not available'

    with FakeHTTPRHYDL() as ydl:
        with pytest.raises(RequestError, match=expected_message):
            ydl.urlopen(Request('http://', extensions={'impersonate': requested_target}))
+
def test_raise_impersonate_error(self):
    """Constructing FakeYDL with an unavailable `impersonate` param must raise."""
    unavailable_target = ImpersonateTarget('test', None, None, None)
    expected_message = r'Impersonate target "test" is not available'
    with pytest.raises(YoutubeDLError, match=expected_message):
        FakeYDL({'impersonate': unavailable_target})
+
def test_pass_impersonate_param(self, monkeypatch):
    """The `impersonate` param given to FakeYDL must end up on the built handler."""

    class IRH(ImpersonateRequestHandler):
        _SUPPORTED_URL_SCHEMES = ('http',)
        _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc'): 'test'}

        def _send(self, request: Request):
            pass

    # Bypass the check on initialize
    original_builder = FakeYDL.build_request_director

    def build_director_with_irh(ydl, handlers, preferences=None):
        # Ignore whatever handlers were requested and always register IRH
        return original_builder(ydl, handlers=[IRH])

    monkeypatch.setattr(FakeYDL, 'build_request_director', build_director_with_irh)

    params = {'impersonate': ImpersonateTarget('abc', None, None, None)}
    with FakeYDL(params) as ydl:
        rh = self.build_handler(ydl, IRH)
        assert rh.impersonate == ImpersonateTarget('abc', None, None, None)
+
def test_get_impersonate_targets(self):
    """Targets of every registered impersonating handler are listed, and
    availability checks match against them."""

    def make_handler(target_client):
        # One handler class per client; key and name are the client string itself
        class TestRH(ImpersonateRequestHandler):
            _SUPPORTED_URL_SCHEMES = ('http',)
            _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget(target_client,): 'test'}
            RH_KEY = target_client
            RH_NAME = target_client

            def _send(self, request: Request):
                pass

        return TestRH

    handlers = [make_handler(client) for client in ('abc', 'xyz', 'asd')]

    with FakeYDL() as ydl:
        ydl._request_director = ydl.build_request_director(handlers)

        expected_targets = {
            (ImpersonateTarget('xyz'), 'xyz'),
            (ImpersonateTarget('abc'), 'abc'),
            (ImpersonateTarget('asd'), 'asd'),
        }
        assert set(ydl._get_available_impersonate_targets()) == expected_targets

        assert ydl._impersonate_target_available(ImpersonateTarget('abc'))
        assert ydl._impersonate_target_available(ImpersonateTarget())
        assert not ydl._impersonate_target_available(ImpersonateTarget('zxy'))
+
@pytest.mark.parametrize('proxy_key,proxy_url,expected', [
('http', '__noproxy__', None),
('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
('http', 'socks4://example.com', 'socks4://example.com'),
('unrelated', '/bad/proxy', '/bad/proxy'), # clean_proxies should ignore bad proxies
])
def test_clean_proxy(self, proxy_key, proxy_url, expected, monkeypatch):
    """Check that proxies are cleaned both per-request and when building a handler.

    The environment variable is set through the `monkeypatch` fixture so it
    is always undone after the test. A manual save/restore that only
    restores when the variable previously had a value leaves
    `<KEY>_PROXY` set for every later test when it was originally unset.
    """
    # proxies should be cleaned in urlopen()
    with FakeRHYDL() as ydl:
        req = ydl.urlopen(Request('test://', proxies={proxy_key: proxy_url})).request
        assert req.proxies[proxy_key] == expected

    # and should also be cleaned when building the handler
    monkeypatch.setenv(f'{proxy_key.upper()}_PROXY', proxy_url)
    with FakeYDL() as ydl:
        rh = self.build_handler(ydl)
        assert rh.proxies[proxy_key] == expected
def test_clean_proxy_header(self):
with FakeRHYDL() as ydl:
assert res.geturl() == res.url
assert res.info() is res.headers
assert res.getheader('test') == res.get_header('test')
+
+
class TestImpersonateTarget:
    """Unit tests for ImpersonateTarget parsing, formatting, validation and matching."""

    @pytest.mark.parametrize('target_str,expected', [
        ('abc', ImpersonateTarget('abc', None, None, None)),
        ('abc-120_esr', ImpersonateTarget('abc', '120_esr', None, None)),
        ('abc-120:xyz', ImpersonateTarget('abc', '120', 'xyz', None)),
        ('abc-120:xyz-5.6', ImpersonateTarget('abc', '120', 'xyz', '5.6')),
        ('abc:xyz', ImpersonateTarget('abc', None, 'xyz', None)),
        ('abc:', ImpersonateTarget('abc', None, None, None)),
        ('abc-120:', ImpersonateTarget('abc', '120', None, None)),
        (':xyz', ImpersonateTarget(None, None, 'xyz', None)),
        (':xyz-6.5', ImpersonateTarget(None, None, 'xyz', '6.5')),
        (':', ImpersonateTarget(None, None, None, None)),
        ('', ImpersonateTarget(None, None, None, None)),
    ])
    def test_target_from_str(self, target_str, expected):
        """Well-formed `client[-version][:os[-os_version]]` strings parse to the expected target."""
        assert ImpersonateTarget.from_str(target_str) == expected

    @pytest.mark.parametrize('target_str', [
        '-120', ':-12.0', '-12:-12', '-:-',
        '::', 'a-c-d:', 'a-c-d:e-f-g', 'a:b:',
    ])
    def test_target_from_invalid_str(self, target_str):
        """Malformed target strings are rejected with ValueError."""
        with pytest.raises(ValueError):
            ImpersonateTarget.from_str(target_str)

    # Each case below is distinct: exact duplicates of the 'abc-120' and
    # 'abc-120:xyz' cases were dropped, and an OS-only case without a version
    # was added so that the ':os' form is covered in both directions.
    @pytest.mark.parametrize('target,expected', [
        (ImpersonateTarget('abc', None, None, None), 'abc'),
        (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
        (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
        (ImpersonateTarget('abc', '120', 'xyz', '5'), 'abc-120:xyz-5'),
        (ImpersonateTarget('abc', None, 'xyz', None), 'abc:xyz'),
        (ImpersonateTarget('abc', None, 'xyz'), 'abc:xyz'),  # os_version left at its default
        (ImpersonateTarget(None, None, 'xyz', None), ':xyz'),
        (ImpersonateTarget(None, None, 'xyz', '6.5'), ':xyz-6.5'),
        (ImpersonateTarget('abc', ), 'abc'),
        (ImpersonateTarget(None, None, None, None), ''),
    ])
    def test_str(self, target, expected):
        """str() renders a target in the same syntax that from_str() accepts."""
        assert str(target) == expected

    @pytest.mark.parametrize('args', [
        ('abc', None, None, '5'),
        ('abc', '120', None, '5'),
        (None, '120', None, None),
        (None, '120', None, '5'),
        (None, None, None, '5'),
        (None, '120', 'xyz', '5'),
    ])
    def test_invalid_impersonate_target(self, args):
        """A version without its client, or an os_version without its os, is rejected."""
        with pytest.raises(ValueError):
            ImpersonateTarget(*args)

    @pytest.mark.parametrize('target1,target2,is_in,is_eq', [
        (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', None, None, None), True, True),
        (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', '120', None, None), True, False),
        (ImpersonateTarget('abc', None, 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', None), True, False),
        (ImpersonateTarget('abc', '121', 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', 'test'), False, False),
        (ImpersonateTarget('abc'), ImpersonateTarget('abc', '120', 'xyz', 'test'), True, False),
        (ImpersonateTarget('abc', '120', 'xyz', 'test'), ImpersonateTarget('abc'), True, False),
        (ImpersonateTarget(), ImpersonateTarget('abc', '120', 'xyz'), True, False),
        (ImpersonateTarget(), ImpersonateTarget(), True, True),
    ])
    def test_impersonate_target_in(self, target1, target2, is_in, is_eq):
        """`in` treats None fields as wildcards, while `==` requires every field to be equal."""
        assert (target1 in target2) is is_in
        assert (target1 == target2) is is_eq
SSLError,
network_exceptions,
)
+from .networking.impersonate import ImpersonateRequestHandler
from .plugins import directories as plugin_directories
from .postprocessor import _PLUGIN_CLASSES as plugin_pps
from .postprocessor import (
SameFileError,
UnavailableVideoError,
UserNotLive,
+ YoutubeDLError,
age_restricted,
args_to_str,
bug_reports_message,
- "detect_or_warn": check whether we can do anything
about it, warn otherwise (default)
source_address: Client-side IP address to bind to.
+ impersonate: Client to impersonate for requests.
+ An ImpersonateTarget (from yt_dlp.networking.impersonate)
sleep_interval_requests: Number of seconds to sleep between requests
during extraction
sleep_interval: Number of seconds to sleep before each download when
for msg in self.params.get('_deprecation_warnings', []):
self.deprecated_feature(msg)
+ if impersonate_target := self.params.get('impersonate'):
+ if not self._impersonate_target_available(impersonate_target):
+ raise YoutubeDLError(
+ f'Impersonate target "{impersonate_target}" is not available. '
+ f'Use --list-impersonate-targets to see available targets. '
+ f'You may be missing dependencies required to support this target.')
+
if 'list-formats' in self.params['compat_opts']:
self.params['listformats_table'] = False
handler = self._request_director.handlers['Urllib']
return handler._get_instance(cookiejar=self.cookiejar, proxies=self.proxies)
def _get_available_impersonate_targets(self):
    """Return a list of `(target, handler name)` pairs, one for each supported
    target of each registered ImpersonateRequestHandler."""
    # todo(future): make available as public API
    available = []
    for handler in self._request_director.handlers.values():
        if not isinstance(handler, ImpersonateRequestHandler):
            continue
        for target in handler.supported_targets:
            available.append((target, handler.RH_NAME))
    return available
+
def _impersonate_target_available(self, target):
    """Return True if any registered ImpersonateRequestHandler reports
    `target` as supported, False otherwise."""
    # todo(future): make available as public API
    for handler in self._request_director.handlers.values():
        if not isinstance(handler, ImpersonateRequestHandler):
            continue
        if handler.is_supported_target(target):
            return True
    return False
+
def urlopen(self, req):
""" Start an HTTP download """
if isinstance(req, str):
raise RequestError(
'file:// URLs are disabled by default in yt-dlp for security reasons. '
'Use --enable-file-urls to enable at your own risk.', cause=ue) from ue
- if 'unsupported proxy type: "https"' in ue.msg.lower():
+ if (
+ 'unsupported proxy type: "https"' in ue.msg.lower()
+ and 'requests' not in self._request_director.handlers
+ and 'curl_cffi' not in self._request_director.handlers
+ ):
raise RequestError(
- 'To use an HTTPS proxy for this request, one of the following dependencies needs to be installed: requests')
+ 'To use an HTTPS proxy for this request, one of the following dependencies needs to be installed: requests, curl_cffi')
elif (
re.match(r'unsupported url scheme: "wss?"', ue.msg.lower())
'This request requires WebSocket support. '
'Ensure one of the following dependencies are installed: websockets',
cause=ue) from ue
+
+ elif re.match(r'unsupported (?:extensions: impersonate|impersonate target)', ue.msg.lower()):
+ raise RequestError(
+ f'Impersonate target "{req.extensions["impersonate"]}" is not available.'
+ f' See --list-impersonate-targets for available targets.'
+ f' This request requires browser impersonation, however you may be missing dependencies'
+ f' required to support this target.')
raise
except SSLError as e:
if 'UNSAFE_LEGACY_RENEGOTIATION_DISABLED' in str(e):
'timeout': 'socket_timeout',
'legacy_ssl_support': 'legacyserverconnect',
'enable_file_urls': 'enable_file_urls',
+ 'impersonate': 'impersonate',
'client_cert': {
'client_certificate': 'client_certificate',
'client_certificate_key': 'client_certificate_key',
--- /dev/null
+from __future__ import annotations
+
+import re
+from abc import ABC
+from dataclasses import dataclass
+from typing import Any
+
+from .common import RequestHandler, register_preference
+from .exceptions import UnsupportedRequest
+from ..compat.types import NoneType
+from ..utils import classproperty, join_nonempty
+from ..utils.networking import std_headers
+
+
@dataclass(order=True, frozen=True)
class ImpersonateTarget:
    """
    Description of a client to impersonate.

    Parameters:
    @param client: the client to impersonate
    @param version: the client version to impersonate
    @param os: the client OS to impersonate
    @param os_version: the client OS version to impersonate

    Note: a field that is None matches any value.

    """
    client: str | None = None
    version: str | None = None
    os: str | None = None
    os_version: str | None = None

    def __post_init__(self):
        # A version may only be given together with the thing it is a version of
        if not self.client and self.version:
            raise ValueError('client is required if version is set')
        if not self.os and self.os_version:
            raise ValueError('os is required if os_version is set')

    def __contains__(self, target: ImpersonateTarget):
        """Return True if every field of `target` is compatible with this target.

        Two field values are compatible when either one is None or both are equal.
        Anything that is not an ImpersonateTarget is never contained.
        """
        if not isinstance(target, ImpersonateTarget):
            return False

        def compatible(mine, theirs):
            return mine is None or theirs is None or mine == theirs

        return all(
            compatible(getattr(self, field_name), getattr(target, field_name))
            for field_name in ('client', 'version', 'os', 'os_version'))

    def __str__(self):
        """Render as `client-version:os-os_version`, without a trailing colon."""
        client_part = join_nonempty(self.client, self.version)
        os_part = join_nonempty(self.os, self.os_version)
        return f'{client_part}:{os_part}'.rstrip(':')

    @classmethod
    def from_str(cls, target: str):
        """Parse a `client[-version][:os[-os_version]]` string.

        Parts that are absent from the string become None.
        Raises ValueError if the whole string does not fit that shape.
        """
        pattern = r'(?:(?P<client>[^:-]+)(?:-(?P<version>[^:-]+))?)?(?::(?:(?P<os>[^:-]+)(?:-(?P<os_version>[^:-]+))?)?)?'
        parsed = re.fullmatch(pattern, target)
        if parsed is None:
            raise ValueError(f'Invalid impersonate target "{target}"')
        return cls(**parsed.groupdict())
+
+
class ImpersonateRequestHandler(RequestHandler, ABC):
    """
    Base class for request handlers that support browser impersonation.

    This provides a method for checking the validity of the impersonate extension,
    which can be used in _check_extensions.

    Impersonate targets consist of a client, version, os and os_version.
    See the ImpersonateTarget class for more details.

    The following may be defined:
     - `_SUPPORTED_IMPERSONATE_TARGET_MAP`: a dict mapping supported targets to custom object.
                Any Request with an impersonate target not matched by one of its keys
                will raise an UnsupportedRequest.
                Set to None (or leave empty) to disable this check.
                Note: Entries are in order of preference

    Parameters:
    @param impersonate: the default impersonate target to use for requests.
                        Set to None to disable impersonation.
    """
    _SUPPORTED_IMPERSONATE_TARGET_MAP: dict[ImpersonateTarget, Any] | None = {}

    def __init__(self, *, impersonate: ImpersonateTarget | None = None, **kwargs):
        super().__init__(**kwargs)
        self.impersonate = impersonate

    def _check_impersonate_target(self, target: ImpersonateTarget | None):
        """Raise UnsupportedRequest if `target` is set but cannot be resolved.

        Nothing is checked when `target` is None or when this handler
        declares no supported targets.
        """
        assert isinstance(target, (ImpersonateTarget, NoneType))
        if target is None or not self.supported_targets:
            return
        if not self.is_supported_target(target):
            raise UnsupportedRequest(f'Unsupported impersonate target: {target}')

    def _check_extensions(self, extensions):
        """Run the parent extension checks, then validate the `impersonate` extension if given."""
        super()._check_extensions(extensions)
        if 'impersonate' in extensions:
            self._check_impersonate_target(extensions.get('impersonate'))

    def _validate(self, request):
        """Run the parent validation, then validate the handler's default target."""
        super()._validate(request)
        self._check_impersonate_target(self.impersonate)

    def _resolve_target(self, target: ImpersonateTarget | None):
        """Resolve a target to a supported target.

        Returns the first supported target (in declaration order) that
        `target` is contained in, or None if `target` is None or nothing matches.
        """
        if target is None:
            return
        for supported_target in self.supported_targets:
            if target in supported_target:
                if self.verbose:
                    self._logger.stdout(
                        f'{self.RH_NAME}: resolved impersonate target {target} to {supported_target}')
                return supported_target

    @classproperty
    def supported_targets(self) -> tuple[ImpersonateTarget, ...]:
        """The supported targets, in order of preference; empty if the map is None or empty."""
        # The map is documented as nullable, so a None map must behave like an
        # empty one rather than fail on `None.keys()`.
        # Iterating a dict yields its keys in insertion order.
        return tuple(self._SUPPORTED_IMPERSONATE_TARGET_MAP or ())

    def is_supported_target(self, target: ImpersonateTarget):
        """Return True if `target` resolves to one of the supported targets."""
        assert isinstance(target, ImpersonateTarget)
        return self._resolve_target(target) is not None

    def _get_request_target(self, request):
        """Get the requested target for the request"""
        # The per-request extension takes precedence over the handler default
        return self._resolve_target(request.extensions.get('impersonate') or self.impersonate)

    def _get_impersonate_headers(self, request):
        """Return the merged headers for `request`.

        When the request resolves to an impersonate target, every header whose
        value is still equal to the corresponding std_headers value is dropped.
        """
        headers = self._merge_headers(request.headers)
        if self._get_request_target(request) is not None:
            # remove all headers whose value is unchanged from std_headers
            # NOTE(review): presumably so that the impersonated client's own
            # default headers are used instead - confirm with the handler implementations
            # todo: change this to not depend on std_headers
            for k, v in std_headers.items():
                if headers.get(k) == v:
                    headers.pop(k)
        return headers
+
+
@register_preference(ImpersonateRequestHandler)
def impersonate_preference(rh, request):
    """Return 1000 when the request or the handler has an impersonate target set, else 0.

    NOTE(review): presumably a larger value makes the handler more preferred -
    confirm against register_preference.
    """
    wants_impersonation = request.extensions.get('impersonate') or rh.impersonate
    return 1000 if wants_impersonation else 0