]> jfr.im git - yt-dlp.git/commitdiff
Add new options `--impersonate` and `--list-impersonate-targets`
authorcoletdjnz <redacted>
Sun, 17 Mar 2024 03:47:56 +0000 (22:47 -0500)
committerbashonly <redacted>
Sun, 17 Mar 2024 04:14:13 +0000 (23:14 -0500)
Authored by: coletdjnz, Grub4K, pukkandan, bashonly

Co-authored-by: Simon Sawicki <redacted>
Co-authored-by: pukkandan <redacted>
Co-authored-by: bashonly <redacted>
README.md
test/test_networking.py
yt_dlp/YoutubeDL.py
yt_dlp/__init__.py
yt_dlp/networking/impersonate.py [new file with mode: 0644]
yt_dlp/options.py

index 1e108a29c2250cb5f871a086d5fbfe6b0e2e2fc1..d4b89229fb898f7a8b40bb8954219db963546be0 100644 (file)
--- a/README.md
+++ b/README.md
@@ -389,6 +389,10 @@ If you fork the project on GitHub, you can run your fork's [build workflow](.git
                                     direct connection
     --socket-timeout SECONDS        Time to wait before giving up, in seconds
     --source-address IP             Client-side IP address to bind to
+    --impersonate CLIENT[:OS]       Client to impersonate for requests. E.g.
+                                    chrome, chrome-110, chrome:windows-10. Pass
+                                    --impersonate="" to impersonate any client.
+    --list-impersonate-targets      List available clients to impersonate.
     -4, --force-ipv4                Make all connections via IPv4
     -6, --force-ipv6                Make all connections via IPv6
     --enable-file-urls              Enable file:// URLs. This is disabled by
index 628f1f17111164a34a851aa685293063f685b82c..b67b521d957258cc7c51121f7389b4f2ce76e0bd 100644 (file)
@@ -27,6 +27,7 @@ import zlib
 from email.message import Message
 from http.cookiejar import CookieJar
 
+from test.conftest import validate_and_send
 from test.helper import FakeYDL, http_server_port, verify_address_availability
 from yt_dlp.cookies import YoutubeDLCookieJar
 from yt_dlp.dependencies import brotli, requests, urllib3
@@ -50,11 +51,14 @@ from yt_dlp.networking.exceptions import (
     TransportError,
     UnsupportedRequest,
 )
+from yt_dlp.networking.impersonate import (
+    ImpersonateRequestHandler,
+    ImpersonateTarget,
+)
+from yt_dlp.utils import YoutubeDLError
 from yt_dlp.utils._utils import _YDLLogger as FakeLogger
 from yt_dlp.utils.networking import HTTPHeaderDict
 
-from test.conftest import validate_and_send
-
 TEST_DIR = os.path.dirname(os.path.abspath(__file__))
 
 
@@ -1113,6 +1117,10 @@ class FakeResponse(Response):
 
 class FakeRH(RequestHandler):
 
+    def __init__(self, *args, **params):
+        # Keep the keyword params this handler was constructed with on the instance
+        # before forwarding them to the parent initializer
+        # (presumably so tests can inspect what was passed - confirm against callers).
+        self.params = params
+        super().__init__(*args, **params)
+
     def _validate(self, request):
         return
 
@@ -1271,15 +1279,10 @@ class TestYoutubeDLNetworking:
         ('', {'all': '__noproxy__'}),
         (None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'})  # env, set https
     ])
-    def test_proxy(self, proxy, expected):
-        old_http_proxy = os.environ.get('HTTP_PROXY')
-        try:
-            os.environ['HTTP_PROXY'] = 'http://127.0.0.1:8081'  # ensure that provided proxies override env
-            with FakeYDL({'proxy': proxy}) as ydl:
-                assert ydl.proxies == expected
-        finally:
-            if old_http_proxy:
-                os.environ['HTTP_PROXY'] = old_http_proxy
+    def test_proxy(self, proxy, expected, monkeypatch):
+        monkeypatch.setenv('HTTP_PROXY', 'http://127.0.0.1:8081')
+        with FakeYDL({'proxy': proxy}) as ydl:
+            assert ydl.proxies == expected
 
     def test_compat_request(self):
         with FakeRHYDL() as ydl:
