]> jfr.im git - yt-dlp.git/blame - test/test_networking_utils.py
[ie/tiktok] Fix audio-only format extraction (#7712)
[yt-dlp.git] / test / test_networking_utils.py
CommitLineData
227bf1a3 1#!/usr/bin/env python3
2
3# Allow direct execution
4import os
5import sys
6
7import pytest
8
9sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
10
3d2623a8 11import contextlib
227bf1a3 12import io
13import platform
14import random
15import ssl
16import urllib.error
3d2623a8 17import warnings
227bf1a3 18
19from yt_dlp.cookies import YoutubeDLCookieJar
20from yt_dlp.dependencies import certifi
21from yt_dlp.networking import Response
22from yt_dlp.networking._helper import (
23 InstanceStoreMixin,
24 add_accept_encoding_header,
25 get_redirect_method,
26 make_socks_proxy_opts,
27 select_proxy,
28 ssl_load_certs,
29)
30from yt_dlp.networking.exceptions import (
31 HTTPError,
32 IncompleteRead,
33 _CompatHTTPError,
34)
35from yt_dlp.socks import ProxyType
36from yt_dlp.utils.networking import HTTPHeaderDict
37
# Absolute path of the directory containing this test module.
TEST_DIR = os.path.split(os.path.abspath(__file__))[0]
39
40
class TestNetworkingUtils:
    """Checks for the helper functions imported from ``yt_dlp.networking._helper``."""

    def test_select_proxy(self):
        """Proxy lookup by URL: scheme entry, 'all' fallback, and hosts excluded via 'no'."""
        proxy_map = {
            'all': 'socks5://example.com',
            'http': 'http://example.com:1080',
            'no': 'bypass.example.com,yt-dl.org'
        }

        # There is no 'https' entry, so the 'all' proxy is the expected result.
        https_choice = select_proxy('https://example.com', proxy_map)
        assert https_choice == proxy_map['all']

        http_choice = select_proxy('http://example.com', proxy_map)
        assert http_choice == proxy_map['http']

        # Both hosts are listed in the comma-separated 'no' entry.
        assert select_proxy('http://bypass.example.com', proxy_map) is None
        assert select_proxy('https://yt-dl.org', proxy_map) is None

    @pytest.mark.parametrize(('socks_proxy', 'expected'), [
        (
            'socks5h://example.com',
            {
                'proxytype': ProxyType.SOCKS5,
                'addr': 'example.com',
                'port': 1080,
                'rdns': True,
                'username': None,
                'password': None,
            },
        ),
        (
            'socks5://user:@example.com:5555',
            {
                'proxytype': ProxyType.SOCKS5,
                'addr': 'example.com',
                'port': 5555,
                'rdns': False,
                'username': 'user',
                'password': '',
            },
        ),
        (
            'socks4://u%40ser:pa%20ss@127.0.0.1:1080',
            {
                'proxytype': ProxyType.SOCKS4,
                'addr': '127.0.0.1',
                'port': 1080,
                'rdns': False,
                'username': 'u@ser',
                'password': 'pa ss',
            },
        ),
        (
            'socks4a://:pa%20ss@127.0.0.1',
            {
                'proxytype': ProxyType.SOCKS4A,
                'addr': '127.0.0.1',
                'port': 1080,
                'rdns': True,
                'username': '',
                'password': 'pa ss',
            },
        ),
    ])
    def test_make_socks_proxy_opts(self, socks_proxy, expected):
        """Each SOCKS proxy URL must be turned into exactly the expected options dict."""
        parsed_opts = make_socks_proxy_opts(socks_proxy)
        assert parsed_opts == expected

    def test_make_socks_proxy_unknown(self):
        """A bare ``socks://`` scheme is rejected with a ValueError."""
        expected_message = 'Unknown SOCKS proxy version: socks'
        with pytest.raises(ValueError, match=expected_message):
            make_socks_proxy_opts('socks://127.0.0.1')

    @pytest.mark.skipif(not certifi, reason='certifi is not installed')
    def test_load_certifi(self):
        """``ssl_load_certs`` with ``use_certifi=True`` must yield the same CA certs as certifi's bundle."""
        # Reference context loaded straight from the certifi CA file.
        reference_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        reference_ctx.load_verify_locations(cafile=certifi.where())

        certifi_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ssl_load_certs(certifi_ctx, use_certifi=True)
        assert certifi_ctx.get_ca_certs() == reference_ctx.get_ca_certs()

        # Test load normal certs
        # XXX: could there be a case where system certs are the same as certifi?
        system_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ssl_load_certs(system_ctx, use_certifi=False)
        assert system_ctx.get_ca_certs() != certifi_ctx.get_ca_certs()

    @pytest.mark.parametrize(('method', 'status', 'expected'), [
        # 303
        ('GET', 303, 'GET'),
        ('HEAD', 303, 'HEAD'),
        ('PUT', 303, 'GET'),
        # 301
        ('POST', 301, 'GET'),
        ('HEAD', 301, 'HEAD'),
        # 302
        ('POST', 302, 'GET'),
        ('HEAD', 302, 'HEAD'),
        ('PUT', 302, 'PUT'),
        # 307 / 308
        ('POST', 308, 'POST'),
        ('POST', 307, 'POST'),
        ('HEAD', 308, 'HEAD'),
        ('HEAD', 307, 'HEAD'),
    ])
    def test_get_redirect_method(self, method, status, expected):
        """The method used after a redirect must match the table above."""
        redirected_method = get_redirect_method(method, status)
        assert redirected_method == expected

    @pytest.mark.parametrize(('headers', 'supported_encodings', 'expected'), [
        # An Accept-Encoding that is already present is expected to stay as-is.
        ({'Accept-Encoding': 'br'}, ['gzip', 'br'], {'Accept-Encoding': 'br'}),
        # Without one, the supported encodings are expected, joined with ', '.
        ({}, ['gzip', 'br'], {'Accept-Encoding': 'gzip, br'}),
        # With no supported encodings, 'identity' is expected.
        (
            {'Content-type': 'application/json'},
            [],
            {'Content-type': 'application/json', 'Accept-Encoding': 'identity'},
        ),
    ])
    def test_add_accept_encoding_header(self, headers, supported_encodings, expected):
        """``add_accept_encoding_header`` updates the passed header dict in place."""
        header_dict = HTTPHeaderDict(headers)
        add_accept_encoding_header(header_dict, supported_encodings)
        assert header_dict == HTTPHeaderDict(expected)
136
137
class TestInstanceStoreMixin:
    """Tests for the instance lookup behaviour of ``InstanceStoreMixin``."""

    class FakeInstanceStoreMixin(InstanceStoreMixin):
        """Minimal concrete mixin whose "instances" are random integers."""

        def _create_instance(self, **kwargs):
            # A fresh random value stands in for a newly created instance.
            return random.randint(0, 1000000)

        def _close_instance(self, instance):
            pass

    def test_mixin(self):
        """Equal keyword arguments return the same instance; different ones a new instance."""
        mixin = self.FakeInstanceStoreMixin()
        assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}}) == mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})

        assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'e', 4}}) != mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})

        # Compare the two results. Previously a misplaced parenthesis passed the
        # `!=` expression as the `d` argument, so nothing was actually compared.
        assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}}) != mixin._get_instance(d={'a': 1, 'b': 2, 'g': {'d', 4}})

        assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) == mixin._get_instance(d={'a': 1}, e=[1, 2, 3])

        assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) != mixin._get_instance(d={'a': 1}, e=[1, 2, 3, 4])

        cookiejar = YoutubeDLCookieJar()
        assert mixin._get_instance(b=[1, 2], c=cookiejar) == mixin._get_instance(b=[1, 2], c=cookiejar)

        # A different cookie jar object must not match the stored one.
        assert mixin._get_instance(b=[1, 2], c=cookiejar) != mixin._get_instance(b=[1, 2], c=YoutubeDLCookieJar())

        # Different order
        assert mixin._get_instance(c=cookiejar, b=[1, 2]) == mixin._get_instance(b=[1, 2], c=cookiejar)

        # After clearing, the same arguments must produce a new instance.
        m = mixin._get_instance(t=1234)
        assert mixin._get_instance(t=1234) == m
        mixin._clear_instances()
        assert mixin._get_instance(t=1234) != m
