]> jfr.im git - dlqueue.git/blob - venv/lib/python3.11/site-packages/werkzeug/formparser.py
init: venv and flask
[dlqueue.git] / venv / lib / python3.11 / site-packages / werkzeug / formparser.py
1 from __future__ import annotations
2
3 import typing as t
4 from io import BytesIO
5 from urllib.parse import parse_qsl
6
7 from ._internal import _plain_int
8 from .datastructures import FileStorage
9 from .datastructures import Headers
10 from .datastructures import MultiDict
11 from .exceptions import RequestEntityTooLarge
12 from .http import parse_options_header
13 from .sansio.multipart import Data
14 from .sansio.multipart import Epilogue
15 from .sansio.multipart import Field
16 from .sansio.multipart import File
17 from .sansio.multipart import MultipartDecoder
18 from .sansio.multipart import NeedData
19 from .wsgi import get_content_length
20 from .wsgi import get_input_stream
21
22 # there are some platforms where SpooledTemporaryFile is not available.
23 # In that case we need to provide a fallback.
24 try:
25 from tempfile import SpooledTemporaryFile
26 except ImportError:
27 from tempfile import TemporaryFile
28
29 SpooledTemporaryFile = None # type: ignore
30
if t.TYPE_CHECKING:
    import typing as te
    from _typeshed.wsgi import WSGIEnvironment

    # Result triple returned by the form-data parsers: the (possibly
    # exhausted) body stream, the form fields, and the uploaded files.
    t_parse_result = t.Tuple[t.IO[bytes], MultiDict, MultiDict]

    class TStreamFactory(te.Protocol):
        # Structural type for callables that produce a writable stream to
        # buffer one uploaded file (see ``default_stream_factory``).
        def __call__(
            self,
            total_content_length: int | None,
            content_type: str | None,
            filename: str | None,
            content_length: int | None = None,
        ) -> t.IO[bytes]:
            ...


# NOTE(review): ``F`` is not referenced anywhere in this module chunk —
# possibly part of the public API or leftover; confirm before removing.
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
49
50
def default_stream_factory(
    total_content_length: int | None,
    content_type: str | None,
    filename: str | None,
    content_length: int | None = None,
) -> t.IO[bytes]:
    """Return a writable stream used to buffer an uploaded file's data.

    Data is kept in memory up to 500 kB and spills to disk beyond that.
    On platforms without :class:`~tempfile.SpooledTemporaryFile`, a plain
    temporary file is used for large or unknown sizes and an in-memory
    buffer otherwise.
    """
    spool_limit = 1024 * 500  # 500 kB held in memory before spilling to disk

    if SpooledTemporaryFile is not None:
        stream = SpooledTemporaryFile(max_size=spool_limit, mode="rb+")
    elif total_content_length is None or total_content_length > spool_limit:
        stream = TemporaryFile("rb+")
    else:
        return BytesIO()

    return t.cast(t.IO[bytes], stream)
65
66
def parse_form_data(
    environ: WSGIEnvironment,
    stream_factory: TStreamFactory | None = None,
    max_form_memory_size: int | None = None,
    max_content_length: int | None = None,
    cls: type[MultiDict] | None = None,
    silent: bool = True,
    *,
    max_form_parts: int | None = None,
) -> t_parse_result:
    """Parse the form data in the environ and return a
    ``(stream, form, files)`` tuple. Only call this for requests whose
    transport method is `POST`, `PUT`, or `PATCH`.

    For ``multipart/form-data`` bodies the files multidict is filled with
    `FileStorage` objects. For an unknown mimetype the input stream is
    wrapped and returned as the first item; otherwise the stream is empty.

    This is a shortcut for the common usage of :class:`FormDataParser`.

    :param environ: the WSGI environment to be used for parsing.
    :param stream_factory: An optional callable that returns a new read and
        writeable file descriptor. This callable works the same as
        :meth:`Response._get_file_stream`.
    :param max_form_memory_size: the maximum number of bytes to be accepted
        for in-memory stored form data. If the data exceeds the value
        specified an :exc:`~exceptions.RequestEntityTooLarge` exception is
        raised.
    :param max_content_length: If this is provided and the transmitted data
        is longer than this value an
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :param cls: an optional dict class to use. If this is not specified
        or `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of multipart parts to be
        parsed. If this is exceeded, a
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :return: A tuple in the form ``(stream, form, files)``.

    .. versionchanged:: 3.0
        The ``charset`` and ``errors`` parameters were removed.

    .. versionchanged:: 2.3
        Added the ``max_form_parts`` parameter.

    .. versionadded:: 0.5.1
        Added the ``silent`` parameter.

    .. versionadded:: 0.5
        Added the ``max_form_memory_size``, ``max_content_length``, and
        ``cls`` parameters.
    """
    parser = FormDataParser(
        stream_factory=stream_factory,
        max_form_memory_size=max_form_memory_size,
        max_content_length=max_content_length,
        cls=cls,
        silent=silent,
        max_form_parts=max_form_parts,
    )
    return parser.parse_from_environ(environ)