@@ -1331,6 +1334,95 @@ class TestYoutubeDLNetworking:
             with pytest.raises(SSLError, match='testerror'):
                 ydl.urlopen('ssl://testerror')
 
+    def test_unsupported_impersonate_target(self):
+        """
+        A request carrying an `impersonate` extension must fail with the
+        'Impersonate target ... is not available' RequestError when the only
+        registered handler is a plain RequestHandler subclass.
+        """
+        class FakeImpersonationRHYDL(FakeYDL):
+            def __init__(self, *args, **kwargs):
+                # Minimal handler that derives from RequestHandler directly,
+                # not from ImpersonateRequestHandler.
+                class HTTPRH(RequestHandler):
+                    def _send(self, request: Request):
+                        pass
+                    _SUPPORTED_URL_SCHEMES = ('http',)
+                    _SUPPORTED_PROXY_SCHEMES = None
+
+                super().__init__(*args, **kwargs)
+                # Replace the director with one that only knows about HTTPRH
+                self._request_director = self.build_request_director([HTTPRH])
+
+        with FakeImpersonationRHYDL() as ydl:
+            with pytest.raises(
+                RequestError,
+                match=r'Impersonate target "test" is not available'
+            ):
+                ydl.urlopen(Request('http://', extensions={'impersonate': ImpersonateTarget('test', None, None, None)}))
+
+    def test_unsupported_impersonate_extension(self):
+        """
+        A request for impersonate target 'test' must fail with the
+        'Impersonate target ... is not available' RequestError when the only
+        registered ImpersonateRequestHandler lists just the 'abc' target.
+        """
+        class FakeHTTPRHYDL(FakeYDL):
+            def __init__(self, *args, **kwargs):
+                # Impersonation-capable handler whose target map contains only 'abc'
+                class IRH(ImpersonateRequestHandler):
+                    def _send(self, request: Request):
+                        pass
+
+                    _SUPPORTED_URL_SCHEMES = ('http',)
+                    _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc',): 'test'}
+                    _SUPPORTED_PROXY_SCHEMES = None
+
+                super().__init__(*args, **kwargs)
+                # Replace the director with one that only knows about IRH
+                self._request_director = self.build_request_director([IRH])
+
+        with FakeHTTPRHYDL() as ydl:
+            with pytest.raises(
+                RequestError,
+                match=r'Impersonate target "test" is not available'
+            ):
+                ydl.urlopen(Request('http://', extensions={'impersonate': ImpersonateTarget('test', None, None, None)}))
+
+    def test_raise_impersonate_error(self):
+        """Constructing FakeYDL with the `impersonate` param set to target 'test' must raise YoutubeDLError."""
+        with pytest.raises(
+            YoutubeDLError,
+            match=r'Impersonate target "test" is not available'
+        ):
+            FakeYDL({'impersonate': ImpersonateTarget('test', None, None, None)})
+
+    def test_pass_impersonate_param(self, monkeypatch):
+        """The `impersonate` param given to FakeYDL must end up as `impersonate` on a handler built from it."""
+
+        class IRH(ImpersonateRequestHandler):
+            def _send(self, request: Request):
+                pass
+
+            _SUPPORTED_URL_SCHEMES = ('http',)
+            _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc'): 'test'}
+
+        # Bypass the check on initialize
+        # (the patched builder ignores the handlers it is given and always builds the director from IRH)
+        brh = FakeYDL.build_request_director
+        monkeypatch.setattr(FakeYDL, 'build_request_director', lambda cls, handlers, preferences=None: brh(cls, handlers=[IRH]))
+
+        with FakeYDL({
+            'impersonate': ImpersonateTarget('abc', None, None, None)
+        }) as ydl:
+            rh = self.build_handler(ydl, IRH)
+            assert rh.impersonate == ImpersonateTarget('abc', None, None, None)
+
+    def test_get_impersonate_targets(self):
+        """
+        With three handlers that each support one target, the YoutubeDL helpers must
+        list all three (target, handler name) pairs and answer availability queries.
+        """
+        handlers = []
+        for target_client in ('abc', 'xyz', 'asd'):
+            # The class body runs immediately on each iteration, so every TestRH
+            # captures the current value of target_client in its class attributes.
+            class TestRH(ImpersonateRequestHandler):
+                def _send(self, request: Request):
+                    pass
+                _SUPPORTED_URL_SCHEMES = ('http',)
+                _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget(target_client,): 'test'}
+                RH_KEY = target_client
+                RH_NAME = target_client
+            handlers.append(TestRH)
+
+        with FakeYDL() as ydl:
+            ydl._request_director = ydl.build_request_director(handlers)
+            # Compared as a set: the order of the returned pairs is not asserted here
+            assert set(ydl._get_available_impersonate_targets()) == {
+                (ImpersonateTarget('xyz'), 'xyz'),
+                (ImpersonateTarget('abc'), 'abc'),
+                (ImpersonateTarget('asd'), 'asd')
+            }
+            assert ydl._impersonate_target_available(ImpersonateTarget('abc'))
+            # A target with every field unset is expected to be reported as available
+            assert ydl._impersonate_target_available(ImpersonateTarget())
+            assert not ydl._impersonate_target_available(ImpersonateTarget('zxy'))
+
     @pytest.mark.parametrize('proxy_key,proxy_url,expected', [
         ('http', '__noproxy__', None),
         ('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
@@ -1341,23 +1433,17 @@ class TestYoutubeDLNetworking:
         ('http', 'socks4://example.com', 'socks4://example.com'),
         ('unrelated', '/bad/proxy', '/bad/proxy'),  # clean_proxies should ignore bad proxies
     ])
-    def test_clean_proxy(self, proxy_key, proxy_url, expected):
+    def test_clean_proxy(self, proxy_key, proxy_url, expected, monkeypatch):
         # proxies should be cleaned in urlopen()
         with FakeRHYDL() as ydl:
             req = ydl.urlopen(Request('test://', proxies={proxy_key: proxy_url})).request
             assert req.proxies[proxy_key] == expected
 
         # and should also be cleaned when building the handler
-        env_key = f'{proxy_key.upper()}_PROXY'
-        old_env_proxy = os.environ.get(env_key)
-        try:
-            os.environ[env_key] = proxy_url  # ensure that provided proxies override env
-            with FakeYDL() as ydl:
-                rh = self.build_handler(ydl)
-                assert rh.proxies[proxy_key] == expected
-        finally:
-            if old_env_proxy:
-                os.environ[env_key] = old_env_proxy
+        monkeypatch.setenv(f'{proxy_key.upper()}_PROXY', proxy_url)
+        with FakeYDL() as ydl:
+            rh = self.build_handler(ydl)
+            assert rh.proxies[proxy_key] == expected
 
     def test_clean_proxy_header(self):
         with FakeRHYDL() as ydl:
@@ -1629,3 +1715,71 @@ class TestResponse:
             assert res.geturl() == res.url
             assert res.info() is res.headers
             assert res.getheader('test') == res.get_header('test')
+
+
+class TestImpersonateTarget:
+    """Tests for ImpersonateTarget: parsing from strings, rendering to strings, constructor validation and matching."""
+
+    @pytest.mark.parametrize('target_str,expected', [
+        ('abc', ImpersonateTarget('abc', None, None, None)),
+        ('abc-120_esr', ImpersonateTarget('abc', '120_esr', None, None)),
+        ('abc-120:xyz', ImpersonateTarget('abc', '120', 'xyz', None)),
+        ('abc-120:xyz-5.6', ImpersonateTarget('abc', '120', 'xyz', '5.6')),
+        ('abc:xyz', ImpersonateTarget('abc', None, 'xyz', None)),
+        ('abc:', ImpersonateTarget('abc', None, None, None)),
+        ('abc-120:', ImpersonateTarget('abc', '120', None, None)),
+        (':xyz', ImpersonateTarget(None, None, 'xyz', None)),
+        (':xyz-6.5', ImpersonateTarget(None, None, 'xyz', '6.5')),
+        (':', ImpersonateTarget(None, None, None, None)),
+        ('', ImpersonateTarget(None, None, None, None)),
+    ])
+    def test_target_from_str(self, target_str, expected):
+        """from_str() must parse each valid target string into the expected ImpersonateTarget."""
+        assert ImpersonateTarget.from_str(target_str) == expected
+
+    @pytest.mark.parametrize('target_str', [
+        '-120', ':-12.0', '-12:-12', '-:-',
+        '::', 'a-c-d:', 'a-c-d:e-f-g', 'a:b:'
+    ])
+    def test_target_from_invalid_str(self, target_str):
+        """from_str() must raise ValueError for each malformed target string."""
+        with pytest.raises(ValueError):
+            ImpersonateTarget.from_str(target_str)
+
+    @pytest.mark.parametrize('target,expected', [
+        (ImpersonateTarget('abc', None, None, None), 'abc'),
+        (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
+        (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
+        (ImpersonateTarget('abc', '120', 'xyz', '5'), 'abc-120:xyz-5'),
+        (ImpersonateTarget('abc', None, 'xyz', None), 'abc:xyz'),
+        (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
+        (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
+        (ImpersonateTarget('abc', None, 'xyz'), 'abc:xyz'),
+        (ImpersonateTarget(None, None, 'xyz', '6.5'), ':xyz-6.5'),
+        (ImpersonateTarget('abc', ), 'abc'),
+        (ImpersonateTarget(None, None, None, None), ''),
+    ])
+    def test_str(self, target, expected):
+        """str() of a target must produce the expected string form."""
+        assert str(target) == expected
+
+    @pytest.mark.parametrize('args', [
+        ('abc', None, None, '5'),
+        ('abc', '120', None, '5'),
+        (None, '120', None, None),
+        (None, '120', None, '5'),
+        (None, None, None, '5'),
+        (None, '120', 'xyz', '5'),
+    ])
+    def test_invalid_impersonate_target(self, args):
+        """
+        The constructor must raise ValueError for these argument tuples: each one
+        gives a version without a client and/or an os_version without an os.
+        """
+        with pytest.raises(ValueError):
+            ImpersonateTarget(*args)
+
+    @pytest.mark.parametrize('target1,target2,is_in,is_eq', [
+        (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', None, None, None), True, True),
+        (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', '120', None, None), True, False),
+        (ImpersonateTarget('abc', None, 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', None), True, False),
+        (ImpersonateTarget('abc', '121', 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', 'test'), False, False),
+        (ImpersonateTarget('abc'), ImpersonateTarget('abc', '120', 'xyz', 'test'), True, False),
+        (ImpersonateTarget('abc', '120', 'xyz', 'test'), ImpersonateTarget('abc'), True, False),
+        (ImpersonateTarget(), ImpersonateTarget('abc', '120', 'xyz'), True, False),
+        (ImpersonateTarget(), ImpersonateTarget(), True, True),
+    ])
+    def test_impersonate_target_in(self, target1, target2, is_in, is_eq):
+        """For each pair, check the result of `target1 in target2` and of `target1 == target2`."""
+        assert (target1 in target2) is is_in
+        assert (target1 == target2) is is_eq
index c34d97bba1b0a60221556124f419235633dc48ca..e3d1db3761e84c3dc5bd8b2bbe0af6b431f88911 100644 (file)
@@ -42,6 +42,7 @@ from .networking.exceptions import (
     SSLError,
     network_exceptions,
 )
+from .networking.impersonate import ImpersonateRequestHandler
 from .plugins import directories as plugin_directories
 from .postprocessor import _PLUGIN_CLASSES as plugin_pps
 from .postprocessor import (
@@ -99,6 +100,7 @@ from .utils import (
     SameFileError,
     UnavailableVideoError,
     UserNotLive,
+    YoutubeDLError,
     age_restricted,
     args_to_str,
     bug_reports_message,
@@ -402,6 +404,8 @@ class YoutubeDL:
                        - "detect_or_warn": check whether we can do anything
                                            about it, warn otherwise (default)
     source_address:    Client-side IP address to bind to.
+    impersonate:       Client to impersonate for requests.
+                       An ImpersonateTarget (from yt_dlp.networking.impersonate)
     sleep_interval_requests: Number of seconds to sleep between requests
                        during extraction
     sleep_interval:    Number of seconds to sleep before each download when
@@ -713,6 +717,13 @@ class YoutubeDL:
         for msg in self.params.get('_deprecation_warnings', []):
             self.deprecated_feature(msg)
 
+        if impersonate_target := self.params.get('impersonate'):
+            if not self._impersonate_target_available(impersonate_target):
+                raise YoutubeDLError(
+                    f'Impersonate target "{impersonate_target}" is not available. '
+                    f'Use --list-impersonate-targets to see available targets. '
+                    f'You may be missing dependencies required to support this target.')
+
         if 'list-formats' in self.params['compat_opts']:
             self.params['listformats_table'] = False
 
@@ -4077,6 +4088,22 @@ class YoutubeDL:
         handler = self._request_director.handlers['Urllib']
         return handler._get_instance(cookiejar=self.cookiejar, proxies=self.proxies)
 
+    def _get_available_impersonate_targets(self):
+        """Return a list of (target, handler name) pairs, one per target supported by each impersonation-capable handler."""
+        # todo(future): make available as public API
+        available = []
+        for handler in self._request_director.handlers.values():
+            # Only handlers derived from ImpersonateRequestHandler expose supported_targets
+            if not isinstance(handler, ImpersonateRequestHandler):
+                continue
+            available.extend((target, handler.RH_NAME) for target in handler.supported_targets)
+        return available
+
+    def _impersonate_target_available(self, target):
+        """Return True if at least one impersonation-capable handler reports `target` as supported."""
+        # todo(future): make available as public API
+        for handler in self._request_director.handlers.values():
+            if isinstance(handler, ImpersonateRequestHandler) and handler.is_supported_target(target):
+                return True
+        return False
+
     def urlopen(self, req):
         """ Start an HTTP download """
         if isinstance(req, str):
@@ -4108,9 +4135,13 @@ class YoutubeDL:
                     raise RequestError(
                         'file:// URLs are disabled by default in yt-dlp for security reasons. '
                         'Use --enable-file-urls to enable at your own risk.', cause=ue) from ue
-                if 'unsupported proxy type: "https"' in ue.msg.lower():
+                if (
+                    'unsupported proxy type: "https"' in ue.msg.lower()
+                    and 'requests' not in self._request_director.handlers
+                    and 'curl_cffi' not in self._request_director.handlers
+                ):
                     raise RequestError(
-                        'To use an HTTPS proxy for this request, one of the following dependencies needs to be installed: requests')
+                        'To use an HTTPS proxy for this request, one of the following dependencies needs to be installed: requests, curl_cffi')
 
                 elif (
                     re.match(r'unsupported url scheme: "wss?"', ue.msg.lower())
@@ -4120,6 +4151,13 @@ class YoutubeDL:
                         'This request requires WebSocket support. '
                         'Ensure one of the following dependencies are installed: websockets',
                         cause=ue) from ue
+
+                elif re.match(r'unsupported (?:extensions: impersonate|impersonate target)', ue.msg.lower()):
+                    raise RequestError(
+                        f'Impersonate target "{req.extensions["impersonate"]}" is not available.'
+                        f' See --list-impersonate-targets for available targets.'
+                        f' This request requires browser impersonation, however you may be missing dependencies'
+                        f' required to support this target.')
             raise
         except SSLError as e:
             if 'UNSAFE_LEGACY_RENEGOTIATION_DISABLED' in str(e):
@@ -4152,6 +4190,7 @@ class YoutubeDL:
                     'timeout': 'socket_timeout',
                     'legacy_ssl_support': 'legacyserverconnect',
                     'enable_file_urls': 'enable_file_urls',
+                    'impersonate': 'impersonate',
                     'client_cert': {
                         'client_certificate': 'client_certificate',
                         'client_certificate_key': 'client_certificate_key',
index aeea2625ef1d5b89f256eef3a7ab00525bc87ebc..940594fafb8b1afc74d68145a5f7aa5bb1cfd93b 100644 (file)
@@ -19,6 +19,7 @@ from .cookies import SUPPORTED_BROWSERS, SUPPORTED_KEYRINGS
 from .downloader.external import get_external_downloader
 from .extractor import list_extractor_classes
 from .extractor.adobepass import MSO_INFO
+from .networking.impersonate import ImpersonateTarget
 from .options import parseOpts
 from .postprocessor import (
     FFmpegExtractAudioPP,
@@ -48,6 +49,7 @@ from .utils import (
     float_or_none,
     format_field,
     int_or_none,
+    join_nonempty,
     match_filter_func,
     parse_bytes,
     parse_duration,
@@ -388,6 +390,9 @@ def validate_options(opts):
                                  f'Supported keyrings are: {", ".join(sorted(SUPPORTED_KEYRINGS))}')
         opts.cookiesfrombrowser = (browser_name, profile, keyring, container)
 
+    if opts.impersonate is not None:
+        opts.impersonate = ImpersonateTarget.from_str(opts.impersonate.lower())
+
     # MetadataParser
     def metadataparser_actions(f):
         if isinstance(f, str):
@@ -911,6 +916,7 @@ def parse_options(argv=None):
         'postprocessors': postprocessors,
         'fixup': opts.fixup,
         'source_address': opts.source_address,
+        'impersonate': opts.impersonate,
         'call_home': opts.call_home,
         'sleep_interval_requests': opts.sleep_interval_requests,
         'sleep_interval': opts.sleep_interval,
@@ -980,6 +986,41 @@ def _real_main(argv=None):
             traceback.print_exc()
             ydl._download_retcode = 100
 
+        if opts.list_impersonate_targets:
+
+            known_targets = [
+                # List of simplified targets we know are supported,
+                # to help users know what dependencies may be required.
+                (ImpersonateTarget('chrome'), 'curl_cffi'),
+                (ImpersonateTarget('edge'), 'curl_cffi'),
+                (ImpersonateTarget('safari'), 'curl_cffi'),
+            ]
+
+            available_targets = ydl._get_available_impersonate_targets()
+
+            def make_row(target, handler):
+                return [
+                    join_nonempty(target.client.title(), target.version, delim='-') or '-',
+                    join_nonempty((target.os or "").title(), target.os_version, delim='-') or '-',
+                    handler,
+                ]
+
+            rows = [make_row(target, handler) for target, handler in available_targets]
+
+            for known_target, known_handler in known_targets:
+                if not any(
+                    known_target in target and handler == known_handler
+                    for target, handler in available_targets
+                ):
+                    rows.append([
+                        ydl._format_out(text, ydl.Styles.SUPPRESS)
+                        for text in make_row(known_target, f'{known_handler} (not available)')
+                    ])
+
+            ydl.to_screen('[info] Available impersonate targets')
+            ydl.to_stdout(render_table(['Client', 'OS', 'Source'], rows, extra_gap=2, delim='-'))
+            return
+
         if not actual_use:
             if pre_process:
                 return ydl._download_retcode
diff --git a/yt_dlp/networking/impersonate.py b/yt_dlp/networking/impersonate.py
new file mode 100644 (file)
index 0000000..ca66180
--- /dev/null
@@ -0,0 +1,141 @@
+from __future__ import annotations
+
+import re
+from abc import ABC
+from dataclasses import dataclass
+from typing import Any
+
+from .common import RequestHandler, register_preference
+from .exceptions import UnsupportedRequest
+from ..compat.types import NoneType
+from ..utils import classproperty, join_nonempty
+from ..utils.networking import std_headers
+
+
+@dataclass(order=True, frozen=True)
+class ImpersonateTarget:
+    """
+    Describes which client (and optionally which OS) requests should imitate.
+
+    Parameters:
+    @param client: name of the client to impersonate
+    @param version: version of that client
+    @param os: operating system of the client to impersonate
+    @param os_version: version of that operating system
+
+    Note: a field left as None acts as a wildcard and matches any value.
+
+    """
+    client: str | None = None
+    version: str | None = None
+    os: str | None = None
+    os_version: str | None = None
+
+    def __post_init__(self):
+        # A version only makes sense together with the thing it is a version of
+        if not self.client and self.version:
+            raise ValueError('client is required if version is set')
+        if not self.os and self.os_version:
+            raise ValueError('os is required if os_version is set')
+
+    def __contains__(self, target: ImpersonateTarget):
+        """Return True if every field of `target` is compatible with this target; None on either side matches anything."""
+        if not isinstance(target, ImpersonateTarget):
+            return False
+
+        def compatible(mine, theirs):
+            return mine is None or theirs is None or mine == theirs
+
+        return all(
+            compatible(getattr(self, name), getattr(target, name))
+            for name in ('client', 'version', 'os', 'os_version'))
+
+    def __str__(self):
+        client_part = join_nonempty(self.client, self.version)
+        os_part = join_nonempty(self.os, self.os_version)
+        # Drop the separator when there is no OS part
+        return f'{client_part}:{os_part}'.rstrip(':')
+
+    @classmethod
+    def from_str(cls, target: str):
+        """Build a target from a `CLIENT[-VERSION][:OS[-OS_VERSION]]` string; raise ValueError if it does not match."""
+        pattern = r'(?:(?P<client>[^:-]+)(?:-(?P<version>[^:-]+))?)?(?::(?:(?P<os>[^:-]+)(?:-(?P<os_version>[^:-]+))?)?)?'
+        parsed = re.fullmatch(pattern, target)
+        if parsed is None:
+            raise ValueError(f'Invalid impersonate target "{target}"')
+        return cls(**parsed.groupdict())
+
+
+class ImpersonateRequestHandler(RequestHandler, ABC):
+    """
+    Base class for request handlers that support browser impersonation.
+
+    This provides a method for checking the validity of the impersonate extension,
+    which can be used in _check_extensions.
+
+    Impersonate targets consist of a client, version, os and os_version.
+    See the ImpersonateTarget class for more details.
+
+    The following may be defined:
+     - `_SUPPORTED_IMPERSONATE_TARGET_MAP`: a dict mapping supported targets to custom object.
+                Any Request with an impersonate target not in this map will raise an UnsupportedRequest.
+                Set to None (or leave empty) to disable this check.
+                Note: Entries are in order of preference
+
+    Parameters:
+    @param impersonate: the default impersonate target to use for requests.
+                        Set to None to disable impersonation.
+    """
+    _SUPPORTED_IMPERSONATE_TARGET_MAP: dict[ImpersonateTarget, Any] | None = {}
+
+    def __init__(self, *, impersonate: ImpersonateTarget | None = None, **kwargs):
+        super().__init__(**kwargs)
+        self.impersonate = impersonate
+
+    def _check_impersonate_target(self, target: ImpersonateTarget | None):
+        """Raise UnsupportedRequest if `target` is set and cannot be resolved to a supported target.
+
+        Nothing is checked when `target` is None or when no supported targets are declared.
+        """
+        assert isinstance(target, (ImpersonateTarget, NoneType))
+        if target is None or not self.supported_targets:
+            return
+        if not self.is_supported_target(target):
+            raise UnsupportedRequest(f'Unsupported impersonate target: {target}')
+
+    def _check_extensions(self, extensions):
+        """Run the parent extension checks, then validate the `impersonate` extension if present."""
+        super()._check_extensions(extensions)
+        if 'impersonate' in extensions:
+            self._check_impersonate_target(extensions.get('impersonate'))
+
+    def _validate(self, request):
+        """Run the parent validation, then validate this handler's default impersonate target."""
+        super()._validate(request)
+        self._check_impersonate_target(self.impersonate)
+
+    def _resolve_target(self, target: ImpersonateTarget | None):
+        """Resolve a target to a supported target.
+
+        Returns the first supported target (in declaration order) that `target` matches,
+        or None if `target` is None or nothing matches.
+        """
+        if target is None:
+            return
+        for supported_target in self.supported_targets:
+            if target in supported_target:
+                if self.verbose:
+                    self._logger.stdout(
+                        f'{self.RH_NAME}: resolved impersonate target {target} to {supported_target}')
+                return supported_target
+
+    @classproperty
+    def supported_targets(cls) -> tuple[ImpersonateTarget, ...]:
+        """The supported targets as a tuple, in the order they are declared in the map."""
+        # The map is documented as optionally None; treat that like an empty map
+        # instead of failing with AttributeError on `None.keys()`.
+        return tuple(cls._SUPPORTED_IMPERSONATE_TARGET_MAP or ())
+
+    def is_supported_target(self, target: ImpersonateTarget):
+        """Return True if `target` resolves to one of the supported targets."""
+        assert isinstance(target, ImpersonateTarget)
+        return self._resolve_target(target) is not None
+
+    def _get_request_target(self, request):
+        """Get the requested target for the request
+
+        The request's `impersonate` extension takes precedence over the handler default.
+        """
+        return self._resolve_target(request.extensions.get('impersonate') or self.impersonate)
+
+    def _get_impersonate_headers(self, request):
+        """Return the merged request headers, minus unmodified std_headers entries when impersonating."""
+        headers = self._merge_headers(request.headers)
+        if self._get_request_target(request) is not None:
+            # remove all headers present in std_headers
+            # (only those whose value still equals the std_headers value are dropped)
+            # todo: change this to not depend on std_headers
+            for k, v in std_headers.items():
+                if headers.get(k) == v:
+                    headers.pop(k)
+        return headers
+
+
+@register_preference(ImpersonateRequestHandler)
+def impersonate_preference(rh, request):
+    """Return 1000 when the request or the handler has an impersonate target set, otherwise 0."""
+    wants_impersonation = request.extensions.get('impersonate') or rh.impersonate
+    return 1000 if wants_impersonation else 0
index f8847273128ae70d2a852dd7f1dc5f8927e35238..dac56dc1f0fa10fffb0003dd34120b3579608817 100644 (file)
@@ -515,6 +515,18 @@ def create_parser():
         metavar='IP', dest='source_address', default=None,
         help='Client-side IP address to bind to',
     )
+    network.add_option(
+        '--impersonate',
+        metavar='CLIENT[:OS]', dest='impersonate', default=None,
+        help=(
+            'Client to impersonate for requests. E.g. chrome, chrome-110, chrome:windows-10. '
+            'Pass --impersonate="" to impersonate any client.'),
+    )
+    network.add_option(
+        '--list-impersonate-targets',
+        dest='list_impersonate_targets', default=False, action='store_true',
+        help='List available clients to impersonate.',
+    )
     network.add_option(
         '-4', '--force-ipv4',
         action='store_const', const='0.0.0.0', dest='source_address',