171
172
class TestNetworkingExceptions:
    """Tests for ``HTTPError``, its ``_CompatHTTPError`` wrapper and ``IncompleteRead``."""

    @staticmethod
    def create_response(status):
        """Build a ``Response`` with body ``b'test'``, one header and the given status."""
        body = io.BytesIO(b'test')
        return Response(fp=body, url='http://example.com', headers={'tesT': 'test'}, status=status)

    @pytest.mark.parametrize('http_error_class', [HTTPError, lambda r: _CompatHTTPError(HTTPError(r))])
    def test_http_error(self, http_error_class):
        """Status, message, reason, response and repr are exposed by both error flavours."""
        resp = self.create_response(403)
        err = http_error_class(resp)

        assert err.status == 403
        assert str(err) == err.msg
        assert err.msg == 'HTTP Error 403: Forbidden'
        assert err.reason == resp.reason
        assert err.response is resp

        # The response body is readable through the error.
        assert err.response.read() == b'test'
        assert repr(err) == '<HTTPError 403: Forbidden>'

    @pytest.mark.parametrize('http_error_class', [HTTPError, lambda *args, **kwargs: _CompatHTTPError(HTTPError(*args, **kwargs))])
    def test_redirect_http_error(self, http_error_class):
        """``redirect_loop=True`` adds a note to the message but not to the reason."""
        resp = self.create_response(301)
        err = http_error_class(resp, redirect_loop=True)

        assert str(err) == err.msg
        assert err.msg == 'HTTP Error 301: Moved Permanently (redirect loop detected)'
        assert err.reason == 'Moved Permanently'

    def test_compat_http_error(self):
        """The compat wrapper is both error types and warns once on each legacy access."""
        resp = self.create_response(403)
        error = _CompatHTTPError(HTTPError(resp))
        assert isinstance(error, HTTPError)
        assert isinstance(error, urllib.error.HTTPError)

        @contextlib.contextmanager
        def raises_deprecation_warning():
            """Fail unless the wrapped code emits exactly one DeprecationWarning."""
            with warnings.catch_warnings(record=True) as caught:
                # Record every warning, including ones that would otherwise be filtered.
                warnings.simplefilter('always')
                yield

                if not caught:
                    pytest.fail('Did not raise DeprecationWarning')
                elif len(caught) > 1:
                    pytest.fail(f'Raised multiple warnings: {caught}')

                category = caught[-1].category
                if not issubclass(category, DeprecationWarning):
                    pytest.fail(f'Expected DeprecationWarning, got {category}')
                caught.clear()

        # Status code access
        with raises_deprecation_warning():
            assert error.code == 403

        with raises_deprecation_warning():
            assert error.getcode() == 403

        # Header access
        with raises_deprecation_warning():
            assert error.hdrs is error.response.headers

        with raises_deprecation_warning():
            assert error.info() is error.response.headers

        with raises_deprecation_warning():
            assert error.headers is error.response.headers

        # URL access
        with raises_deprecation_warning():
            assert error.filename == error.response.url

        with raises_deprecation_warning():
            assert error.url == error.response.url

        with raises_deprecation_warning():
            assert error.geturl() == error.response.url

        # Passthrough file operations
        with raises_deprecation_warning():
            assert error.read() == b'test'

        with raises_deprecation_warning():
            assert not error.closed

        with raises_deprecation_warning():
            # Technically Response operations are also passed through, which should not be used.
            assert error.get_header('test') == 'test'

        # Should not raise a warning
        error.close()

    @pytest.mark.skipif(
        platform.python_implementation() == 'PyPy', reason='garbage collector works differently in pypy')
    def test_compat_http_error_autoclose(self):
        """Discarding a compat error must leave the wrapped response open."""
        # Compat HTTPError should not autoclose response
        resp = self.create_response(403)
        # Deliberately not bound to a name, so the error object is dropped right away.
        _CompatHTTPError(HTTPError(resp))
        assert not resp.closed

    def test_incomplete_read_error(self):
        """``IncompleteRead`` formats its message with and without an expected count."""
        with_expected = IncompleteRead(b'test', 3, cause='test')
        assert isinstance(with_expected, IncompleteRead)
        assert repr(with_expected) == '<IncompleteRead: 4 bytes read, 3 more expected>'
        assert str(with_expected) == with_expected.msg
        assert with_expected.msg == '4 bytes read, 3 more expected'
        assert with_expected.partial == b'test'
        assert with_expected.expected == 3
        assert with_expected.cause == 'test'

        # Only the partial data is given here.
        partial_only = IncompleteRead(b'aaa')
        assert repr(partial_only) == '<IncompleteRead: 3 bytes read>'
        assert str(partial_only) == '3 bytes read'