129
130
class FormDataParser:
    """Implements parsing of form data for Werkzeug. By itself it can
    parse ``multipart/form-data`` and url-encoded form data. It can be
    subclassed and extended, but for most mimetypes it is a better idea
    to use the untouched stream and expose it as separate attributes on
    a request object.

    :param stream_factory: An optional callable that returns a new read and
        writeable file descriptor. This callable works the same as
        :meth:`Response._get_file_stream`.
    :param max_form_memory_size: the maximum number of bytes to be accepted
        for in-memory stored form data. If the data exceeds the value
        specified an :exc:`~exceptions.RequestEntityTooLarge` exception is
        raised.
    :param max_content_length: If this is provided and the transmitted data
        is longer than this value an
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :param cls: an optional dict class to use. If this is not specified
        or `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of multipart parts to be
        parsed. If this is exceeded, a
        :exc:`~exceptions.RequestEntityTooLarge` exception is raised.

    .. versionchanged:: 3.0
        The ``charset`` and ``errors`` parameters were removed.

    .. versionchanged:: 3.0
        The ``parse_functions`` attribute and ``get_parse_func`` methods
        were removed.

    .. versionchanged:: 2.2.3
        Added the ``max_form_parts`` parameter.

    .. versionadded:: 0.8
    """

    def __init__(
        self,
        stream_factory: TStreamFactory | None = None,
        max_form_memory_size: int | None = None,
        max_content_length: int | None = None,
        cls: type[MultiDict] | None = None,
        silent: bool = True,
        *,
        max_form_parts: int | None = None,
    ) -> None:
        self.stream_factory = (
            default_stream_factory if stream_factory is None else stream_factory
        )
        self.max_form_memory_size = max_form_memory_size
        self.max_content_length = max_content_length
        self.max_form_parts = max_form_parts
        self.cls = MultiDict if cls is None else cls
        self.silent = silent

    def parse_from_environ(self, environ: WSGIEnvironment) -> t_parse_result:
        """Parse the form data contained in the given WSGI environment.

        :param environ: the WSGI environment to be used for parsing.
        :return: A tuple in the form ``(stream, form, files)``.
        """
        mimetype, options = parse_options_header(environ.get("CONTENT_TYPE"))
        stream = get_input_stream(environ, max_content_length=self.max_content_length)
        return self.parse(
            stream,
            mimetype,
            get_content_length(environ),
            options=options,
        )

    def parse(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str] | None = None,
    ) -> t_parse_result:
        """Parse the information from the given stream, mimetype, content
        length, and mimetype parameters.

        :param stream: an input stream
        :param mimetype: the mimetype of the data
        :param content_length: the content length of the incoming data
        :param options: optional mimetype parameters (used for the
            multipart boundary for instance)
        :return: A tuple in the form ``(stream, form, files)``.

        .. versionchanged:: 3.0
            The invalid ``application/x-url-encoded`` content type is not
            treated as ``application/x-www-form-urlencoded``.
        """
        handlers = {
            "multipart/form-data": self._parse_multipart,
            "application/x-www-form-urlencoded": self._parse_urlencoded,
        }
        handler = handlers.get(mimetype)

        if handler is None:
            # Unknown mimetype: hand back the untouched stream with empty
            # form/files mappings.
            return stream, self.cls(), self.cls()

        try:
            return handler(stream, mimetype, content_length, options or {})
        except ValueError:
            if not self.silent:
                raise

        # Silent mode: swallow the parse error and act as if no form data
        # was submitted.
        return stream, self.cls(), self.cls()

    def _parse_multipart(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str],
    ) -> t_parse_result:
        """Parse a ``multipart/form-data`` body with :class:`MultiPartParser`.

        :raises ValueError: if the content type carries no boundary.
        """
        boundary = options.get("boundary", "").encode("ascii")

        if not boundary:
            raise ValueError("Missing boundary")

        multipart = MultiPartParser(
            stream_factory=self.stream_factory,
            max_form_memory_size=self.max_form_memory_size,
            max_form_parts=self.max_form_parts,
            cls=self.cls,
        )
        form, files = multipart.parse(stream, boundary, content_length)
        return stream, form, files

    def _parse_urlencoded(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str],
    ) -> t_parse_result:
        """Parse an ``application/x-www-form-urlencoded`` body.

        :raises RequestEntityTooLarge: if the declared content length
            exceeds ``max_form_memory_size``, or the body cannot be parsed.
        """
        too_large = (
            self.max_form_memory_size is not None
            and content_length is not None
            and content_length > self.max_form_memory_size
        )

        if too_large:
            raise RequestEntityTooLarge()

        try:
            body = stream.read().decode()
            items = parse_qsl(
                body,
                keep_blank_values=True,
                errors="werkzeug.url_quote",
            )
        except ValueError as e:
            raise RequestEntityTooLarge() from e

        return stream, self.cls(items), self.cls()
