1 from __future__
import annotations
5 from urllib
.parse
import parse_qsl
7 from ._internal
import _plain_int
8 from .datastructures
import FileStorage
9 from .datastructures
import Headers
10 from .datastructures
import MultiDict
11 from .exceptions
import RequestEntityTooLarge
12 from .http
import parse_options_header
13 from .sansio
.multipart
import Data
14 from .sansio
.multipart
import Epilogue
15 from .sansio
.multipart
import Field
16 from .sansio
.multipart
import File
17 from .sansio
.multipart
import MultipartDecoder
18 from .sansio
.multipart
import NeedData
19 from .wsgi
import get_content_length
20 from .wsgi
import get_input_stream
# There are some platforms where SpooledTemporaryFile is not available.
# In that case we need to provide a fallback.
25 from tempfile
import SpooledTemporaryFile
27 from tempfile
import TemporaryFile
29 SpooledTemporaryFile
= None # type: ignore
# Names in this branch exist only for static type checking; they are not
# importable at runtime.
if t.TYPE_CHECKING:
    import typing as te

    from _typeshed.wsgi import WSGIEnvironment

    # Shared result shape of the form parsing entry points:
    # ``(stream, form, files)``.
    t_parse_result = t.Tuple[t.IO[bytes], MultiDict, MultiDict]

    class TStreamFactory(te.Protocol):
        # Structural type matched by ``default_stream_factory`` and any
        # user-supplied replacement stream factory.
        def __call__(
            self,
            total_content_length: int | None,
            content_type: str | None,
            filename: str | None,
            content_length: int | None = None,
        ) -> t.IO[bytes]: ...


F = t.TypeVar("F", bound=t.Callable[..., t.Any])
def default_stream_factory(
    total_content_length: int | None,
    content_type: str | None,
    filename: str | None,
    content_length: int | None = None,
) -> t.IO[bytes]:
    """Return a readable and writable binary stream for buffering one
    uploaded part.

    Small payloads are kept in memory and spill to disk once they exceed
    500 KiB.  On platforms that lack :class:`~tempfile.SpooledTemporaryFile`
    the memory/disk decision is made up front from the announced
    ``total_content_length`` instead.
    """
    max_size = 1024 * 500

    # Preferred path: the spooled file handles the memory-to-disk
    # overflow transparently as data is written.
    if SpooledTemporaryFile is not None:
        spooled = SpooledTemporaryFile(max_size=max_size, mode="rb+")
        return t.cast(t.IO[bytes], spooled)

    # Fallback: pick disk when the size is unknown or too large ...
    if total_content_length is None or total_content_length > max_size:
        return t.cast(t.IO[bytes], TemporaryFile("rb+"))

    # ... and a plain in-memory buffer otherwise.
    return t.cast(t.IO[bytes], BytesIO())
def parse_form_data(
    environ: WSGIEnvironment,
    stream_factory: TStreamFactory | None = None,
    max_form_memory_size: int | None = None,
    max_content_length: int | None = None,
    cls: type[MultiDict] | None = None,
    silent: bool = True,
    *,
    max_form_parts: int | None = None,
) -> t_parse_result:
    """Parse the form data in the environ and return it as tuple in the form
    ``(stream, form, files)``. You should only call this method if the
    transport method is `POST`, `PUT`, or `PATCH`.

    If the mimetype of the data transmitted is `multipart/form-data` the
    files multidict will be filled with `FileStorage` objects. If the
    mimetype is unknown the input stream is wrapped and returned as first
    argument, else the stream is empty.

    This is a shortcut for the common usage of :class:`FormDataParser`.

    :param environ: the WSGI environment to be used for parsing.
    :param stream_factory: An optional callable that returns a new read and
                           writeable file descriptor. This callable works
                           the same as :meth:`Response._get_file_stream`.
    :param max_form_memory_size: the maximum number of bytes to be accepted for
                           in-memory stored form data. If the data
                           exceeds the value specified an
                           :exc:`~exceptions.RequestEntityTooLarge`
                           exception is raised.
    :param max_content_length: If this is provided and the transmitted data
                               is longer than this value an
                               :exc:`~exceptions.RequestEntityTooLarge`
                               exception is raised.
    :param cls: an optional dict class to use. If this is not specified
                or `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of multipart parts to be parsed. If this
        is exceeded, a :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
    :return: A tuple in the form ``(stream, form, files)``.

    .. versionchanged:: 3.0
        The ``charset`` and ``errors`` parameters were removed.

    .. versionchanged:: 2.3
        Added the ``max_form_parts`` parameter.

    .. versionadded:: 0.5.1
        Added the ``silent`` parameter.

    .. versionadded:: 0.5
        Added the ``max_form_memory_size``, ``max_content_length``, and ``cls``
        parameters.
    """
    # All the work is delegated to a one-shot FormDataParser configured
    # with the same options.
    parser = FormDataParser(
        stream_factory=stream_factory,
        max_form_memory_size=max_form_memory_size,
        max_content_length=max_content_length,
        max_form_parts=max_form_parts,
        cls=cls,
        silent=silent,
    )
    return parser.parse_from_environ(environ)
