]> jfr.im git - dlqueue.git/blob - venv/lib/python3.11/site-packages/werkzeug/middleware/lint.py
init: venv and flask
[dlqueue.git] / venv / lib / python3.11 / site-packages / werkzeug / middleware / lint.py
1 """
2 WSGI Protocol Linter
3 ====================
4
5 This module provides a middleware that performs sanity checks on the
6 behavior of the WSGI server and application. It checks that the
7 :pep:`3333` WSGI spec is properly implemented. It also warns on some
8 common HTTP errors such as non-empty responses for 304 status codes.
9
10 .. autoclass:: LintMiddleware
11
12 :copyright: 2007 Pallets
13 :license: BSD-3-Clause
14 """
15 from __future__ import annotations
16
17 import typing as t
18 from types import TracebackType
19 from urllib.parse import urlparse
20 from warnings import warn
21
22 from ..datastructures import Headers
23 from ..http import is_entity_header
24 from ..wsgi import FileWrapper
25
26 if t.TYPE_CHECKING:
27 from _typeshed.wsgi import StartResponse
28 from _typeshed.wsgi import WSGIApplication
29 from _typeshed.wsgi import WSGIEnvironment
30
31
class WSGIWarning(Warning):
    """Warning category used by this module for WSGI protocol problems."""
34
35
class HTTPWarning(Warning):
    """Warning category used by this module for HTTP-level problems."""
38
39
def check_type(context: str, obj: object, need: t.Type = str) -> None:
    """Emit a :class:`WSGIWarning` unless ``obj`` has exactly type ``need``.

    The comparison is on the exact type, so an instance of a subclass of
    ``need`` also triggers the warning.

    :param context: Label for the checked value, quoted in the message.
    :param obj: The value whose type is inspected.
    :param need: The exact type required. Defaults to :class:`str`.
    """
    actual = type(obj)

    if actual is need:
        return

    # stacklevel=3 skips this helper and its direct caller when the
    # warning location is reported.
    warn(
        f"{context!r} requires {need.__name__!r}, got {actual.__name__!r}.",
        WSGIWarning,
        stacklevel=3,
    )
47
48
class InputStream:
    """Proxy around ``wsgi.input`` that warns about questionable usage.

    Each method issues the relevant :class:`WSGIWarning` (if any) and then
    forwards the call to the wrapped stream.
    """

    def __init__(self, stream: t.IO[bytes]) -> None:
        self._stream = stream

    def read(self, *args: t.Any) -> bytes:
        """Forward to the wrapped ``read``; warn unless exactly one
        positional argument is passed.
        """
        argc = len(args)

        if not argc:
            warn(
                "WSGI does not guarantee an EOF marker on the input stream, thus making"
                " calls to 'wsgi.input.read()' unsafe. Conforming servers may never"
                " return from this call.",
                WSGIWarning,
                stacklevel=2,
            )
        elif argc > 1:
            warn(
                "Too many parameters passed to 'wsgi.input.read()'.",
                WSGIWarning,
                stacklevel=2,
            )

        return self._stream.read(*args)

    def readline(self, *args: t.Any) -> bytes:
        """Forward to the wrapped ``readline``, always warning first.

        :raises TypeError: If more than one positional argument is passed.
        """
        argc = len(args)

        if argc > 1:
            raise TypeError("Too many arguments passed to 'wsgi.input.readline()'.")

        if argc:
            warn(
                "'wsgi.input.readline()' was called with a size hint. WSGI does not"
                " support this, although it's available on all major servers.",
                WSGIWarning,
                stacklevel=2,
            )
        else:
            warn(
                "Calls to 'wsgi.input.readline()' without arguments are unsafe. Use"
                " 'wsgi.input.read()' instead.",
                WSGIWarning,
                stacklevel=2,
            )

        return self._stream.readline(*args)

    def __iter__(self) -> t.Iterator[bytes]:
        """Iterate the wrapped stream; warn and yield nothing if it is
        not iterable.
        """
        try:
            stream_iter = iter(self._stream)
        except TypeError:
            warn("'wsgi.input' is not iterable.", WSGIWarning, stacklevel=2)
            stream_iter = iter(())

        return stream_iter

    def close(self) -> None:
        """Warn that the application closed the input, then close it."""
        warn("The application closed the input stream!", WSGIWarning, stacklevel=2)
        self._stream.close()