292
293
class MultiPartParser:
    """Parse ``multipart/form-data`` bodies into ``(form, files)``
    mappings by feeding chunks of the input stream through the sans-IO
    :class:`MultipartDecoder`.
    """

    def __init__(
        self,
        stream_factory: TStreamFactory | None = None,
        max_form_memory_size: int | None = None,
        cls: type[MultiDict] | None = None,
        buffer_size: int = 64 * 1024,
        max_form_parts: int | None = None,
    ) -> None:
        """
        :param stream_factory: callable that produces the writable stream
            for each uploaded file; defaults to ``default_stream_factory``.
        :param max_form_memory_size: forwarded to the decoder as its
            in-memory form data limit.
        :param cls: mapping class for the results; defaults to
            :class:`MultiDict`.
        :param buffer_size: number of bytes read from the input stream per
            iteration while feeding the decoder.
        :param max_form_parts: forwarded to the decoder as the maximum
            number of parts.
        """
        self.max_form_memory_size = max_form_memory_size
        self.max_form_parts = max_form_parts

        if stream_factory is None:
            stream_factory = default_stream_factory

        self.stream_factory = stream_factory

        if cls is None:
            cls = MultiDict

        self.cls = cls
        self.buffer_size = buffer_size

    def fail(self, message: str) -> te.NoReturn:
        """Abort parsing by raising :exc:`ValueError` with *message*."""
        raise ValueError(message)

    def get_part_charset(self, headers: Headers) -> str:
        """Return the charset to decode a field part with, falling back to
        UTF-8 unless the part's ``Content-Type`` names a safe encoding.
        """
        # Figure out input charset for current part
        content_type = headers.get("content-type")

        if content_type:
            parameters = parse_options_header(content_type)[1]
            ct_charset = parameters.get("charset", "").lower()

            # A safe list of encodings. Modern clients should only send ASCII or UTF-8.
            # This list will not be extended further.
            if ct_charset in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
                return ct_charset

        return "utf-8"

    def start_file_streaming(
        self, event: File, total_content_length: int | None
    ) -> t.IO[bytes]:
        """Create the destination stream for an uploaded file via the
        configured ``stream_factory``.
        """
        content_type = event.headers.get("content-type")

        try:
            # The per-part Content-Length header is optional; a missing or
            # malformed value is treated as 0.
            content_length = _plain_int(event.headers["content-length"])
        except (KeyError, ValueError):
            content_length = 0

        container = self.stream_factory(
            total_content_length=total_content_length,
            filename=event.filename,
            content_type=content_type,
            content_length=content_length,
        )
        return container

    def parse(
        self, stream: t.IO[bytes], boundary: bytes, content_length: int | None
    ) -> tuple[MultiDict, MultiDict]:
        """Feed *stream* through the multipart decoder and collect results.

        :param stream: the request body stream.
        :param boundary: the multipart boundary from the content type.
        :param content_length: total content length, passed to the stream
            factory for file parts.
        :return: ``(form, files)`` mappings built with ``self.cls``.
        :raises ValueError: propagated from the decoder on malformed input.
        """
        current_part: Field | File
        container: t.IO[bytes] | list[bytes]
        _write: t.Callable[[bytes], t.Any]

        parser = MultipartDecoder(
            boundary,
            max_form_memory_size=self.max_form_memory_size,
            max_parts=self.max_form_parts,
        )

        fields = []
        files = []

        for data in _chunk_iter(stream.read, self.buffer_size):
            parser.receive_data(data)
            event = parser.next_event()
            # Drain every event the decoder can produce for this chunk.
            # NeedData means it wants more input; Epilogue means the body
            # is complete.
            while not isinstance(event, (Epilogue, NeedData)):
                if isinstance(event, Field):
                    # Form field: buffer its data chunks in a bytes list.
                    current_part = event
                    container = []
                    _write = container.append
                elif isinstance(event, File):
                    # File upload: stream its data into a factory-made file.
                    current_part = event
                    container = self.start_file_streaming(event, content_length)
                    _write = container.write
                elif isinstance(event, Data):
                    # NOTE(review): assumes the decoder always emits a
                    # Field/File event before any Data event, so that
                    # current_part/container/_write are bound here.
                    _write(event.data)
                    if not event.more_data:
                        # Final Data event for this part: finalize it.
                        if isinstance(current_part, Field):
                            value = b"".join(container).decode(
                                self.get_part_charset(current_part.headers), "replace"
                            )
                            fields.append((current_part.name, value))
                        else:
                            container = t.cast(t.IO[bytes], container)
                            # Rewind so callers can read the file from the
                            # beginning.
                            container.seek(0)
                            files.append(
                                (
                                    current_part.name,
                                    FileStorage(
                                        container,
                                        current_part.filename,
                                        current_part.name,
                                        headers=current_part.headers,
                                    ),
                                )
                            )

                event = parser.next_event()

        return self.cls(fields), self.cls(files)
407
408
409 def _chunk_iter(read: t.Callable[[int], bytes], size: int) -> t.Iterator[bytes | None]:
410 """Read data in chunks for multipart/form-data parsing. Stop if no data is read.
411 Yield ``None`` at the end to signal end of parsing.
412 """
413 while True:
414 data = read(size)
415
416 if not data:
417 break
418
419 yield data
420
421 yield None