1 from __future__
import annotations
7 from collections
import defaultdict
8 from datetime
import datetime
10 from itertools
import chain
11 from random
import random
12 from tempfile
import TemporaryFile
14 from urllib
.parse
import unquote
15 from urllib
.parse
import urlsplit
16 from urllib
.parse
import urlunsplit
18 from ._internal
import _get_environ
19 from ._internal
import _wsgi_decoding_dance
20 from ._internal
import _wsgi_encoding_dance
21 from .datastructures
import Authorization
22 from .datastructures
import CallbackDict
23 from .datastructures
import CombinedMultiDict
24 from .datastructures
import EnvironHeaders
25 from .datastructures
import FileMultiDict
26 from .datastructures
import Headers
27 from .datastructures
import MultiDict
28 from .http
import dump_cookie
29 from .http
import dump_options_header
30 from .http
import parse_cookie
31 from .http
import parse_date
32 from .http
import parse_options_header
33 from .sansio
.multipart
import Data
34 from .sansio
.multipart
import Epilogue
35 from .sansio
.multipart
import Field
36 from .sansio
.multipart
import File
37 from .sansio
.multipart
import MultipartEncoder
38 from .sansio
.multipart
import Preamble
39 from .urls
import _urlencode
40 from .urls
import iri_to_uri
41 from .utils
import cached_property
42 from .utils
import get_content_type
43 from .wrappers
.request
import Request
44 from .wrappers
.response
import Response
45 from .wsgi
import ClosingIterator
46 from .wsgi
import get_current_url
49 from _typeshed
.wsgi
import WSGIApplication
50 from _typeshed
.wsgi
import WSGIEnvironment
51 import typing_extensions
as te
54 def stream_encode_multipart(
55 data
: t
.Mapping
[str, t
.Any
],
56 use_tempfile
: bool = True,
57 threshold
: int = 1024 * 500,
58 boundary
: str |
None = None,
59 ) -> tuple[t
.IO
[bytes], int, str]:
60 """Encode a dict of values (either strings or file descriptors or
61 :class:`FileStorage` objects.) into a multipart encoded string stored
64 .. versionchanged:: 3.0
65 The ``charset`` parameter was removed.
68 boundary
= f
"---------------WerkzeugFormPart_{time()}{random()}"
70 stream
: t
.IO
[bytes] = BytesIO()
73 write_binary
: t
.Callable
[[bytes], int]
77 def write_binary(s
: bytes) -> int:
78 nonlocal stream
, total_length
, on_disk
81 return stream
.write(s
)
85 if length
+ total_length
<= threshold
:
88 new_stream
= t
.cast(t
.IO
[bytes], TemporaryFile("wb+"))
89 new_stream
.write(stream
.getvalue()) # type: ignore
94 total_length
+= length
98 write_binary
= stream
.write
100 encoder
= MultipartEncoder(boundary
.encode())
101 write_binary(encoder
.send_event(Preamble(data
=b
"")))
102 for key
, value
in _iter_data(data
):
103 reader
= getattr(value
, "read", None)
104 if reader
is not None:
105 filename
= getattr(value
, "filename", getattr(value
, "name", None))
106 content_type
= getattr(value
, "content_type", None)
107 if content_type
is None:
110 and mimetypes
.guess_type(filename
)[0]
111 or "application/octet-stream"
113 headers
= value
.headers
114 headers
.update([("Content-Type", content_type
)])
116 write_binary(encoder
.send_event(Field(name
=key
, headers
=headers
)))
120 File(name
=key
, filename
=filename
, headers
=headers
)
124 chunk
= reader(16384)
127 write_binary(encoder
.send_event(Data(data
=chunk
, more_data
=False)))
130 write_binary(encoder
.send_event(Data(data
=chunk
, more_data
=True)))
132 if not isinstance(value
, str):
134 write_binary(encoder
.send_event(Field(name
=key
, headers
=Headers())))
135 write_binary(encoder
.send_event(Data(data
=value
.encode(), more_data
=False)))
137 write_binary(encoder
.send_event(Epilogue(data
=b
"")))
139 length
= stream
.tell()
141 return stream
, length
, boundary
144 def encode_multipart(
145 values
: t
.Mapping
[str, t
.Any
], boundary
: str |
None = None
146 ) -> tuple[str, bytes]:
147 """Like `stream_encode_multipart` but returns a tuple in the form
148 (``boundary``, ``data``) where data is bytes.
150 .. versionchanged:: 3.0
151 The ``charset`` parameter was removed.
153 stream
, length
, boundary
= stream_encode_multipart(
154 values
, use_tempfile
=False, boundary
=boundary
156 return boundary
, stream
.read()
159 def _iter_data(data
: t
.Mapping
[str, t
.Any
]) -> t
.Iterator
[tuple[str, t
.Any
]]:
160 """Iterate over a mapping that might have a list of values, yielding
161 all key, value pairs. Almost like iter_multi_items but only allows
162 lists, not tuples, of values so tuples can be used for files.
164 if isinstance(data
, MultiDict
):
165 yield from data
.items(multi
=True)
167 for key
, value
in data
.items():
168 if isinstance(value
, list):
175 _TAnyMultiDict
= t
.TypeVar("_TAnyMultiDict", bound
=MultiDict
)
178 class EnvironBuilder
:
179 """This class can be used to conveniently create a WSGI environment
180 for testing purposes. It can be used to quickly create WSGI environments
181 or request objects from arbitrary data.
183 The signature of this class is also used in some other places as of
184 Werkzeug 0.5 (:func:`create_environ`, :meth:`Response.from_values`,
185 :meth:`Client.open`). Because of this most of the functionality is
186 available through the constructor alone.
188 Files and regular form data can be manipulated independently of each
189 other with the :attr:`form` and :attr:`files` attributes, but are
190 passed with the same argument to the constructor: `data`.
192 `data` can be any of these values:
194 - a `str` or `bytes` object: The object is converted into an
195 :attr:`input_stream`, the :attr:`content_length` is set and you have to
196 provide a :attr:`content_type`.
197 - a `dict` or :class:`MultiDict`: The keys have to be strings. The values
198 have to be either any of the following objects, or a list of any of the
201 - a :class:`file`-like object: These are converted into
202 :class:`FileStorage` objects automatically.
203 - a `tuple`: The :meth:`~FileMultiDict.add_file` method is called
204 with the key and the unpacked `tuple` items as positional
206 - a `str`: The string is set as form data for the associated key.
207 - a file-like object: The object content is loaded in memory and then
208 handled like a regular `str` or a `bytes`.
210 :param path: the path of the request. In the WSGI environment this will
211 end up as `PATH_INFO`. If the `query_string` is not defined
212 and there is a question mark in the `path` everything after
213 it is used as query string.
214 :param base_url: the base URL is a URL that is used to extract the WSGI
215 URL scheme, host (server name + server port) and the
216 script root (`SCRIPT_NAME`).
217 :param query_string: an optional string or dict with URL parameters.
218 :param method: the HTTP method to use, defaults to `GET`.
219 :param input_stream: an optional input stream. Do not specify this and
220 `data`. As soon as an input stream is set you can't
221 modify :attr:`args` and :attr:`files` unless you
222 set the :attr:`input_stream` to `None` again.
223 :param content_type: The content type for the request. As of 0.5 you
224 don't have to provide this when specifying files
225 and form data via `data`.
226 :param content_length: The content length for the request. You don't
227 have to specify this when providing data via
229 :param errors_stream: an optional error stream that is used for
230 `wsgi.errors`. Defaults to :data:`stderr`.
231 :param multithread: controls `wsgi.multithread`. Defaults to `False`.
232 :param multiprocess: controls `wsgi.multiprocess`. Defaults to `False`.
233 :param run_once: controls `wsgi.run_once`. Defaults to `False`.
234 :param headers: an optional list or :class:`Headers` object of headers.
235 :param data: a string or dict of form data or a file-object.
236 See explanation above.
237 :param json: An object to be serialized and assigned to ``data``.
238 Defaults the content type to ``"application/json"``.
239 Serialized with the function assigned to :attr:`json_dumps`.
240 :param environ_base: an optional dict of environment defaults.
241 :param environ_overrides: an optional dict of environment overrides.
242 :param auth: An authorization object to use for the
243 ``Authorization`` header value. A ``(username, password)`` tuple
244 is a shortcut for ``Basic`` authorization.
246 .. versionchanged:: 3.0
247 The ``charset`` parameter was removed.
249 .. versionchanged:: 2.1
250 ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` are not duplicated as
251 header keys in the environ.
253 .. versionchanged:: 2.0
254 ``REQUEST_URI`` and ``RAW_URI`` is the full raw URI including
255 the query string, not only the path.
257 .. versionchanged:: 2.0
258 The default :attr:`request_class` is ``Request`` instead of
261 .. versionadded:: 2.0
262 Added the ``auth`` parameter.
264 .. versionadded:: 0.15
265 The ``json`` param and :meth:`json_dumps` method.
267 .. versionadded:: 0.15
268 The environ has keys ``REQUEST_URI`` and ``RAW_URI`` containing
269 the path before percent-decoding. This is not part of the WSGI
270 PEP, but many WSGI servers include it.
272 .. versionchanged:: 0.6
273 ``path`` and ``base_url`` can now be unicode strings that are
274 encoded with :func:`iri_to_uri`.
277 #: the server protocol to use. defaults to HTTP/1.1
278 server_protocol
= "HTTP/1.1"
280 #: the wsgi version to use. defaults to (1, 0)
281 wsgi_version
= (1, 0)
283 #: The default request class used by :meth:`get_request`.
284 request_class
= Request
288 #: The serialization function used when ``json`` is passed.
289 json_dumps
= staticmethod(json
.dumps
)
292 _args
: MultiDict |
None
293 _query_string
: str |
None
294 _input_stream
: t
.IO
[bytes] |
None
295 _form
: MultiDict |
None
296 _files
: FileMultiDict |
None
301 base_url
: str |
None = None,
302 query_string
: t
.Mapping
[str, str] |
str |
None = None,
304 input_stream
: t
.IO
[bytes] |
None = None,
305 content_type
: str |
None = None,
306 content_length
: int |
None = None,
307 errors_stream
: t
.IO
[str] |
None = None,
308 multithread
: bool = False,
309 multiprocess
: bool = False,
310 run_once
: bool = False,
311 headers
: Headers | t
.Iterable
[tuple[str, str]] |
None = None,
312 data
: None |
(t
.IO
[bytes] |
str |
bytes | t
.Mapping
[str, t
.Any
]) = None,
313 environ_base
: t
.Mapping
[str, t
.Any
] |
None = None,
314 environ_overrides
: t
.Mapping
[str, t
.Any
] |
None = None,
315 mimetype
: str |
None = None,
316 json
: t
.Mapping
[str, t
.Any
] |
None = None,
317 auth
: Authorization |
tuple[str, str] |
None = None,
319 if query_string
is not None and "?" in path
:
320 raise ValueError("Query string is defined in the path and as an argument")
321 request_uri
= urlsplit(path
)
322 if query_string
is None and "?" in path
:
323 query_string
= request_uri
.query
325 self
.path
= iri_to_uri(request_uri
.path
)
326 self
.request_uri
= path
327 if base_url
is not None:
328 base_url
= iri_to_uri(base_url
)
329 self
.base_url
= base_url
# type: ignore
330 if isinstance(query_string
, str):
331 self
.query_string
= query_string
333 if query_string
is None:
334 query_string
= MultiDict()
335 elif not isinstance(query_string
, MultiDict
):
336 query_string
= MultiDict(query_string
)
337 self
.args
= query_string
341 elif not isinstance(headers
, Headers
):
342 headers
= Headers(headers
)
343 self
.headers
= headers
344 if content_type
is not None:
345 self
.content_type
= content_type
346 if errors_stream
is None:
347 errors_stream
= sys
.stderr
348 self
.errors_stream
= errors_stream
349 self
.multithread
= multithread
350 self
.multiprocess
= multiprocess
351 self
.run_once
= run_once
352 self
.environ_base
= environ_base
353 self
.environ_overrides
= environ_overrides
354 self
.input_stream
= input_stream
355 self
.content_length
= content_length
359 if isinstance(auth
, tuple):
360 auth
= Authorization(
361 "basic", {"username": auth[0], "password": auth[1]}
364 self
.headers
.set("Authorization", auth
.to_header())
368 raise TypeError("can't provide both json and data")
370 data
= self
.json_dumps(json
)
372 if self
.content_type
is None:
373 self
.content_type
= "application/json"
376 if input_stream
is not None:
377 raise TypeError("can't provide input stream and data")
378 if hasattr(data
, "read"):
380 if isinstance(data
, str):
382 if isinstance(data
, bytes):
383 self
.input_stream
= BytesIO(data
)
384 if self
.content_length
is None:
385 self
.content_length
= len(data
)
387 for key
, value
in _iter_data(data
):
388 if isinstance(value
, (tuple, dict)) or hasattr(value
, "read"):
389 self
._add
_file
_from
_data
(key
, value
)
391 self
.form
.setlistdefault(key
).append(value
)
393 if mimetype
is not None:
394 self
.mimetype
= mimetype
397 def from_environ(cls
, environ
: WSGIEnvironment
, **kwargs
: t
.Any
) -> EnvironBuilder
:
398 """Turn an environ dict back into a builder. Any extra kwargs
399 override the args extracted from the environ.
401 .. versionchanged:: 2.0
402 Path and query values are passed through the WSGI decoding
403 dance to avoid double encoding.
405 .. versionadded:: 0.15
407 headers
= Headers(EnvironHeaders(environ
))
409 "path": _wsgi_decoding_dance(environ
["PATH_INFO"]),
410 "base_url": cls
._make
_base
_url
(
411 environ
["wsgi.url_scheme"],
413 _wsgi_decoding_dance(environ
["SCRIPT_NAME"]),
415 "query_string": _wsgi_decoding_dance(environ
["QUERY_STRING"]),
416 "method": environ
["REQUEST_METHOD"],
417 "input_stream": environ
["wsgi.input"],
418 "content_type": headers
.pop("Content-Type", None),
419 "content_length": headers
.pop("Content-Length", None),
420 "errors_stream": environ
["wsgi.errors"],
421 "multithread": environ
["wsgi.multithread"],
422 "multiprocess": environ
["wsgi.multiprocess"],
423 "run_once": environ
["wsgi.run_once"],
429 def _add_file_from_data(
432 value
: (t
.IO
[bytes] |
tuple[t
.IO
[bytes], str] |
tuple[t
.IO
[bytes], str, str]),
434 """Called in the EnvironBuilder to add files from the data dict."""
435 if isinstance(value
, tuple):
436 self
.files
.add_file(key
, *value
)
438 self
.files
.add_file(key
, value
)
441 def _make_base_url(scheme
: str, host
: str, script_root
: str) -> str:
442 return urlunsplit((scheme
, host
, script_root
, "", "")).rstrip("/") + "/"
445 def base_url(self
) -> str:
446 """The base URL is used to extract the URL scheme, host name,
449 return self
._make
_base
_url
(self
.url_scheme
, self
.host
, self
.script_root
)
452 def base_url(self
, value
: str |
None) -> None:
458 scheme
, netloc
, script_root
, qs
, anchor
= urlsplit(value
)
460 raise ValueError("base url must not contain a query string or fragment")
461 self
.script_root
= script_root
.rstrip("/")
463 self
.url_scheme
= scheme
466 def content_type(self
) -> str |
None:
467 """The content type for the request. Reflected from and to
468 the :attr:`headers`. Do not set if you set :attr:`files` or
469 :attr:`form` for auto detection.
471 ct
= self
.headers
.get("Content-Type")
472 if ct
is None and not self
._input
_stream
:
474 return "multipart/form-data"
476 return "application/x-www-form-urlencoded"
481 def content_type(self
, value
: str |
None) -> None:
483 self
.headers
.pop("Content-Type", None)
485 self
.headers
["Content-Type"] = value
488 def mimetype(self
) -> str |
None:
489 """The mimetype (content type without charset etc.)
491 .. versionadded:: 0.14
493 ct
= self
.content_type
494 return ct
.split(";")[0].strip() if ct
else None
497 def mimetype(self
, value
: str) -> None:
498 self
.content_type
= get_content_type(value
, "utf-8")
501 def mimetype_params(self
) -> t
.Mapping
[str, str]:
502 """The mimetype parameters as dict. For example if the
503 content type is ``text/html; charset=utf-8`` the params would be
504 ``{'charset': 'utf-8'}``.
506 .. versionadded:: 0.14
509 def on_update(d
: CallbackDict
) -> None:
510 self
.headers
["Content-Type"] = dump_options_header(self
.mimetype
, d
)
512 d
= parse_options_header(self
.headers
.get("content-type", ""))[1]
513 return CallbackDict(d
, on_update
)
516 def content_length(self
) -> int |
None:
517 """The content length as integer. Reflected from and to the
518 :attr:`headers`. Do not set if you set :attr:`files` or
519 :attr:`form` for auto detection.
521 return self
.headers
.get("Content-Length", type=int)
523 @content_length.setter
524 def content_length(self
, value
: int |
None) -> None:
526 self
.headers
.pop("Content-Length", None)
528 self
.headers
["Content-Length"] = str(value
)
530 def _get_form(self
, name
: str, storage
: type[_TAnyMultiDict
]) -> _TAnyMultiDict
:
531 """Common behavior for getting the :attr:`form` and
532 :attr:`files` properties.
534 :param name: Name of the internal cached attribute.
535 :param storage: Storage class used for the data.
537 if self
.input_stream
is not None:
538 raise AttributeError("an input stream is defined")
540 rv
= getattr(self
, name
)
544 setattr(self
, name
, rv
)
546 return rv
# type: ignore
548 def _set_form(self
, name
: str, value
: MultiDict
) -> None:
549 """Common behavior for setting the :attr:`form` and
550 :attr:`files` properties.
552 :param name: Name of the internal cached attribute.
553 :param value: Value to assign to the attribute.
555 self
._input
_stream
= None
556 setattr(self
, name
, value
)
559 def form(self
) -> MultiDict
:
560 """A :class:`MultiDict` of form values."""
561 return self
._get
_form
("_form", MultiDict
)
564 def form(self
, value
: MultiDict
) -> None:
565 self
._set
_form
("_form", value
)
568 def files(self
) -> FileMultiDict
:
569 """A :class:`FileMultiDict` of uploaded files. Use
570 :meth:`~FileMultiDict.add_file` to add new files.
572 return self
._get
_form
("_files", FileMultiDict
)
575 def files(self
, value
: FileMultiDict
) -> None:
576 self
._set
_form
("_files", value
)
579 def input_stream(self
) -> t
.IO
[bytes] |
None:
580 """An optional input stream. This is mutually exclusive with
581 setting :attr:`form` and :attr:`files`, setting it will clear
582 those. Do not provide this if the method is not ``POST`` or
583 another method that has a body.
585 return self
._input
_stream
588 def input_stream(self
, value
: t
.IO
[bytes] |
None) -> None:
589 self
._input
_stream
= value
594 def query_string(self
) -> str:
595 """The query string. If you set this to a string
596 :attr:`args` will no longer be available.
598 if self
._query
_string
is None:
599 if self
._args
is not None:
600 return _urlencode(self
._args
)
602 return self
._query
_string
605 def query_string(self
, value
: str |
None) -> None:
606 self
._query
_string
= value
610 def args(self
) -> MultiDict
:
611 """The URL arguments as :class:`MultiDict`."""
612 if self
._query
_string
is not None:
613 raise AttributeError("a query string is defined")
614 if self
._args
is None:
615 self
._args
= MultiDict()
619 def args(self
, value
: MultiDict |
None) -> None:
620 self
._query
_string
= None
624 def server_name(self
) -> str:
625 """The server name (read-only, use :attr:`host` to set)"""
626 return self
.host
.split(":", 1)[0]
629 def server_port(self
) -> int:
630 """The server port as integer (read-only, use :attr:`host` to set)"""
631 pieces
= self
.host
.split(":", 1)
635 return int(pieces
[1])
639 if self
.url_scheme
== "https":
643 def __del__(self
) -> None:
649 def close(self
) -> None:
650 """Closes all files. If you put real :class:`file` objects into the
651 :attr:`files` dict you can call this method to automatically close
657 files
= self
.files
.values()
658 except AttributeError:
659 files
= () # type: ignore
667 def get_environ(self
) -> WSGIEnvironment
:
668 """Return the built environ.
670 .. versionchanged:: 0.15
671 The content type and length headers are set based on
672 input stream detection. Previously this only set the WSGI
675 input_stream
= self
.input_stream
676 content_length
= self
.content_length
678 mimetype
= self
.mimetype
679 content_type
= self
.content_type
681 if input_stream
is not None:
682 start_pos
= input_stream
.tell()
683 input_stream
.seek(0, 2)
684 end_pos
= input_stream
.tell()
685 input_stream
.seek(start_pos
)
686 content_length
= end_pos
- start_pos
687 elif mimetype
== "multipart/form-data":
688 input_stream
, content_length
, boundary
= stream_encode_multipart(
689 CombinedMultiDict([self
.form
, self
.files
])
691 content_type
= f
'{mimetype}; boundary="{boundary}"'
692 elif mimetype
== "application/x-www-form-urlencoded":
693 form_encoded
= _urlencode(self
.form
).encode("ascii")
694 content_length
= len(form_encoded
)
695 input_stream
= BytesIO(form_encoded
)
697 input_stream
= BytesIO()
699 result
: WSGIEnvironment
= {}
700 if self
.environ_base
:
701 result
.update(self
.environ_base
)
703 def _path_encode(x
: str) -> str:
704 return _wsgi_encoding_dance(unquote(x
))
706 raw_uri
= _wsgi_encoding_dance(self
.request_uri
)
709 "REQUEST_METHOD": self
.method
,
710 "SCRIPT_NAME": _path_encode(self
.script_root
),
711 "PATH_INFO": _path_encode(self
.path
),
712 "QUERY_STRING": _wsgi_encoding_dance(self
.query_string
),
713 # Non-standard, added by mod_wsgi, uWSGI
714 "REQUEST_URI": raw_uri
,
715 # Non-standard, added by gunicorn
717 "SERVER_NAME": self
.server_name
,
718 "SERVER_PORT": str(self
.server_port
),
719 "HTTP_HOST": self
.host
,
720 "SERVER_PROTOCOL": self
.server_protocol
,
721 "wsgi.version": self
.wsgi_version
,
722 "wsgi.url_scheme": self
.url_scheme
,
723 "wsgi.input": input_stream
,
724 "wsgi.errors": self
.errors_stream
,
725 "wsgi.multithread": self
.multithread
,
726 "wsgi.multiprocess": self
.multiprocess
,
727 "wsgi.run_once": self
.run_once
,
731 headers
= self
.headers
.copy()
732 # Don't send these as headers, they're part of the environ.
733 headers
.remove("Content-Type")
734 headers
.remove("Content-Length")
736 if content_type
is not None:
737 result
["CONTENT_TYPE"] = content_type
739 if content_length
is not None:
740 result
["CONTENT_LENGTH"] = str(content_length
)
742 combined_headers
= defaultdict(list)
744 for key
, value
in headers
.to_wsgi_list():
745 combined_headers
[f
"HTTP_{key.upper().replace('-', '_')}"].append(value
)
747 for key
, values
in combined_headers
.items():
748 result
[key
] = ", ".join(values
)
750 if self
.environ_overrides
:
751 result
.update(self
.environ_overrides
)
755 def get_request(self
, cls
: type[Request
] |
None = None) -> Request
:
756 """Returns a request with the data. If the request class is not
757 specified :attr:`request_class` is used.
759 :param cls: The request wrapper to use.
762 cls
= self
.request_class
764 return cls(self
.get_environ())
767 class ClientRedirectError(Exception):
768 """If a redirect loop is detected when using follow_redirects=True with
769 the :cls:`Client`, then this exception is raised.
774 """Simulate sending requests to a WSGI application without running a WSGI or HTTP
777 :param application: The WSGI application to make requests to.
778 :param response_wrapper: A :class:`.Response` class to wrap response data with.
779 Defaults to :class:`.TestResponse`. If it's not a subclass of ``TestResponse``,
781 :param use_cookies: Persist cookies from ``Set-Cookie`` response headers to the
782 ``Cookie`` header in subsequent requests. Domain and path matching is supported,
783 but other cookie parameters are ignored.
784 :param allow_subdomain_redirects: Allow requests to follow redirects to subdomains.
785 Enable this if the application handles subdomains and redirects between them.
787 .. versionchanged:: 2.3
788 Simplify cookie implementation, support domain and path matching.
790 .. versionchanged:: 2.1
791 All data is available as properties on the returned response object. The
792 response cannot be returned as a tuple.
794 .. versionchanged:: 2.0
795 ``response_wrapper`` is always a subclass of :class:``TestResponse``.
797 .. versionchanged:: 0.5
798 Added the ``use_cookies`` parameter.
803 application
: WSGIApplication
,
804 response_wrapper
: type[Response
] |
None = None,
805 use_cookies
: bool = True,
806 allow_subdomain_redirects
: bool = False,
808 self
.application
= application
810 if response_wrapper
in {None, Response}
:
811 response_wrapper
= TestResponse
812 elif not isinstance(response_wrapper
, TestResponse
):
813 response_wrapper
= type(
814 "WrapperTestResponse",
815 (TestResponse
, response_wrapper
), # type: ignore
819 self
.response_wrapper
= t
.cast(t
.Type
["TestResponse"], response_wrapper
)
822 self
._cookies
: dict[tuple[str, str, str], Cookie
] |
None = {}
826 self
.allow_subdomain_redirects
= allow_subdomain_redirects
829 self
, key
: str, domain
: str = "localhost", path
: str = "/"
831 """Return a :class:`.Cookie` if it exists. Cookies are uniquely identified by
832 ``(domain, path, key)``.
834 :param key: The decoded form of the key for the cookie.
835 :param domain: The domain the cookie was set for.
836 :param path: The path the cookie was set for.
838 .. versionadded:: 2.3
840 if self
._cookies
is None:
842 "Cookies are disabled. Create a client with 'use_cookies=True'."
845 return self
._cookies
.get((domain
, path
, key
))
852 domain
: str = "localhost",
853 origin_only
: bool = True,
857 """Set a cookie to be sent in subsequent requests.
859 This is a convenience to skip making a test request to a route that would set
860 the cookie. To test the cookie, make a test request to a route that uses the
863 The client uses ``domain``, ``origin_only``, and ``path`` to determine which
864 cookies to send with a request. It does not use other cookie parameters that
865 browsers use, since they're not applicable in tests.
867 :param key: The key part of the cookie.
868 :param value: The value part of the cookie.
869 :param domain: Send this cookie with requests that match this domain. If
870 ``origin_only`` is true, it must be an exact match, otherwise it may be a
872 :param origin_only: Whether the domain must be an exact match to the request.
873 :param path: Send this cookie with requests that match this path either exactly
875 :param kwargs: Passed to :func:`.dump_cookie`.
877 .. versionchanged:: 3.0
878 The parameter ``server_name`` is removed. The first parameter is
879 ``key``. Use the ``domain`` and ``origin_only`` parameters instead.
881 .. versionchanged:: 2.3
882 The ``origin_only`` parameter was added.
884 .. versionchanged:: 2.3
885 The ``domain`` parameter defaults to ``localhost``.
887 if self
._cookies
is None:
889 "Cookies are disabled. Create a client with 'use_cookies=True'."
892 cookie
= Cookie
._from
_response
_header
(
893 domain
, "/", dump_cookie(key
, value
, domain
=domain
, path
=path
, **kwargs
)
895 cookie
.origin_only
= origin_only
897 if cookie
._should
_delete
:
898 self
._cookies
.pop(cookie
._storage
_key
, None)
900 self
._cookies
[cookie
._storage
_key
] = cookie
906 domain
: str = "localhost",
909 """Delete a cookie if it exists. Cookies are uniquely identified by
910 ``(domain, path, key)``.
912 :param key: The decoded form of the key for the cookie.
913 :param domain: The domain the cookie was set for.
914 :param path: The path the cookie was set for.
916 .. versionchanged:: 3.0
917 The ``server_name`` parameter is removed. The first parameter is
918 ``key``. Use the ``domain`` parameter instead.
920 .. versionchanged:: 3.0
921 The ``secure``, ``httponly`` and ``samesite`` parameters are removed.
923 .. versionchanged:: 2.3
924 The ``domain`` parameter defaults to ``localhost``.
926 if self
._cookies
is None:
928 "Cookies are disabled. Create a client with 'use_cookies=True'."
931 self
._cookies
.pop((domain
, path
, key
), None)
933 def _add_cookies_to_wsgi(self
, environ
: WSGIEnvironment
) -> None:
934 """If cookies are enabled, set the ``Cookie`` header in the environ to the
935 cookies that are applicable to the request host and path.
939 .. versionadded:: 2.3
941 if self
._cookies
is None:
944 url
= urlsplit(get_current_url(environ
))
945 server_name
= url
.hostname
or "localhost"
947 c
._to
_request
_header
()
948 for c
in self
._cookies
.values()
949 if c
._matches
_request
(server_name
, url
.path
)
953 environ
["HTTP_COOKIE"] = value
955 environ
.pop("HTTP_COOKIE", None)
957 def _update_cookies_from_response(
958 self
, server_name
: str, path
: str, headers
: list[str]
960 """If cookies are enabled, update the stored cookies from any ``Set-Cookie``
961 headers in the response.
965 .. versionadded:: 2.3
967 if self
._cookies
is None:
970 for header
in headers
:
971 cookie
= Cookie
._from
_response
_header
(server_name
, path
, header
)
973 if cookie
._should
_delete
:
974 self
._cookies
.pop(cookie
._storage
_key
, None)
976 self
._cookies
[cookie
._storage
_key
] = cookie
979 self
, environ
: WSGIEnvironment
, buffered
: bool = False
980 ) -> tuple[t
.Iterable
[bytes], str, Headers
]:
981 """Runs the wrapped WSGI app with the given environment.
985 self
._add
_cookies
_to
_wsgi
(environ
)
986 rv
= run_wsgi_app(self
.application
, environ
, buffered
=buffered
)
987 url
= urlsplit(get_current_url(environ
))
988 self
._update
_cookies
_from
_response
(
989 url
.hostname
or "localhost", url
.path
, rv
[2].getlist("Set-Cookie")
993 def resolve_redirect(
994 self
, response
: TestResponse
, buffered
: bool = False
996 """Perform a new request to the location given by the redirect
997 response to the previous request.
1001 scheme
, netloc
, path
, qs
, anchor
= urlsplit(response
.location
)
1002 builder
= EnvironBuilder
.from_environ(
1003 response
.request
.environ
, path
=path
, query_string
=qs
1006 to_name_parts
= netloc
.split(":", 1)[0].split(".")
1007 from_name_parts
= builder
.server_name
.split(".")
1009 if to_name_parts
!= [""]:
1010 # The new location has a host, use it for the base URL.
1011 builder
.url_scheme
= scheme
1012 builder
.host
= netloc
1014 # A local redirect with autocorrect_location_header=False
1015 # doesn't have a host, so use the request's host.
1016 to_name_parts
= from_name_parts
1018 # Explain why a redirect to a different server name won't be followed.
1019 if to_name_parts
!= from_name_parts
:
1020 if to_name_parts
[-len(from_name_parts
) :] == from_name_parts
:
1021 if not self
.allow_subdomain_redirects
:
1022 raise RuntimeError("Following subdomain redirects is not enabled.")
1024 raise RuntimeError("Following external redirects is not supported.")
1026 path_parts
= path
.split("/")
1027 root_parts
= builder
.script_root
.split("/")
1029 if path_parts
[: len(root_parts
)] == root_parts
:
1030 # Strip the script root from the path.
1031 builder
.path
= path
[len(builder
.script_root
) :]
1033 # The new location is not under the script root, so use the
1034 # whole path and clear the previous root.
1036 builder
.script_root
= ""
1038 # Only 307 and 308 preserve all of the original request.
1039 if response
.status_code
not in {307, 308}
:
1040 # HEAD is preserved, everything else becomes GET.
1041 if builder
.method
!= "HEAD":
1042 builder
.method
= "GET"
1044 # Clear the body and the headers that describe it.
1046 if builder
.input_stream
is not None:
1047 builder
.input_stream
.close()
1048 builder
.input_stream
= None
1050 builder
.content_type
= None
1051 builder
.content_length
= None
1052 builder
.headers
.pop("Transfer-Encoding", None)
1054 return self
.open(builder
, buffered
=buffered
)
1059 buffered
: bool = False,
1060 follow_redirects
: bool = False,
1063 """Generate an environ dict from the given arguments, make a
1064 request to the application using it, and return the response.
1066 :param args: Passed to :class:`EnvironBuilder` to create the
1067 environ for the request. If a single arg is passed, it can
1068 be an existing :class:`EnvironBuilder` or an environ dict.
1069 :param buffered: Convert the iterator returned by the app into
1070 a list. If the iterator has a ``close()`` method, it is
1071 called automatically.
1072 :param follow_redirects: Make additional requests to follow HTTP
1073 redirects until a non-redirect status is returned.
1074 :attr:`TestResponse.history` lists the intermediate
1077 .. versionchanged:: 2.1
1078 Removed the ``as_tuple`` parameter.
1080 .. versionchanged:: 2.0
1081 The request input stream is closed when calling
1082 ``response.close()``. Input streams for redirects are
1083 automatically closed.
1085 .. versionchanged:: 0.5
1086 If a dict is provided as file in the dict for the ``data``
1087 parameter the content type has to be called ``content_type``
1088 instead of ``mimetype``. This change was made for
1089 consistency with :class:`werkzeug.FileWrapper`.
1091 .. versionchanged:: 0.5
1092 Added the ``follow_redirects`` parameter.
1094 request
: Request |
None = None
1096 if not kwargs
and len(args
) == 1:
1099 if isinstance(arg
, EnvironBuilder
):
1100 request
= arg
.get_request()
1101 elif isinstance(arg
, dict):
1102 request
= EnvironBuilder
.from_environ(arg
).get_request()
1103 elif isinstance(arg
, Request
):
1107 builder
= EnvironBuilder(*args
, **kwargs
)
1110 request
= builder
.get_request()
1114 response
= self
.run_wsgi_app(request
.environ
, buffered
=buffered
)
1115 response
= self
.response_wrapper(*response
, request
=request
)
1118 history
: list[TestResponse
] = []
1120 if not follow_redirects
:
1123 while response
.status_code
in {
1131 # Exhaust intermediate response bodies to ensure middleware
1132 # that returns an iterator runs any cleanup code.
1134 response
.make_sequence()
1137 new_redirect_entry
= (response
.location
, response
.status_code
)
1139 if new_redirect_entry
in redirects
:
1140 raise ClientRedirectError(
1141 f
"Loop detected: A {response.status_code} redirect"
1142 f
" to {response.location} was already made."
1145 redirects
.add(new_redirect_entry
)
1146 response
.history
= tuple(history
)
1147 history
.append(response
)
1148 response
= self
.resolve_redirect(response
, buffered
=buffered
)
1150 # This is the final request after redirects.
1151 response
.history
= tuple(history
)
1152 # Close the input stream when closing the response, in case
1153 # the input is an open temporary file.
1154 response
.call_on_close(request
.input_stream
.close
)
1157 def get(self
, *args
: t
.Any
, **kw
: t
.Any
) -> TestResponse
:
1158 """Call :meth:`open` with ``method`` set to ``GET``."""
1159 kw
["method"] = "GET"
1160 return self
.open(*args
, **kw
)
1162 def post(self
, *args
: t
.Any
, **kw
: t
.Any
) -> TestResponse
:
1163 """Call :meth:`open` with ``method`` set to ``POST``."""
1164 kw
["method"] = "POST"
1165 return self
.open(*args
, **kw
)
1167 def put(self
, *args
: t
.Any
, **kw
: t
.Any
) -> TestResponse
:
1168 """Call :meth:`open` with ``method`` set to ``PUT``."""
1169 kw
["method"] = "PUT"
1170 return self
.open(*args
, **kw
)
1172 def delete(self
, *args
: t
.Any
, **kw
: t
.Any
) -> TestResponse
:
1173 """Call :meth:`open` with ``method`` set to ``DELETE``."""
1174 kw
["method"] = "DELETE"
1175 return self
.open(*args
, **kw
)
1177 def patch(self
, *args
: t
.Any
, **kw
: t
.Any
) -> TestResponse
:
1178 """Call :meth:`open` with ``method`` set to ``PATCH``."""
1179 kw
["method"] = "PATCH"
1180 return self
.open(*args
, **kw
)
1182 def options(self
, *args
: t
.Any
, **kw
: t
.Any
) -> TestResponse
:
1183 """Call :meth:`open` with ``method`` set to ``OPTIONS``."""
1184 kw
["method"] = "OPTIONS"
1185 return self
.open(*args
, **kw
)
1187 def head(self
, *args
: t
.Any
, **kw
: t
.Any
) -> TestResponse
:
1188 """Call :meth:`open` with ``method`` set to ``HEAD``."""
1189 kw
["method"] = "HEAD"
1190 return self
.open(*args
, **kw
)
1192 def trace(self
, *args
: t
.Any
, **kw
: t
.Any
) -> TestResponse
:
1193 """Call :meth:`open` with ``method`` set to ``TRACE``."""
1194 kw
["method"] = "TRACE"
1195 return self
.open(*args
, **kw
)
1197 def __repr__(self
) -> str:
1198 return f
"<{type(self).__name__} {self.application!r}>"
1201 def create_environ(*args
: t
.Any
, **kwargs
: t
.Any
) -> WSGIEnvironment
:
1202 """Create a new WSGI environ dict based on the values passed. The first
1203 parameter should be the path of the request which defaults to '/'. The
1204 second one can either be an absolute path (in that case the host is
1205 localhost:80) or a full path to the request with scheme, netloc port and
1206 the path to the script.
1208 This accepts the same arguments as the :class:`EnvironBuilder`
1211 .. versionchanged:: 0.5
1212 This function is now a thin wrapper over :class:`EnvironBuilder` which
1213 was added in 0.5. The `headers`, `environ_base`, `environ_overrides`
1214 and `charset` parameters were added.
1216 builder
= EnvironBuilder(*args
, **kwargs
)
1219 return builder
.get_environ()
1225 app
: WSGIApplication
, environ
: WSGIEnvironment
, buffered
: bool = False
1226 ) -> tuple[t
.Iterable
[bytes], str, Headers
]:
1227 """Return a tuple in the form (app_iter, status, headers) of the
1228 application output. This works best if you pass it an application that
1229 returns an iterator all the time.
1231 Sometimes applications may use the `write()` callable returned
1232 by the `start_response` function. This tries to resolve such edge
1233 cases automatically. But if you don't get the expected output you
1234 should set `buffered` to `True` which enforces buffering.
1236 If passed an invalid WSGI application the behavior of this function is
1237 undefined. Never pass non-conforming WSGI applications to this function.
1239 :param app: the application to execute.
1240 :param buffered: set to `True` to enforce buffering.
1241 :return: tuple in the form ``(app_iter, status, headers)``
1243 # Copy environ to ensure any mutations by the app (ProxyFix, for
1244 # example) don't affect subsequent requests (such as redirects).
1245 environ
= _get_environ(environ
).copy()
1247 response
: tuple[str, list[tuple[str, str]]] |
None = None
1248 buffer: list[bytes] = []
1250 def start_response(status
, headers
, exc_info
=None): # type: ignore
1255 raise exc_info
[1].with_traceback(exc_info
[2])
1259 response
= (status
, headers
)
1260 return buffer.append
1262 app_rv
= app(environ
, start_response
)
1263 close_func
= getattr(app_rv
, "close", None)
1264 app_iter
: t
.Iterable
[bytes] = iter(app_rv
)
1266 # when buffering we emit the close call early and convert the
1267 # application iterator into a regular list
1270 app_iter
= list(app_iter
)
1272 if close_func
is not None:
1275 # otherwise we iterate the application iter until we have a response, chain
1276 # the already received data with the already collected data and wrap it in
1277 # a new `ClosingIterator` if we need to restore a `close` callable from the
1278 # original return value.
1280 for item
in app_iter
:
1283 if response
is not None:
1287 app_iter
= chain(buffer, app_iter
)
1289 if close_func
is not None and app_iter
is not app_rv
:
1290 app_iter
= ClosingIterator(app_iter
, close_func
)
1292 status
, headers
= response
# type: ignore
1293 return app_iter
, status
, Headers(headers
)
1296 class TestResponse(Response
):
1297 """:class:`~werkzeug.wrappers.Response` subclass that provides extra
1298 information about requests made with the test :class:`Client`.
1300 Test client requests will always return an instance of this class.
1301 If a custom response class is passed to the client, it is
1302 subclassed along with this to support test information.
1304 If the test request included large files, or if the application is
1305 serving a file, call :meth:`close` to close any open files and
1306 prevent Python showing a ``ResourceWarning``.
1308 .. versionchanged:: 2.2
1309 Set the ``default_mimetype`` to None to prevent a mimetype being
1312 .. versionchanged:: 2.1
1313 Response instances cannot be treated as tuples.
1315 .. versionadded:: 2.0
1316 Test client methods always return instances of this class.
1319 default_mimetype
= None
1320 # Don't assume a mimetype, instead use whatever the response provides
1323 """A request object with the environ used to make the request that
1324 resulted in this response.
1327 history
: tuple[TestResponse
, ...]
1328 """A list of intermediate responses. Populated when the test request
1329 is made with ``follow_redirects`` enabled.
1332 # Tell Pytest to ignore this, it's not a test class.
1337 response
: t
.Iterable
[bytes],
1341 history
: tuple[TestResponse
] = (), # type: ignore
1344 super().__init
__(response
, status
, headers
, **kwargs
)
1345 self
.request
= request
1346 self
.history
= history
1347 self
._compat
_tuple
= response
, status
, headers
1350 def text(self
) -> str:
1351 """The response data as text. A shortcut for
1352 ``response.get_data(as_text=True)``.
1354 .. versionadded:: 2.1
1356 return self
.get_data(as_text
=True)
1359 @dataclasses.dataclass
1361 """A cookie key, value, and parameters.
1363 The class itself is not a public API. Its attributes are documented for inspection
1364 with :meth:`.Client.get_cookie` only.
1366 .. versionadded:: 2.3
1370 """The cookie key, encoded as a client would see it."""
1373 """The cookie key, encoded as a client would see it."""
1376 """The cookie key, decoded as the application would set and see it."""
1379 """The cookie value, decoded as the application would set and see it."""
1381 expires
: datetime |
None
1382 """The time at which the cookie is no longer valid."""
1385 """The number of seconds from when the cookie was set at which it is
1390 """The domain that the cookie was set for, or the request domain if not set."""
1393 """Whether the cookie will be sent for exact domain matches only. This is ``True``
1394 if the ``Domain`` parameter was not present.
1398 """The path that the cookie was set for."""
1401 """The ``Secure`` parameter."""
1403 http_only
: bool |
None
1404 """The ``HttpOnly`` parameter."""
1406 same_site
: str |
None
1407 """The ``SameSite`` parameter."""
1409 def _matches_request(self
, server_name
: str, path
: str) -> bool:
1411 server_name
== self
.domain
1413 not self
.origin_only
1414 and server_name
.endswith(self
.domain
)
1415 and server_name
[: -len(self
.domain
)].endswith(".")
1420 path
.startswith(self
.path
)
1421 and path
[len(self
.path
) - self
.path
.endswith("/") :].startswith("/")
1425 def _to_request_header(self
) -> str:
1426 return f
"{self.key}={self.value}"
1429 def _from_response_header(cls
, server_name
: str, path
: str, header
: str) -> te
.Self
:
1430 header
, _
, parameters_str
= header
.partition(";")
1431 key
, _
, value
= header
.partition("=")
1432 decoded_key
, decoded_value
= next(parse_cookie(header
).items())
1435 for item
in parameters_str
.split(";"):
1436 k
, sep
, v
= item
.partition("=")
1437 params
[k
.strip().lower()] = v
.strip() if sep
else None
1441 value
=value
.strip(),
1442 decoded_key
=decoded_key
,
1443 decoded_value
=decoded_value
,
1444 expires
=parse_date(params
.get("expires")),
1445 max_age
=int(params
["max-age"] or 0) if "max-age" in params
else None,
1446 domain
=params
.get("domain") or server_name
,
1447 origin_only
="domain" not in params
,
1448 path
=params
.get("path") or path
.rpartition("/")[0] or "/",
1449 secure
="secure" in params
,
1450 http_only
="httponly" in params
,
1451 same_site
=params
.get("samesite"),
1455 def _storage_key(self
) -> tuple[str, str, str]:
1456 return self
.domain
, self
.path
, self
.decoded_key
1459 def _should_delete(self
) -> bool:
1460 return self
.max_age
== 0 or (
1461 self
.expires
is not None and self
.expires
.timestamp() == 0