]> jfr.im git - yt-dlp.git/blame - test/test_networking_utils.py
[cleanup] Misc (#8182)
[yt-dlp.git] / test / test_networking_utils.py
CommitLineData
227bf1a3 1#!/usr/bin/env python3
2
3# Allow direct execution
4import os
5import sys
6
7import pytest
8
9sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
10
3d2623a8 11import contextlib
227bf1a3 12import io
13import platform
14import random
15import ssl
16import urllib.error
3d2623a8 17import warnings
227bf1a3 18
19from yt_dlp.cookies import YoutubeDLCookieJar
20from yt_dlp.dependencies import certifi
21from yt_dlp.networking import Response
22from yt_dlp.networking._helper import (
23 InstanceStoreMixin,
24 add_accept_encoding_header,
25 get_redirect_method,
26 make_socks_proxy_opts,
27 select_proxy,
28 ssl_load_certs,
29)
30from yt_dlp.networking.exceptions import (
31 HTTPError,
32 IncompleteRead,
33 _CompatHTTPError,
34)
35from yt_dlp.socks import ProxyType
36from yt_dlp.utils.networking import HTTPHeaderDict
37
38TEST_DIR = os.path.dirname(os.path.abspath(__file__))
39
40
41class TestNetworkingUtils:
42
43 def test_select_proxy(self):
44 proxies = {
45 'all': 'socks5://example.com',
46 'http': 'http://example.com:1080',
47 'no': 'bypass.example.com,yt-dl.org'
48 }
49
50 assert select_proxy('https://example.com', proxies) == proxies['all']
51 assert select_proxy('http://example.com', proxies) == proxies['http']
52 assert select_proxy('http://bypass.example.com', proxies) is None
53 assert select_proxy('https://yt-dl.org', proxies) is None
54
55 @pytest.mark.parametrize('socks_proxy,expected', [
56 ('socks5h://example.com', {
57 'proxytype': ProxyType.SOCKS5,
58 'addr': 'example.com',
59 'port': 1080,
60 'rdns': True,
61 'username': None,
62 'password': None
63 }),
64 ('socks5://user:@example.com:5555', {
65 'proxytype': ProxyType.SOCKS5,
66 'addr': 'example.com',
67 'port': 5555,
68 'rdns': False,
69 'username': 'user',
70 'password': ''
71 }),
72 ('socks4://u%40ser:pa%20ss@127.0.0.1:1080', {
73 'proxytype': ProxyType.SOCKS4,
74 'addr': '127.0.0.1',
75 'port': 1080,
76 'rdns': False,
77 'username': 'u@ser',
78 'password': 'pa ss'
79 }),
80 ('socks4a://:pa%20ss@127.0.0.1', {
81 'proxytype': ProxyType.SOCKS4A,
82 'addr': '127.0.0.1',
83 'port': 1080,
84 'rdns': True,
85 'username': '',
86 'password': 'pa ss'
87 })
88 ])
89 def test_make_socks_proxy_opts(self, socks_proxy, expected):
90 assert make_socks_proxy_opts(socks_proxy) == expected
91
92 def test_make_socks_proxy_unknown(self):
93 with pytest.raises(ValueError, match='Unknown SOCKS proxy version: socks'):
94 make_socks_proxy_opts('socks://127.0.0.1')
95
96 @pytest.mark.skipif(not certifi, reason='certifi is not installed')
97 def test_load_certifi(self):
de20687e 98 context_certifi = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
99 context_certifi.load_verify_locations(cafile=certifi.where())
227bf1a3 100 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
227bf1a3 101 ssl_load_certs(context, use_certifi=True)
de20687e 102 assert context.get_ca_certs() == context_certifi.get_ca_certs()
103
104 context_default = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
105 context_default.load_default_certs()
106 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
107 ssl_load_certs(context, use_certifi=False)
108 assert context.get_ca_certs() == context_default.get_ca_certs()
109
110 if context_default.get_ca_certs() == context_certifi.get_ca_certs():
111 pytest.skip('System uses certifi as default. The test is not valid')
227bf1a3 112
113 @pytest.mark.parametrize('method,status,expected', [
114 ('GET', 303, 'GET'),
115 ('HEAD', 303, 'HEAD'),
116 ('PUT', 303, 'GET'),
117 ('POST', 301, 'GET'),
118 ('HEAD', 301, 'HEAD'),
119 ('POST', 302, 'GET'),
120 ('HEAD', 302, 'HEAD'),
121 ('PUT', 302, 'PUT'),
122 ('POST', 308, 'POST'),
123 ('POST', 307, 'POST'),
124 ('HEAD', 308, 'HEAD'),
125 ('HEAD', 307, 'HEAD'),
126 ])
127 def test_get_redirect_method(self, method, status, expected):
128 assert get_redirect_method(method, status) == expected
129
130 @pytest.mark.parametrize('headers,supported_encodings,expected', [
131 ({'Accept-Encoding': 'br'}, ['gzip', 'br'], {'Accept-Encoding': 'br'}),
132 ({}, ['gzip', 'br'], {'Accept-Encoding': 'gzip, br'}),
133 ({'Content-type': 'application/json'}, [], {'Content-type': 'application/json', 'Accept-Encoding': 'identity'}),
134 ])
135 def test_add_accept_encoding_header(self, headers, supported_encodings, expected):
136 headers = HTTPHeaderDict(headers)
137 add_accept_encoding_header(headers, supported_encodings)
138 assert headers == HTTPHeaderDict(expected)
139
140
141class TestInstanceStoreMixin:
142
143 class FakeInstanceStoreMixin(InstanceStoreMixin):
144 def _create_instance(self, **kwargs):
145 return random.randint(0, 1000000)
146
147 def _close_instance(self, instance):
148 pass
149
150 def test_mixin(self):
151 mixin = self.FakeInstanceStoreMixin()
152 assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}}) == mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})
153
154 assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'e', 4}}) != mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})
155
156 assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}} != mixin._get_instance(d={'a': 1, 'b': 2, 'g': {'d', 4}}))
157
158 assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) == mixin._get_instance(d={'a': 1}, e=[1, 2, 3])
159
160 assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) != mixin._get_instance(d={'a': 1}, e=[1, 2, 3, 4])
161
162 cookiejar = YoutubeDLCookieJar()
163 assert mixin._get_instance(b=[1, 2], c=cookiejar) == mixin._get_instance(b=[1, 2], c=cookiejar)
164
165 assert mixin._get_instance(b=[1, 2], c=cookiejar) != mixin._get_instance(b=[1, 2], c=YoutubeDLCookieJar())
166
167 # Different order
168 assert mixin._get_instance(c=cookiejar, b=[1, 2]) == mixin._get_instance(b=[1, 2], c=cookiejar)
169
170 m = mixin._get_instance(t=1234)
171 assert mixin._get_instance(t=1234) == m
172 mixin._clear_instances()
173 assert mixin._get_instance(t=1234) != m
174
175
176class TestNetworkingExceptions:
177
178 @staticmethod
179 def create_response(status):
180 return Response(fp=io.BytesIO(b'test'), url='http://example.com', headers={'tesT': 'test'}, status=status)
181
182 @pytest.mark.parametrize('http_error_class', [HTTPError, lambda r: _CompatHTTPError(HTTPError(r))])
183 def test_http_error(self, http_error_class):
184
185 response = self.create_response(403)
186 error = http_error_class(response)
187
188 assert error.status == 403
189 assert str(error) == error.msg == 'HTTP Error 403: Forbidden'
190 assert error.reason == response.reason
191 assert error.response is response
192
193 data = error.response.read()
194 assert data == b'test'
195 assert repr(error) == '<HTTPError 403: Forbidden>'
196
197 @pytest.mark.parametrize('http_error_class', [HTTPError, lambda *args, **kwargs: _CompatHTTPError(HTTPError(*args, **kwargs))])
198 def test_redirect_http_error(self, http_error_class):
199 response = self.create_response(301)
200 error = http_error_class(response, redirect_loop=True)
201 assert str(error) == error.msg == 'HTTP Error 301: Moved Permanently (redirect loop detected)'
202 assert error.reason == 'Moved Permanently'
203
204 def test_compat_http_error(self):
205 response = self.create_response(403)
206 error = _CompatHTTPError(HTTPError(response))
207 assert isinstance(error, HTTPError)
208 assert isinstance(error, urllib.error.HTTPError)
209
3d2623a8 210 @contextlib.contextmanager
211 def raises_deprecation_warning():
212 with warnings.catch_warnings(record=True) as w:
213 warnings.simplefilter('always')
214 yield
215
216 if len(w) == 0:
217 pytest.fail('Did not raise DeprecationWarning')
218 if len(w) > 1:
219 pytest.fail(f'Raised multiple warnings: {w}')
220
221 if not issubclass(w[-1].category, DeprecationWarning):
222 pytest.fail(f'Expected DeprecationWarning, got {w[-1].category}')
223 w.clear()
224
225 with raises_deprecation_warning():
226 assert error.code == 403
227
228 with raises_deprecation_warning():
229 assert error.getcode() == 403
230
231 with raises_deprecation_warning():
232 assert error.hdrs is error.response.headers
233
234 with raises_deprecation_warning():
235 assert error.info() is error.response.headers
236
237 with raises_deprecation_warning():
238 assert error.headers is error.response.headers
239
240 with raises_deprecation_warning():
241 assert error.filename == error.response.url
242
243 with raises_deprecation_warning():
244 assert error.url == error.response.url
245
246 with raises_deprecation_warning():
247 assert error.geturl() == error.response.url
227bf1a3 248
249 # Passthrough file operations
3d2623a8 250 with raises_deprecation_warning():
251 assert error.read() == b'test'
252
253 with raises_deprecation_warning():
254 assert not error.closed
255
256 with raises_deprecation_warning():
257 # Technically Response operations are also passed through, which should not be used.
258 assert error.get_header('test') == 'test'
259
260 # Should not raise a warning
261 error.close()
227bf1a3 262
263 @pytest.mark.skipif(
264 platform.python_implementation() == 'PyPy', reason='garbage collector works differently in pypy')
265 def test_compat_http_error_autoclose(self):
266 # Compat HTTPError should not autoclose response
267 response = self.create_response(403)
268 _CompatHTTPError(HTTPError(response))
269 assert not response.closed
270
271 def test_incomplete_read_error(self):
5ca095cb 272 error = IncompleteRead(4, 3, cause='test')
227bf1a3 273 assert isinstance(error, IncompleteRead)
274 assert repr(error) == '<IncompleteRead: 4 bytes read, 3 more expected>'
275 assert str(error) == error.msg == '4 bytes read, 3 more expected'
5ca095cb 276 assert error.partial == 4
227bf1a3 277 assert error.expected == 3
278 assert error.cause == 'test'
279
5ca095cb 280 error = IncompleteRead(3)
227bf1a3 281 assert repr(error) == '<IncompleteRead: 3 bytes read>'
282 assert str(error) == '3 bytes read'