# Allow direct execution
import os
import sys
+import time
import pytest
from test.helper import verify_address_availability
+from yt_dlp.networking.common import Features, DEFAULT_TIMEOUT
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import ssl
import threading
-from yt_dlp import socks
+from yt_dlp import socks, traverse_obj
from yt_dlp.cookies import YoutubeDLCookieJar
from yt_dlp.dependencies import websockets
from yt_dlp.networking import Request
)
from yt_dlp.utils.networking import HTTPHeaderDict
-from test.conftest import validate_and_send
-
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
def create_websocket_server(**ws_kwargs):
import websockets.sync.server
- wsd = websockets.sync.server.serve(websocket_handler, '127.0.0.1', 0, process_request=process_request, **ws_kwargs)
+ wsd = websockets.sync.server.serve(
+ websocket_handler, '127.0.0.1', 0,
+ process_request=process_request, open_timeout=2, **ws_kwargs)
ws_port = wsd.socket.getsockname()[1]
ws_server_thread = threading.Thread(target=wsd.serve_forever)
ws_server_thread.daemon = True
return create_websocket_server(ssl_context=sslctx)
+def ws_validate_and_send(rh, req):
+ rh.validate(req)
+ max_tries = 3
+ for i in range(max_tries):
+ try:
+ return rh.send(req)
+ except TransportError as e:
+ if i < (max_tries - 1) and 'connection closed during handshake' in str(e):
+ # websockets server sometimes hangs on new connections
+ continue
+ raise
+
+
@pytest.mark.skipif(not websockets, reason='websockets must be installed to test websocket request handlers')
+@pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
class TestWebsSocketRequestHandlerConformance:
@classmethod
def setup_class(cls):
cls.mtls_wss_thread, cls.mtls_wss_port = create_mtls_wss_websocket_server()
cls.mtls_wss_base_url = f'wss://127.0.0.1:{cls.mtls_wss_port}'
- @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_basic_websockets(self, handler):
with handler() as rh:
- ws = validate_and_send(rh, Request(self.ws_base_url))
+ ws = ws_validate_and_send(rh, Request(self.ws_base_url))
assert 'upgrade' in ws.headers
assert ws.status == 101
ws.send('foo')
# https://www.rfc-editor.org/rfc/rfc6455.html#section-5.6
@pytest.mark.parametrize('msg,opcode', [('str', 1), (b'bytes', 2)])
- @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_send_types(self, handler, msg, opcode):
with handler() as rh:
- ws = validate_and_send(rh, Request(self.ws_base_url))
+ ws = ws_validate_and_send(rh, Request(self.ws_base_url))
ws.send(msg)
assert int(ws.recv()) == opcode
ws.close()
- @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_verify_cert(self, handler):
with handler() as rh:
with pytest.raises(CertificateVerifyError):
- validate_and_send(rh, Request(self.wss_base_url))
+ ws_validate_and_send(rh, Request(self.wss_base_url))
with handler(verify=False) as rh:
- ws = validate_and_send(rh, Request(self.wss_base_url))
+ ws = ws_validate_and_send(rh, Request(self.wss_base_url))
assert ws.status == 101
ws.close()
- @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_ssl_error(self, handler):
with handler(verify=False) as rh:
with pytest.raises(SSLError, match=r'ssl(?:v3|/tls) alert handshake failure') as exc_info:
- validate_and_send(rh, Request(self.bad_wss_host))
+ ws_validate_and_send(rh, Request(self.bad_wss_host))
assert not issubclass(exc_info.type, CertificateVerifyError)
- @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
@pytest.mark.parametrize('path,expected', [
# Unicode characters should be encoded with uppercase percent-encoding
('/中文', '/%E4%B8%AD%E6%96%87'),
])
def test_percent_encode(self, handler, path, expected):
with handler() as rh:
- ws = validate_and_send(rh, Request(f'{self.ws_base_url}{path}'))
+ ws = ws_validate_and_send(rh, Request(f'{self.ws_base_url}{path}'))
ws.send('path')
assert ws.recv() == expected
assert ws.status == 101
ws.close()
- @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_remove_dot_segments(self, handler):
with handler() as rh:
# This isn't a comprehensive test,
# but it should be enough to check whether the handler is removing dot segments
- ws = validate_and_send(rh, Request(f'{self.ws_base_url}/a/b/./../../test'))
+ ws = ws_validate_and_send(rh, Request(f'{self.ws_base_url}/a/b/./../../test'))
assert ws.status == 101
ws.send('path')
assert ws.recv() == '/test'
# We are restricted to known HTTP status codes in http.HTTPStatus
# Redirects are not supported for websockets
- @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
@pytest.mark.parametrize('status', (200, 204, 301, 302, 303, 400, 500, 511))
def test_raise_http_error(self, handler, status):
with handler() as rh:
with pytest.raises(HTTPError) as exc_info:
- validate_and_send(rh, Request(f'{self.ws_base_url}/gen_{status}'))
+ ws_validate_and_send(rh, Request(f'{self.ws_base_url}/gen_{status}'))
assert exc_info.value.status == status
- @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
@pytest.mark.parametrize('params,extensions', [
({'timeout': sys.float_info.min}, {}),
({}, {'timeout': sys.float_info.min}),
])
- def test_timeout(self, handler, params, extensions):
+ def test_read_timeout(self, handler, params, extensions):
with handler(**params) as rh:
with pytest.raises(TransportError):
- validate_and_send(rh, Request(self.ws_base_url, extensions=extensions))
+ ws_validate_and_send(rh, Request(self.ws_base_url, extensions=extensions))
+
+ def test_connect_timeout(self, handler):
+ # nothing should be listening on this port
+ connect_timeout_url = 'ws://10.255.255.255'
+ with handler(timeout=0.01) as rh, pytest.raises(TransportError):
+ now = time.time()
+ ws_validate_and_send(rh, Request(connect_timeout_url))
+ assert time.time() - now < DEFAULT_TIMEOUT
+
+ # Per request timeout, should override handler timeout
+ request = Request(connect_timeout_url, extensions={'timeout': 0.01})
+ with handler() as rh, pytest.raises(TransportError):
+ now = time.time()
+ ws_validate_and_send(rh, request)
+ assert time.time() - now < DEFAULT_TIMEOUT
- @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_cookies(self, handler):
cookiejar = YoutubeDLCookieJar()
cookiejar.set_cookie(http.cookiejar.Cookie(
comment_url=None, rest={}))
with handler(cookiejar=cookiejar) as rh:
- ws = validate_and_send(rh, Request(self.ws_base_url))
+ ws = ws_validate_and_send(rh, Request(self.ws_base_url))
ws.send('headers')
assert json.loads(ws.recv())['cookie'] == 'test=ytdlp'
ws.close()
with handler() as rh:
- ws = validate_and_send(rh, Request(self.ws_base_url))
+ ws = ws_validate_and_send(rh, Request(self.ws_base_url))
ws.send('headers')
assert 'cookie' not in json.loads(ws.recv())
ws.close()
- ws = validate_and_send(rh, Request(self.ws_base_url, extensions={'cookiejar': cookiejar}))
+ ws = ws_validate_and_send(rh, Request(self.ws_base_url, extensions={'cookiejar': cookiejar}))
ws.send('headers')
assert json.loads(ws.recv())['cookie'] == 'test=ytdlp'
ws.close()
- @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_source_address(self, handler):
source_address = f'127.0.0.{random.randint(5, 255)}'
verify_address_availability(source_address)
with handler(source_address=source_address) as rh:
- ws = validate_and_send(rh, Request(self.ws_base_url))
+ ws = ws_validate_and_send(rh, Request(self.ws_base_url))
ws.send('source_address')
assert source_address == ws.recv()
ws.close()
- @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_response_url(self, handler):
with handler() as rh:
url = f'{self.ws_base_url}/something'
- ws = validate_and_send(rh, Request(url))
+ ws = ws_validate_and_send(rh, Request(url))
assert ws.url == url
ws.close()
- @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_request_headers(self, handler):
with handler(headers=HTTPHeaderDict({'test1': 'test', 'test2': 'test2'})) as rh:
# Global Headers
- ws = validate_and_send(rh, Request(self.ws_base_url))
+ ws = ws_validate_and_send(rh, Request(self.ws_base_url))
ws.send('headers')
headers = HTTPHeaderDict(json.loads(ws.recv()))
assert headers['test1'] == 'test'
ws.close()
# Per request headers, merged with global
- ws = validate_and_send(rh, Request(
+ ws = ws_validate_and_send(rh, Request(
self.ws_base_url, headers={'test2': 'changed', 'test3': 'test3'}))
ws.send('headers')
headers = HTTPHeaderDict(json.loads(ws.recv()))
'client_certificate': os.path.join(MTLS_CERT_DIR, 'client.crt'),
'client_certificate_key': os.path.join(MTLS_CERT_DIR, 'clientencrypted.key'),
'client_certificate_password': 'foobar',
- }
+ },
))
- @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_mtls(self, handler, client_cert):
with handler(
# Disable client-side validation of unacceptable self-signed testcert.pem
# The test is of a check on the server side, so unaffected
verify=False,
- client_cert=client_cert
+ client_cert=client_cert,
) as rh:
- validate_and_send(rh, Request(self.mtls_wss_base_url)).close()
+ ws_validate_and_send(rh, Request(self.mtls_wss_base_url)).close()
+
+ def test_request_disable_proxy(self, handler):
+ for proxy_proto in handler._SUPPORTED_PROXY_SCHEMES or ['ws']:
+ # Given handler is configured with a proxy
+ with handler(proxies={'ws': f'{proxy_proto}://10.255.255.255'}, timeout=5) as rh:
+ # When a proxy is explicitly set to None for the request
+ ws = ws_validate_and_send(rh, Request(self.ws_base_url, proxies={'http': None}))
+ # Then no proxy should be used
+ assert ws.status == 101
+ ws.close()
+
+ @pytest.mark.skip_handlers_if(
+ lambda _, handler: Features.NO_PROXY not in handler._SUPPORTED_FEATURES, 'handler does not support NO_PROXY')
+ def test_noproxy(self, handler):
+ for proxy_proto in handler._SUPPORTED_PROXY_SCHEMES or ['ws']:
+ # Given the handler is configured with a proxy
+ with handler(proxies={'ws': f'{proxy_proto}://10.255.255.255'}, timeout=5) as rh:
+ for no_proxy in (f'127.0.0.1:{self.ws_port}', '127.0.0.1', 'localhost'):
+ # When request no proxy includes the request url host
+ ws = ws_validate_and_send(rh, Request(self.ws_base_url, proxies={'no': no_proxy}))
+ # Then the proxy should not be used
+ assert ws.status == 101
+ ws.close()
+
+ @pytest.mark.skip_handlers_if(
+ lambda _, handler: Features.ALL_PROXY not in handler._SUPPORTED_FEATURES, 'handler does not support ALL_PROXY')
+ def test_allproxy(self, handler):
+ supported_proto = traverse_obj(handler._SUPPORTED_PROXY_SCHEMES, 0, default='ws')
+ # This is a bit of a hacky test, but it should be enough to check whether the handler is using the proxy.
+ # 0.1s might not be enough of a timeout if proxy is not used in all cases, but should still get failures.
+ with handler(proxies={'all': f'{supported_proto}://10.255.255.255'}, timeout=0.1) as rh:
+ with pytest.raises(TransportError):
+ ws_validate_and_send(rh, Request(self.ws_base_url)).close()
+
+ with handler(timeout=0.1) as rh:
+ with pytest.raises(TransportError):
+ ws_validate_and_send(
+ rh, Request(self.ws_base_url, proxies={'all': f'{supported_proto}://10.255.255.255'})).close()
def create_fake_ws_connection(raised):