]>
Commit | Line | Data |
---|---|---|
227bf1a3 | 1 | #!/usr/bin/env python3 |
2 | ||
3 | # Allow direct execution | |
4 | import os | |
5 | import sys | |
6 | ||
7 | import pytest | |
8 | ||
9 | sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
10 | ||
3d2623a8 | 11 | import contextlib |
227bf1a3 | 12 | import io |
13 | import platform | |
14 | import random | |
15 | import ssl | |
16 | import urllib.error | |
3d2623a8 | 17 | import warnings |
227bf1a3 | 18 | |
19 | from yt_dlp.cookies import YoutubeDLCookieJar | |
20 | from yt_dlp.dependencies import certifi | |
21 | from yt_dlp.networking import Response | |
22 | from yt_dlp.networking._helper import ( | |
23 | InstanceStoreMixin, | |
24 | add_accept_encoding_header, | |
25 | get_redirect_method, | |
26 | make_socks_proxy_opts, | |
27 | select_proxy, | |
28 | ssl_load_certs, | |
29 | ) | |
30 | from yt_dlp.networking.exceptions import ( | |
31 | HTTPError, | |
32 | IncompleteRead, | |
33 | _CompatHTTPError, | |
34 | ) | |
35 | from yt_dlp.socks import ProxyType | |
36 | from yt_dlp.utils.networking import HTTPHeaderDict | |
37 | ||
38 | TEST_DIR = os.path.dirname(os.path.abspath(__file__)) | |
39 | ||
40 | ||
41 | class TestNetworkingUtils: | |
42 | ||
43 | def test_select_proxy(self): | |
44 | proxies = { | |
45 | 'all': 'socks5://example.com', | |
46 | 'http': 'http://example.com:1080', | |
47 | 'no': 'bypass.example.com,yt-dl.org' | |
48 | } | |
49 | ||
50 | assert select_proxy('https://example.com', proxies) == proxies['all'] | |
51 | assert select_proxy('http://example.com', proxies) == proxies['http'] | |
52 | assert select_proxy('http://bypass.example.com', proxies) is None | |
53 | assert select_proxy('https://yt-dl.org', proxies) is None | |
54 | ||
55 | @pytest.mark.parametrize('socks_proxy,expected', [ | |
56 | ('socks5h://example.com', { | |
57 | 'proxytype': ProxyType.SOCKS5, | |
58 | 'addr': 'example.com', | |
59 | 'port': 1080, | |
60 | 'rdns': True, | |
61 | 'username': None, | |
62 | 'password': None | |
63 | }), | |
64 | ('socks5://user:@example.com:5555', { | |
65 | 'proxytype': ProxyType.SOCKS5, | |
66 | 'addr': 'example.com', | |
67 | 'port': 5555, | |
68 | 'rdns': False, | |
69 | 'username': 'user', | |
70 | 'password': '' | |
71 | }), | |
72 | ('socks4://u%40ser:pa%20ss@127.0.0.1:1080', { | |
73 | 'proxytype': ProxyType.SOCKS4, | |
74 | 'addr': '127.0.0.1', | |
75 | 'port': 1080, | |
76 | 'rdns': False, | |
77 | 'username': 'u@ser', | |
78 | 'password': 'pa ss' | |
79 | }), | |
80 | ('socks4a://:pa%20ss@127.0.0.1', { | |
81 | 'proxytype': ProxyType.SOCKS4A, | |
82 | 'addr': '127.0.0.1', | |
83 | 'port': 1080, | |
84 | 'rdns': True, | |
85 | 'username': '', | |
86 | 'password': 'pa ss' | |
87 | }) | |
88 | ]) | |
89 | def test_make_socks_proxy_opts(self, socks_proxy, expected): | |
90 | assert make_socks_proxy_opts(socks_proxy) == expected | |
91 | ||
92 | def test_make_socks_proxy_unknown(self): | |
93 | with pytest.raises(ValueError, match='Unknown SOCKS proxy version: socks'): | |
94 | make_socks_proxy_opts('socks://127.0.0.1') | |
95 | ||
96 | @pytest.mark.skipif(not certifi, reason='certifi is not installed') | |
97 | def test_load_certifi(self): | |
de20687e | 98 | context_certifi = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) |
99 | context_certifi.load_verify_locations(cafile=certifi.where()) | |
227bf1a3 | 100 | context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) |
227bf1a3 | 101 | ssl_load_certs(context, use_certifi=True) |
de20687e | 102 | assert context.get_ca_certs() == context_certifi.get_ca_certs() |
103 | ||
104 | context_default = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) | |
105 | context_default.load_default_certs() | |
106 | context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) | |
107 | ssl_load_certs(context, use_certifi=False) | |
108 | assert context.get_ca_certs() == context_default.get_ca_certs() | |
109 | ||
110 | if context_default.get_ca_certs() == context_certifi.get_ca_certs(): | |
111 | pytest.skip('System uses certifi as default. The test is not valid') | |
227bf1a3 | 112 | |
113 | @pytest.mark.parametrize('method,status,expected', [ | |
114 | ('GET', 303, 'GET'), | |
115 | ('HEAD', 303, 'HEAD'), | |
116 | ('PUT', 303, 'GET'), | |
117 | ('POST', 301, 'GET'), | |
118 | ('HEAD', 301, 'HEAD'), | |
119 | ('POST', 302, 'GET'), | |
120 | ('HEAD', 302, 'HEAD'), | |
121 | ('PUT', 302, 'PUT'), | |
122 | ('POST', 308, 'POST'), | |
123 | ('POST', 307, 'POST'), | |
124 | ('HEAD', 308, 'HEAD'), | |
125 | ('HEAD', 307, 'HEAD'), | |
126 | ]) | |
127 | def test_get_redirect_method(self, method, status, expected): | |
128 | assert get_redirect_method(method, status) == expected | |
129 | ||
130 | @pytest.mark.parametrize('headers,supported_encodings,expected', [ | |
131 | ({'Accept-Encoding': 'br'}, ['gzip', 'br'], {'Accept-Encoding': 'br'}), | |
132 | ({}, ['gzip', 'br'], {'Accept-Encoding': 'gzip, br'}), | |
133 | ({'Content-type': 'application/json'}, [], {'Content-type': 'application/json', 'Accept-Encoding': 'identity'}), | |
134 | ]) | |
135 | def test_add_accept_encoding_header(self, headers, supported_encodings, expected): | |
136 | headers = HTTPHeaderDict(headers) | |
137 | add_accept_encoding_header(headers, supported_encodings) | |
138 | assert headers == HTTPHeaderDict(expected) | |
139 | ||
140 | ||
141 | class TestInstanceStoreMixin: | |
142 | ||
143 | class FakeInstanceStoreMixin(InstanceStoreMixin): | |
144 | def _create_instance(self, **kwargs): | |
145 | return random.randint(0, 1000000) | |
146 | ||
147 | def _close_instance(self, instance): | |
148 | pass | |
149 | ||
150 | def test_mixin(self): | |
151 | mixin = self.FakeInstanceStoreMixin() | |
152 | assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}}) == mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}}) | |
153 | ||
154 | assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'e', 4}}) != mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}}) | |
155 | ||
156 | assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}} != mixin._get_instance(d={'a': 1, 'b': 2, 'g': {'d', 4}})) | |
157 | ||
158 | assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) == mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) | |
159 | ||
160 | assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) != mixin._get_instance(d={'a': 1}, e=[1, 2, 3, 4]) | |
161 | ||
162 | cookiejar = YoutubeDLCookieJar() | |
163 | assert mixin._get_instance(b=[1, 2], c=cookiejar) == mixin._get_instance(b=[1, 2], c=cookiejar) | |
164 | ||
165 | assert mixin._get_instance(b=[1, 2], c=cookiejar) != mixin._get_instance(b=[1, 2], c=YoutubeDLCookieJar()) | |
166 | ||
167 | # Different order | |
168 | assert mixin._get_instance(c=cookiejar, b=[1, 2]) == mixin._get_instance(b=[1, 2], c=cookiejar) | |
169 | ||
170 | m = mixin._get_instance(t=1234) | |
171 | assert mixin._get_instance(t=1234) == m | |
172 | mixin._clear_instances() | |
173 | assert mixin._get_instance(t=1234) != m | |
174 | ||
175 | ||
176 | class TestNetworkingExceptions: | |
177 | ||
178 | @staticmethod | |
179 | def create_response(status): | |
180 | return Response(fp=io.BytesIO(b'test'), url='http://example.com', headers={'tesT': 'test'}, status=status) | |
181 | ||
182 | @pytest.mark.parametrize('http_error_class', [HTTPError, lambda r: _CompatHTTPError(HTTPError(r))]) | |
183 | def test_http_error(self, http_error_class): | |
184 | ||
185 | response = self.create_response(403) | |
186 | error = http_error_class(response) | |
187 | ||
188 | assert error.status == 403 | |
189 | assert str(error) == error.msg == 'HTTP Error 403: Forbidden' | |
190 | assert error.reason == response.reason | |
191 | assert error.response is response | |
192 | ||
193 | data = error.response.read() | |
194 | assert data == b'test' | |
195 | assert repr(error) == '<HTTPError 403: Forbidden>' | |
196 | ||
197 | @pytest.mark.parametrize('http_error_class', [HTTPError, lambda *args, **kwargs: _CompatHTTPError(HTTPError(*args, **kwargs))]) | |
198 | def test_redirect_http_error(self, http_error_class): | |
199 | response = self.create_response(301) | |
200 | error = http_error_class(response, redirect_loop=True) | |
201 | assert str(error) == error.msg == 'HTTP Error 301: Moved Permanently (redirect loop detected)' | |
202 | assert error.reason == 'Moved Permanently' | |
203 | ||
204 | def test_compat_http_error(self): | |
205 | response = self.create_response(403) | |
206 | error = _CompatHTTPError(HTTPError(response)) | |
207 | assert isinstance(error, HTTPError) | |
208 | assert isinstance(error, urllib.error.HTTPError) | |
209 | ||
3d2623a8 | 210 | @contextlib.contextmanager |
211 | def raises_deprecation_warning(): | |
212 | with warnings.catch_warnings(record=True) as w: | |
213 | warnings.simplefilter('always') | |
214 | yield | |
215 | ||
216 | if len(w) == 0: | |
217 | pytest.fail('Did not raise DeprecationWarning') | |
218 | if len(w) > 1: | |
219 | pytest.fail(f'Raised multiple warnings: {w}') | |
220 | ||
221 | if not issubclass(w[-1].category, DeprecationWarning): | |
222 | pytest.fail(f'Expected DeprecationWarning, got {w[-1].category}') | |
223 | w.clear() | |
224 | ||
225 | with raises_deprecation_warning(): | |
226 | assert error.code == 403 | |
227 | ||
228 | with raises_deprecation_warning(): | |
229 | assert error.getcode() == 403 | |
230 | ||
231 | with raises_deprecation_warning(): | |
232 | assert error.hdrs is error.response.headers | |
233 | ||
234 | with raises_deprecation_warning(): | |
235 | assert error.info() is error.response.headers | |
236 | ||
237 | with raises_deprecation_warning(): | |
238 | assert error.headers is error.response.headers | |
239 | ||
240 | with raises_deprecation_warning(): | |
241 | assert error.filename == error.response.url | |
242 | ||
243 | with raises_deprecation_warning(): | |
244 | assert error.url == error.response.url | |
245 | ||
246 | with raises_deprecation_warning(): | |
247 | assert error.geturl() == error.response.url | |
227bf1a3 | 248 | |
249 | # Passthrough file operations | |
3d2623a8 | 250 | with raises_deprecation_warning(): |
251 | assert error.read() == b'test' | |
252 | ||
253 | with raises_deprecation_warning(): | |
254 | assert not error.closed | |
255 | ||
256 | with raises_deprecation_warning(): | |
257 | # Technically Response operations are also passed through, which should not be used. | |
258 | assert error.get_header('test') == 'test' | |
259 | ||
260 | # Should not raise a warning | |
261 | error.close() | |
227bf1a3 | 262 | |
263 | @pytest.mark.skipif( | |
264 | platform.python_implementation() == 'PyPy', reason='garbage collector works differently in pypy') | |
265 | def test_compat_http_error_autoclose(self): | |
266 | # Compat HTTPError should not autoclose response | |
267 | response = self.create_response(403) | |
268 | _CompatHTTPError(HTTPError(response)) | |
269 | assert not response.closed | |
270 | ||
271 | def test_incomplete_read_error(self): | |
272 | error = IncompleteRead(b'test', 3, cause='test') | |
273 | assert isinstance(error, IncompleteRead) | |
274 | assert repr(error) == '<IncompleteRead: 4 bytes read, 3 more expected>' | |
275 | assert str(error) == error.msg == '4 bytes read, 3 more expected' | |
276 | assert error.partial == b'test' | |
277 | assert error.expected == 3 | |
278 | assert error.cause == 'test' | |
279 | ||
280 | error = IncompleteRead(b'aaa') | |
281 | assert repr(error) == '<IncompleteRead: 3 bytes read>' | |
282 | assert str(error) == '3 bytes read' |