99
100
class ErrorStream:
    """Proxy around ``wsgi.errors`` that type-checks writes and warns
    when the application closes the stream.
    """

    def __init__(self, stream: t.IO[str]) -> None:
        self._stream = stream

    def write(self, s: str) -> None:
        """Check that ``s`` is a ``str``, then write it to the stream."""
        check_type("wsgi.error.write()", s, str)
        self._stream.write(s)

    def flush(self) -> None:
        """Flush the wrapped stream."""
        self._stream.flush()

    def writelines(self, seq: t.Iterable[str]) -> None:
        """Write every item of ``seq`` through :meth:`write`, so each one
        is type-checked individually.
        """
        for chunk in seq:
            self.write(chunk)

    def close(self) -> None:
        """Warn that the application closed the stream, then close it."""
        warn("The application closed the error stream!", WSGIWarning, stacklevel=2)
        self._stream.close()
119
120
class GuardedWrite:
    """Callable wrapper for the ``write`` callable handed to the
    application, recording the size of every chunk written.

    :param write: The underlying write callable.
    :param chunks: List that receives the length of each written chunk.
    """

    def __init__(self, write: t.Callable[[bytes], object], chunks: list[int]) -> None:
        self._write = write
        self._chunks = chunks

    def __call__(self, s: bytes) -> None:
        """Type-check ``s``, pass it to the wrapped callable, then record
        its length.
        """
        check_type("write()", s, bytes)
        self._write(s)
        # The length is recorded only after the underlying write returned.
        written = len(s)
        self._chunks.append(written)
130
131
class GuardedIterator:
    """Iterator wrapper around the application's response iterable.

    It type-checks each yielded item, records item sizes in ``chunks``,
    and on :meth:`close` compares what was sent with the status code and
    headers stored in ``headers_set``.

    :param iterator: The iterable returned by the application.
    :param headers_set: Status code and headers of the response; falsy
        while the response has not been started.
    :param chunks: List that receives the length of every yielded item.
    """

    def __init__(
        self,
        iterator: t.Iterable[bytes],
        headers_set: tuple[int, Headers],
        chunks: list[int],
    ) -> None:
        self._iterator = iterator
        self._next = iter(iterator).__next__
        self.closed = False
        self.headers_set = headers_set
        self.chunks = chunks

    def __iter__(self) -> GuardedIterator:
        return self

    def __next__(self) -> bytes:
        """Return the next item, warning about use after close, a
        response that was never started, or a non-bytes item.
        """
        if self.closed:
            warn("Iterated over closed 'app_iter'.", WSGIWarning, stacklevel=2)

        item = self._next()

        if not self.headers_set:
            warn(
                "The application returned before it started the response.",
                WSGIWarning,
                stacklevel=2,
            )

        check_type("application iterator items", item, bytes)
        self.chunks.append(len(item))
        return item

    def close(self) -> None:
        """Mark the iterator closed, close the wrapped iterable if it has
        a ``close`` attribute, then lint the finished response.
        """
        self.closed = True

        if hasattr(self._iterator, "close"):
            self._iterator.close()

        # Nothing to compare against when the response was never started.
        if not self.headers_set:
            return

        status_code, headers = self.headers_set
        bytes_sent = sum(self.chunks)
        content_length = headers.get("content-length", type=int)

        if status_code == 304:
            for name, _value in headers:
                lowered = name.lower()

                # These two headers are exempt from the entity header check.
                if lowered in ("expires", "content-location"):
                    continue

                if is_entity_header(lowered):
                    warn(
                        f"Entity header {lowered!r} found in 304 response.", HTTPWarning
                    )

            if bytes_sent:
                warn("304 responses must not have a body.", HTTPWarning)

            return

        if 100 <= status_code < 200 or status_code == 204:
            # NOTE(review): any value other than 0 warns here; presumably
            # that includes a missing header (None) - confirm against
            # Headers.get.
            if content_length != 0:
                warn(
                    f"{status_code} responses must have an empty content length.",
                    HTTPWarning,
                )

            if bytes_sent:
                warn(f"{status_code} responses must not have a body.", HTTPWarning)

            return

        if content_length is not None and content_length != bytes_sent:
            warn(
                "Content-Length and the number of bytes sent to the"
                " client do not match.",
                WSGIWarning,
            )

    def __del__(self) -> None:
        """Warn if the iterator is collected without having been closed."""
        if self.closed:
            return

        # Best effort: any error raised while warning is deliberately ignored.
        try:
            warn(
                "Iterator was garbage collected before it was closed.", WSGIWarning
            )
        except Exception:
            pass