class FormDataParser:
    """This class implements parsing of form data for Werkzeug. By itself
    it can parse multipart and url encoded form data. It can be subclassed
    and extended but for most mimetypes it is a better idea to use the
    untouched stream and expose it as separate attributes on a request
    object.

    :param stream_factory: An optional callable that returns a new read and
                           writeable file descriptor. This callable works
                           the same as :meth:`Response._get_file_stream`.
    :param max_form_memory_size: the maximum number of bytes to be accepted for
                           in-memory stored form data. If the data
                           exceeds the value specified an
                           :exc:`~exceptions.RequestEntityTooLarge`
                           exception is raised.
    :param max_content_length: If this is provided and the transmitted data
                               is longer than this value an
                               :exc:`~exceptions.RequestEntityTooLarge`
                               exception is raised.
    :param cls: an optional dict class to use. If this is not specified
                or `None` the default :class:`MultiDict` is used.
    :param silent: If set to False parsing errors will not be caught.
    :param max_form_parts: The maximum number of multipart parts to be parsed. If this
        is exceeded, a :exc:`~exceptions.RequestEntityTooLarge` exception is raised.

    .. versionchanged:: 3.0
        The ``charset`` and ``errors`` parameters were removed.

    .. versionchanged:: 3.0
        The ``parse_functions`` attribute and ``get_parse_func`` methods were removed.

    .. versionchanged:: 2.2.3
        Added the ``max_form_parts`` parameter.

    .. versionadded:: 0.8
    """

    def __init__(
        self,
        stream_factory: TStreamFactory | None = None,
        max_form_memory_size: int | None = None,
        max_content_length: int | None = None,
        cls: type[MultiDict] | None = None,
        silent: bool = True,
        *,
        max_form_parts: int | None = None,
    ) -> None:
        if stream_factory is None:
            stream_factory = default_stream_factory

        self.stream_factory = stream_factory
        self.max_form_memory_size = max_form_memory_size
        self.max_content_length = max_content_length
        self.max_form_parts = max_form_parts

        if cls is None:
            cls = MultiDict

        self.cls = cls
        self.silent = silent

    def parse_from_environ(self, environ: WSGIEnvironment) -> t_parse_result:
        """Parses the information from the environment as form data.

        :param environ: the WSGI environment to be used for parsing.
        :return: A tuple in the form ``(stream, form, files)``.
        """
        # get_input_stream enforces max_content_length while reading, so
        # oversized bodies fail as they stream in rather than up front.
        stream = get_input_stream(environ, max_content_length=self.max_content_length)
        content_length = get_content_length(environ)
        mimetype, options = parse_options_header(environ.get("CONTENT_TYPE"))
        return self.parse(
            stream,
            content_length=content_length,
            mimetype=mimetype,
            options=options,
        )

    def parse(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str] | None = None,
    ) -> t_parse_result:
        """Parses the information from the given stream, mimetype,
        content length and mimetype parameters.

        :param stream: an input stream
        :param mimetype: the mimetype of the data
        :param content_length: the content length of the incoming data
        :param options: optional mimetype parameters (used for
                        the multipart boundary for instance)
        :return: A tuple in the form ``(stream, form, files)``.

        .. versionchanged:: 3.0
            The invalid ``application/x-url-encoded`` content type is not
            treated as ``application/x-www-form-urlencoded``.
        """
        if mimetype == "multipart/form-data":
            parse_func = self._parse_multipart
        elif mimetype == "application/x-www-form-urlencoded":
            parse_func = self._parse_urlencoded
        else:
            # Unknown mimetype: return the stream untouched with empty
            # form/files containers.
            return stream, self.cls(), self.cls()

        if options is None:
            options = {}

        try:
            return parse_func(stream, mimetype, content_length, options)
        except ValueError:
            # By default parse errors are swallowed and empty results
            # returned; set silent=False to propagate them instead.
            if not self.silent:
                raise

        return stream, self.cls(), self.cls()

    def _parse_multipart(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str],
    ) -> t_parse_result:
        # Delegate to MultiPartParser, forwarding our limits and dict class.
        parser = MultiPartParser(
            stream_factory=self.stream_factory,
            max_form_memory_size=self.max_form_memory_size,
            max_form_parts=self.max_form_parts,
            cls=self.cls,
        )
        boundary = options.get("boundary", "").encode("ascii")

        if not boundary:
            # A multipart body without a boundary parameter cannot be split.
            raise ValueError("Missing boundary")

        form, files = parser.parse(stream, boundary, content_length)
        return stream, form, files

    def _parse_urlencoded(
        self,
        stream: t.IO[bytes],
        mimetype: str,
        content_length: int | None,
        options: dict[str, str],
    ) -> t_parse_result:
        # The whole urlencoded body is read into memory below, so the
        # memory limit is checked against the declared length first.
        if (
            self.max_form_memory_size is not None
            and content_length is not None
            and content_length > self.max_form_memory_size
        ):
            raise RequestEntityTooLarge()

        try:
            items = parse_qsl(
                stream.read().decode(),
                # Preserve keys that were sent without a value.
                keep_blank_values=True,
                errors="werkzeug.url_quote",
            )
        except ValueError as e:
            raise RequestEntityTooLarge() from e

        return stream, self.cls(items), self.cls()
