]>
Commit | Line | Data |
---|---|---|
1 | #!/usr/bin/env python3 | |
2 | ||
3 | # Allow direct execution | |
4 | import os | |
5 | import sys | |
6 | ||
7 | import pytest | |
8 | ||
9 | sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
10 | ||
11 | import io | |
12 | import random | |
13 | import ssl | |
14 | ||
15 | from yt_dlp.cookies import YoutubeDLCookieJar | |
16 | from yt_dlp.dependencies import certifi | |
17 | from yt_dlp.networking import Response | |
18 | from yt_dlp.networking._helper import ( | |
19 | InstanceStoreMixin, | |
20 | add_accept_encoding_header, | |
21 | get_redirect_method, | |
22 | make_socks_proxy_opts, | |
23 | select_proxy, | |
24 | ssl_load_certs, | |
25 | ) | |
26 | from yt_dlp.networking.exceptions import ( | |
27 | HTTPError, | |
28 | IncompleteRead, | |
29 | ) | |
30 | from yt_dlp.socks import ProxyType | |
31 | from yt_dlp.utils.networking import HTTPHeaderDict | |
32 | ||
33 | TEST_DIR = os.path.dirname(os.path.abspath(__file__)) | |
34 | ||
35 | ||
36 | class TestNetworkingUtils: | |
37 | ||
38 | def test_select_proxy(self): | |
39 | proxies = { | |
40 | 'all': 'socks5://example.com', | |
41 | 'http': 'http://example.com:1080', | |
42 | 'no': 'bypass.example.com,yt-dl.org', | |
43 | } | |
44 | ||
45 | assert select_proxy('https://example.com', proxies) == proxies['all'] | |
46 | assert select_proxy('http://example.com', proxies) == proxies['http'] | |
47 | assert select_proxy('http://bypass.example.com', proxies) is None | |
48 | assert select_proxy('https://yt-dl.org', proxies) is None | |
49 | ||
50 | @pytest.mark.parametrize('socks_proxy,expected', [ | |
51 | ('socks5h://example.com', { | |
52 | 'proxytype': ProxyType.SOCKS5, | |
53 | 'addr': 'example.com', | |
54 | 'port': 1080, | |
55 | 'rdns': True, | |
56 | 'username': None, | |
57 | 'password': None, | |
58 | }), | |
59 | ('socks5://user:@example.com:5555', { | |
60 | 'proxytype': ProxyType.SOCKS5, | |
61 | 'addr': 'example.com', | |
62 | 'port': 5555, | |
63 | 'rdns': False, | |
64 | 'username': 'user', | |
65 | 'password': '', | |
66 | }), | |
67 | ('socks4://u%40ser:pa%20ss@127.0.0.1:1080', { | |
68 | 'proxytype': ProxyType.SOCKS4, | |
69 | 'addr': '127.0.0.1', | |
70 | 'port': 1080, | |
71 | 'rdns': False, | |
72 | 'username': 'u@ser', | |
73 | 'password': 'pa ss', | |
74 | }), | |
75 | ('socks4a://:pa%20ss@127.0.0.1', { | |
76 | 'proxytype': ProxyType.SOCKS4A, | |
77 | 'addr': '127.0.0.1', | |
78 | 'port': 1080, | |
79 | 'rdns': True, | |
80 | 'username': '', | |
81 | 'password': 'pa ss', | |
82 | }), | |
83 | ]) | |
84 | def test_make_socks_proxy_opts(self, socks_proxy, expected): | |
85 | assert make_socks_proxy_opts(socks_proxy) == expected | |
86 | ||
87 | def test_make_socks_proxy_unknown(self): | |
88 | with pytest.raises(ValueError, match='Unknown SOCKS proxy version: socks'): | |
89 | make_socks_proxy_opts('socks://127.0.0.1') | |
90 | ||
91 | @pytest.mark.skipif(not certifi, reason='certifi is not installed') | |
92 | def test_load_certifi(self): | |
93 | context_certifi = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) | |
94 | context_certifi.load_verify_locations(cafile=certifi.where()) | |
95 | context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) | |
96 | ssl_load_certs(context, use_certifi=True) | |
97 | assert context.get_ca_certs() == context_certifi.get_ca_certs() | |
98 | ||
99 | context_default = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) | |
100 | context_default.load_default_certs() | |
101 | context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) | |
102 | ssl_load_certs(context, use_certifi=False) | |
103 | assert context.get_ca_certs() == context_default.get_ca_certs() | |
104 | ||
105 | if context_default.get_ca_certs() == context_certifi.get_ca_certs(): | |
106 | pytest.skip('System uses certifi as default. The test is not valid') | |
107 | ||
108 | @pytest.mark.parametrize('method,status,expected', [ | |
109 | ('GET', 303, 'GET'), | |
110 | ('HEAD', 303, 'HEAD'), | |
111 | ('PUT', 303, 'GET'), | |
112 | ('POST', 301, 'GET'), | |
113 | ('HEAD', 301, 'HEAD'), | |
114 | ('POST', 302, 'GET'), | |
115 | ('HEAD', 302, 'HEAD'), | |
116 | ('PUT', 302, 'PUT'), | |
117 | ('POST', 308, 'POST'), | |
118 | ('POST', 307, 'POST'), | |
119 | ('HEAD', 308, 'HEAD'), | |
120 | ('HEAD', 307, 'HEAD'), | |
121 | ]) | |
122 | def test_get_redirect_method(self, method, status, expected): | |
123 | assert get_redirect_method(method, status) == expected | |
124 | ||
125 | @pytest.mark.parametrize('headers,supported_encodings,expected', [ | |
126 | ({'Accept-Encoding': 'br'}, ['gzip', 'br'], {'Accept-Encoding': 'br'}), | |
127 | ({}, ['gzip', 'br'], {'Accept-Encoding': 'gzip, br'}), | |
128 | ({'Content-type': 'application/json'}, [], {'Content-type': 'application/json', 'Accept-Encoding': 'identity'}), | |
129 | ]) | |
130 | def test_add_accept_encoding_header(self, headers, supported_encodings, expected): | |
131 | headers = HTTPHeaderDict(headers) | |
132 | add_accept_encoding_header(headers, supported_encodings) | |
133 | assert headers == HTTPHeaderDict(expected) | |
134 | ||
135 | ||
136 | class TestInstanceStoreMixin: | |
137 | ||
138 | class FakeInstanceStoreMixin(InstanceStoreMixin): | |
139 | def _create_instance(self, **kwargs): | |
140 | return random.randint(0, 1000000) | |
141 | ||
142 | def _close_instance(self, instance): | |
143 | pass | |
144 | ||
145 | def test_mixin(self): | |
146 | mixin = self.FakeInstanceStoreMixin() | |
147 | assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}}) == mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}}) | |
148 | ||
149 | assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'e', 4}}) != mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}}) | |
150 | ||
151 | assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}} != mixin._get_instance(d={'a': 1, 'b': 2, 'g': {'d', 4}})) | |
152 | ||
153 | assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) == mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) | |
154 | ||
155 | assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) != mixin._get_instance(d={'a': 1}, e=[1, 2, 3, 4]) | |
156 | ||
157 | cookiejar = YoutubeDLCookieJar() | |
158 | assert mixin._get_instance(b=[1, 2], c=cookiejar) == mixin._get_instance(b=[1, 2], c=cookiejar) | |
159 | ||
160 | assert mixin._get_instance(b=[1, 2], c=cookiejar) != mixin._get_instance(b=[1, 2], c=YoutubeDLCookieJar()) | |
161 | ||
162 | # Different order | |
163 | assert mixin._get_instance(c=cookiejar, b=[1, 2]) == mixin._get_instance(b=[1, 2], c=cookiejar) | |
164 | ||
165 | m = mixin._get_instance(t=1234) | |
166 | assert mixin._get_instance(t=1234) == m | |
167 | mixin._clear_instances() | |
168 | assert mixin._get_instance(t=1234) != m | |
169 | ||
170 | ||
171 | class TestNetworkingExceptions: | |
172 | ||
173 | @staticmethod | |
174 | def create_response(status): | |
175 | return Response(fp=io.BytesIO(b'test'), url='http://example.com', headers={'tesT': 'test'}, status=status) | |
176 | ||
177 | def test_http_error(self): | |
178 | ||
179 | response = self.create_response(403) | |
180 | error = HTTPError(response) | |
181 | ||
182 | assert error.status == 403 | |
183 | assert str(error) == error.msg == 'HTTP Error 403: Forbidden' | |
184 | assert error.reason == response.reason | |
185 | assert error.response is response | |
186 | ||
187 | data = error.response.read() | |
188 | assert data == b'test' | |
189 | assert repr(error) == '<HTTPError 403: Forbidden>' | |
190 | ||
191 | def test_redirect_http_error(self): | |
192 | response = self.create_response(301) | |
193 | error = HTTPError(response, redirect_loop=True) | |
194 | assert str(error) == error.msg == 'HTTP Error 301: Moved Permanently (redirect loop detected)' | |
195 | assert error.reason == 'Moved Permanently' | |
196 | ||
197 | def test_incomplete_read_error(self): | |
198 | error = IncompleteRead(4, 3, cause='test') | |
199 | assert isinstance(error, IncompleteRead) | |
200 | assert repr(error) == '<IncompleteRead: 4 bytes read, 3 more expected>' | |
201 | assert str(error) == error.msg == '4 bytes read, 3 more expected' | |
202 | assert error.partial == 4 | |
203 | assert error.expected == 3 | |
204 | assert error.cause == 'test' | |
205 | ||
206 | error = IncompleteRead(3) | |
207 | assert repr(error) == '<IncompleteRead: 3 bytes read>' | |
208 | assert str(error) == '3 bytes read' |