210
211
class LintMiddleware:
    """Warns about common errors in the WSGI and HTTP behavior of the
    server and wrapped application. Some of the issues it checks are:

    - invalid status codes
    - non-bytes sent to the WSGI server
    - strings returned from the WSGI application
    - non-empty conditional responses
    - unquoted etags
    - relative URLs in the Location header
    - unsafe calls to wsgi.input
    - unclosed iterators

    Error information is emitted using the :mod:`warnings` module.

    :param app: The WSGI application to wrap.

    .. code-block:: python

        from werkzeug.middleware.lint import LintMiddleware
        app = LintMiddleware(app)
    """

    def __init__(self, app: WSGIApplication) -> None:
        self.app = app

    def check_environ(self, environ: WSGIEnvironment) -> None:
        """Warn about problems in the WSGI environ passed by the server.

        Checks that the environ is exactly a ``dict``, that the required
        keys are present, that ``wsgi.version`` is ``(1, 0)``, and that
        non-empty ``SCRIPT_NAME`` / ``PATH_INFO`` start with a slash.

        :param environ: The WSGI environ to inspect.
        """
        if type(environ) is not dict:
            warn(
                "WSGI environment is not a standard Python dict.",
                WSGIWarning,
                stacklevel=4,
            )
        for key in (
            "REQUEST_METHOD",
            "SERVER_NAME",
            "SERVER_PORT",
            "wsgi.version",
            "wsgi.input",
            "wsgi.errors",
            "wsgi.multithread",
            "wsgi.multiprocess",
            "wsgi.run_once",
        ):
            if key not in environ:
                warn(
                    f"Required environment key {key!r} not found",
                    WSGIWarning,
                    stacklevel=3,
                )
        # Use .get so that a missing 'wsgi.version' (already reported by
        # the loop above) produces a warning here instead of a KeyError
        # raised by the linter itself.
        if environ.get("wsgi.version") != (1, 0):
            warn("Environ is not a WSGI 1.0 environ.", WSGIWarning, stacklevel=3)

        script_name = environ.get("SCRIPT_NAME", "")
        path_info = environ.get("PATH_INFO", "")

        if script_name and script_name[0] != "/":
            warn(
                f"'SCRIPT_NAME' does not start with a slash: {script_name!r}",
                WSGIWarning,
                stacklevel=3,
            )

        if path_info and path_info[0] != "/":
            warn(
                f"'PATH_INFO' does not start with a slash: {path_info!r}",
                WSGIWarning,
                stacklevel=3,
            )

    def check_start_response(
        self,
        status: str,
        headers: list[tuple[str, str]],
        exc_info: None | (tuple[type[BaseException], BaseException, TracebackType]),
    ) -> tuple[int, Headers]:
        """Warn about invalid arguments passed to ``start_response``.

        :param status: The status string given by the application.
        :param headers: The header list given by the application.
        :param exc_info: The optional ``exc_info`` argument.
        :return: The integer status code and the headers wrapped in a
            :class:`Headers` object.
        :raises ValueError: From ``int()`` if the first word of
            ``status`` is not numeric; a warning is issued first.
        """
        check_type("status", status, str)
        status_code_str = status.split(None, 1)[0]

        if len(status_code_str) != 3 or not status_code_str.isdecimal():
            warn("Status code must be three digits.", WSGIWarning, stacklevel=3)

        if len(status) < 4 or status[3] != " ":
            warn(
                f"Invalid value for status {status!r}. Valid status strings are three"
                " digits, a space and a status explanation.",
                WSGIWarning,
                stacklevel=3,
            )

        status_code = int(status_code_str)

        if status_code < 100:
            warn("Status code < 100 detected.", WSGIWarning, stacklevel=3)

        if type(headers) is not list:
            warn("Header list is not a list.", WSGIWarning, stacklevel=3)

        for item in headers:
            if type(item) is not tuple or len(item) != 2:
                warn("Header items must be 2-item tuples.", WSGIWarning, stacklevel=3)
            name, value = item
            if type(name) is not str or type(value) is not str:
                warn(
                    "Header keys and values must be strings.", WSGIWarning, stacklevel=3
                )
            if name.lower() == "status":
                warn(
                    "The status header is not supported due to"
                    " conflicts with the CGI spec.",
                    WSGIWarning,
                    stacklevel=3,
                )

        if exc_info is not None and not isinstance(exc_info, tuple):
            warn("Invalid value for exc_info.", WSGIWarning, stacklevel=3)

        headers = Headers(headers)
        self.check_headers(headers)

        return status_code, headers

    def check_headers(self, headers: Headers) -> None:
        """Warn about an unquoted or lower-case-weak ``ETag`` and about a
        ``Location`` header whose URL has no network location.

        :param headers: The response headers to inspect.
        """
        etag = headers.get("etag")

        if etag is not None:
            if etag.startswith(("W/", "w/")):
                if etag.startswith("w/"):
                    warn(
                        "Weak etag indicator should be upper case.",
                        HTTPWarning,
                        stacklevel=4,
                    )

                # Drop the weak indicator before checking the quoting.
                etag = etag[2:]

            if not (etag[:1] == etag[-1:] == '"'):
                warn("Unquoted etag emitted.", HTTPWarning, stacklevel=4)

        location = headers.get("location")

        if location is not None:
            if not urlparse(location).netloc:
                warn(
                    "Absolute URLs required for location header.",
                    HTTPWarning,
                    stacklevel=4,
                )

    def check_iterator(self, app_iter: t.Iterable[bytes]) -> None:
        """Warn if the application returned a ``str`` as its response.

        :param app_iter: The value returned by the application.
        """
        if isinstance(app_iter, str):
            warn(
                "The application returned a string. The response will send one"
                " character at a time to the client, which will kill performance."
                " Return a list or iterable instead.",
                WSGIWarning,
                stacklevel=3,
            )

    def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.Iterable[bytes]:
        """Call the wrapped application with linting hooks installed.

        The environ is checked, ``wsgi.input`` and ``wsgi.errors`` are
        replaced by checking proxies, ``start_response`` is wrapped, and
        the application's return value is wrapped in a
        :class:`GuardedIterator`.
        """
        if len(args) != 2:
            warn("A WSGI app takes two arguments.", WSGIWarning, stacklevel=2)

        if kwargs:
            warn(
                "A WSGI app does not take keyword arguments.", WSGIWarning, stacklevel=2
            )

        environ: WSGIEnvironment = args[0]
        start_response: StartResponse = args[1]

        self.check_environ(environ)
        environ["wsgi.input"] = InputStream(environ["wsgi.input"])
        environ["wsgi.errors"] = ErrorStream(environ["wsgi.errors"])

        # Hook our own file wrapper in so that applications will always
        # iterate to the end and we can check the content length.
        environ["wsgi.file_wrapper"] = FileWrapper

        # Filled in by checking_start_response and shared with the
        # GuardedIterator returned below.
        headers_set: list[t.Any] = []
        chunks: list[int] = []

        def checking_start_response(
            *args: t.Any, **kwargs: t.Any
        ) -> t.Callable[[bytes], None]:
            if len(args) not in {2, 3}:
                warn(
                    f"Invalid number of arguments: {len(args)}, expected 2 or 3.",
                    WSGIWarning,
                    stacklevel=2,
                )

            if kwargs:
                # stacklevel=2 attributes the warning to the application's
                # start_response call, like the argument count check above.
                warn(
                    "'start_response' does not take keyword arguments.",
                    WSGIWarning,
                    stacklevel=2,
                )

            status: str = args[0]
            headers: list[tuple[str, str]] = args[1]
            exc_info: None | (
                tuple[type[BaseException], BaseException, TracebackType]
            ) = (args[2] if len(args) == 3 else None)

            headers_set[:] = self.check_start_response(status, headers, exc_info)
            return GuardedWrite(start_response(status, headers, exc_info), chunks)

        app_iter = self.app(environ, t.cast("StartResponse", checking_start_response))
        self.check_iterator(app_iter)
        return GuardedIterator(
            app_iter, t.cast(t.Tuple[int, Headers], headers_set), chunks
        )