class MultiPartParser:
    """Parse a ``multipart/form-data`` request body into ``(form, files)``
    multidicts by driving the sans-IO multipart decoder over a chunked
    read of the input stream.
    """

    def __init__(
        self,
        stream_factory: TStreamFactory | None = None,
        max_form_memory_size: int | None = None,
        cls: type[MultiDict] | None = None,
        buffer_size: int = 64 * 1024,
        max_form_parts: int | None = None,
    ) -> None:
        self.max_form_memory_size = max_form_memory_size
        self.max_form_parts = max_form_parts

        if stream_factory is None:
            stream_factory = default_stream_factory

        self.stream_factory = stream_factory

        if cls is None:
            cls = MultiDict

        self.cls = cls
        # Number of bytes pulled from the stream per decoder feed.
        self.buffer_size = buffer_size

    def fail(self, message: str) -> te.NoReturn:
        # Centralized error hook so subclasses can change the exception type.
        raise ValueError(message)

    def get_part_charset(self, headers: Headers) -> str:
        # Figure out input charset for current part
        content_type = headers.get("content-type")

        if content_type is not None:
            parameters = parse_options_header(content_type)[1]
            ct_charset = parameters.get("charset", "").lower()

            # A safe list of encodings. Modern clients should only send ASCII or UTF-8.
            # This list will not be extended further.
            if ct_charset in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
                return ct_charset

        # Anything else (or no charset at all) falls back to UTF-8.
        return "utf-8"

    def start_file_streaming(
        self, event: File, total_content_length: int | None
    ) -> t.IO[bytes]:
        """Open a writable stream (via ``stream_factory``) for an incoming
        file part.
        """
        content_type = event.headers.get("content-type")

        try:
            content_length = _plain_int(event.headers["content-length"])
        except (KeyError, ValueError):
            # Missing or malformed per-part length: treat as unknown (0).
            content_length = 0

        container = self.stream_factory(
            total_content_length=total_content_length,
            filename=event.filename,
            content_type=content_type,
            content_length=content_length,
        )
        return container

    def parse(
        self, stream: t.IO[bytes], boundary: bytes, content_length: int | None
    ) -> tuple[MultiDict, MultiDict]:
        """Feed the stream through the multipart decoder and collect field
        and file parts.

        :param stream: the raw request body stream.
        :param boundary: the multipart boundary (without leading ``--``).
        :param content_length: total declared body length, if known.
        :return: ``(form_multidict, files_multidict)``.
        """
        # State carried across decoder events for the part being assembled:
        # the part's header event, its output container, and a bound write
        # callable (list.append for fields, file.write for files).
        current_part: Field | File
        container: t.IO[bytes] | list[bytes]
        _write: t.Callable[[bytes], t.Any]

        parser = MultipartDecoder(
            boundary,
            max_form_memory_size=self.max_form_memory_size,
            max_parts=self.max_form_parts,
        )

        fields = []
        files = []

        for data in _chunk_iter(stream.read, self.buffer_size):
            parser.receive_data(data)
            event = parser.next_event()
            # Drain all events the decoder can produce from this chunk;
            # NeedData means "feed more input", Epilogue means "done".
            while not isinstance(event, (Epilogue, NeedData)):
                if isinstance(event, Field):
                    current_part = event
                    # Field values are buffered in memory as byte chunks.
                    container = []
                    _write = container.append
                elif isinstance(event, File):
                    current_part = event
                    container = self.start_file_streaming(event, content_length)
                    _write = container.write
                elif isinstance(event, Data):
                    _write(event.data)
                    if not event.more_data:
                        # Part complete: finalize it into fields or files.
                        if isinstance(current_part, Field):
                            value = b"".join(container).decode(
                                self.get_part_charset(current_part.headers), "replace"
                            )
                            fields.append((current_part.name, value))
                        else:
                            container = t.cast(t.IO[bytes], container)
                            # Rewind so callers can read the file from the start.
                            container.seek(0)
                            files.append(
                                (
                                    current_part.name,
                                    FileStorage(
                                        container,
                                        current_part.filename,
                                        current_part.name,
                                        headers=current_part.headers,
                                    ),
                                )
                            )

                event = parser.next_event()

        return self.cls(fields), self.cls(files)
409 def _chunk_iter(read
: t
.Callable
[[int], bytes], size
: int) -> t
.Iterator
[bytes |
None]:
410 """Read data in chunks for multipart/form-data parsing. Stop if no data is read.
411 Yield ``None`` at the end to signal end of parsing.