]> jfr.im git - dlqueue.git/blob - venv/lib/python3.11/site-packages/werkzeug/test.py
init: venv and flask
[dlqueue.git] / venv / lib / python3.11 / site-packages / werkzeug / test.py
1 from __future__ import annotations
2
3 import dataclasses
4 import mimetypes
5 import sys
6 import typing as t
7 from collections import defaultdict
8 from datetime import datetime
9 from io import BytesIO
10 from itertools import chain
11 from random import random
12 from tempfile import TemporaryFile
13 from time import time
14 from urllib.parse import unquote
15 from urllib.parse import urlsplit
16 from urllib.parse import urlunsplit
17
18 from ._internal import _get_environ
19 from ._internal import _wsgi_decoding_dance
20 from ._internal import _wsgi_encoding_dance
21 from .datastructures import Authorization
22 from .datastructures import CallbackDict
23 from .datastructures import CombinedMultiDict
24 from .datastructures import EnvironHeaders
25 from .datastructures import FileMultiDict
26 from .datastructures import Headers
27 from .datastructures import MultiDict
28 from .http import dump_cookie
29 from .http import dump_options_header
30 from .http import parse_cookie
31 from .http import parse_date
32 from .http import parse_options_header
33 from .sansio.multipart import Data
34 from .sansio.multipart import Epilogue
35 from .sansio.multipart import Field
36 from .sansio.multipart import File
37 from .sansio.multipart import MultipartEncoder
38 from .sansio.multipart import Preamble
39 from .urls import _urlencode
40 from .urls import iri_to_uri
41 from .utils import cached_property
42 from .utils import get_content_type
43 from .wrappers.request import Request
44 from .wrappers.response import Response
45 from .wsgi import ClosingIterator
46 from .wsgi import get_current_url
47
48 if t.TYPE_CHECKING:
49 from _typeshed.wsgi import WSGIApplication
50 from _typeshed.wsgi import WSGIEnvironment
51 import typing_extensions as te
52
53
def stream_encode_multipart(
    data: t.Mapping[str, t.Any],
    use_tempfile: bool = True,
    threshold: int = 1024 * 500,
    boundary: str | None = None,
) -> tuple[t.IO[bytes], int, str]:
    """Encode a dict of values (either strings or file descriptors or
    :class:`FileStorage` objects.) into a multipart encoded string stored
    in a file descriptor.

    :param data: Mapping of field names to values. A value with a ``read``
        attribute is streamed as a file part; anything else is converted
        with ``str()`` and sent as a plain field. Lists of values are
        expanded by :func:`_iter_data`.
    :param use_tempfile: If true, the body is kept in memory until it
        grows past ``threshold`` bytes and is then moved to a temporary
        file. If false, everything is written to an in-memory buffer.
    :param threshold: Number of bytes allowed in memory before switching
        to a temporary file. Only used when ``use_tempfile`` is true.
    :param boundary: Multipart boundary to use. A boundary built from the
        current time and a random number is generated when omitted.
    :return: A ``(stream, length, boundary)`` tuple. The stream is
        rewound to position 0 and ``length`` is the position reached
        after the last write.

    .. versionchanged:: 3.0
        The ``charset`` parameter was removed.
    """
    if boundary is None:
        boundary = f"---------------WerkzeugFormPart_{time()}{random()}"

    stream: t.IO[bytes] = BytesIO()
    total_length = 0
    on_disk = False
    write_binary: t.Callable[[bytes], int]

    if use_tempfile:

        def write_binary(s: bytes) -> int:
            nonlocal stream, total_length, on_disk

            if on_disk:
                return stream.write(s)
            else:
                length = len(s)

                if length + total_length <= threshold:
                    stream.write(s)
                else:
                    # Spill what was buffered so far, plus this chunk, to
                    # a temporary file and keep writing there from now on.
                    new_stream = t.cast(t.IO[bytes], TemporaryFile("wb+"))
                    new_stream.write(stream.getvalue())  # type: ignore
                    new_stream.write(s)
                    stream = new_stream
                    on_disk = True

                total_length += length
                return length

    else:
        write_binary = stream.write

    encoder = MultipartEncoder(boundary.encode())
    write_binary(encoder.send_event(Preamble(data=b"")))
    for key, value in _iter_data(data):
        reader = getattr(value, "read", None)
        if reader is not None:
            filename = getattr(value, "filename", getattr(value, "name", None))
            content_type = getattr(value, "content_type", None)
            if content_type is None:
                content_type = (
                    filename
                    and mimetypes.guess_type(filename)[0]
                    or "application/octet-stream"
                )
            # Plain file objects have no ``headers`` attribute; only
            # FileStorage-like values do. Fall back to an empty set of
            # headers instead of failing with AttributeError.
            headers = getattr(value, "headers", None)
            if headers is None:
                headers = Headers()
            headers.update([("Content-Type", content_type)])
            if filename is None:
                write_binary(encoder.send_event(Field(name=key, headers=headers)))
            else:
                write_binary(
                    encoder.send_event(
                        File(name=key, filename=filename, headers=headers)
                    )
                )
            while True:
                chunk = reader(16384)

                if not chunk:
                    # An empty read marks the end of this part.
                    write_binary(encoder.send_event(Data(data=chunk, more_data=False)))
                    break

                write_binary(encoder.send_event(Data(data=chunk, more_data=True)))
        else:
            if not isinstance(value, str):
                value = str(value)
            write_binary(encoder.send_event(Field(name=key, headers=Headers())))
            write_binary(encoder.send_event(Data(data=value.encode(), more_data=False)))

    write_binary(encoder.send_event(Epilogue(data=b"")))

    length = stream.tell()
    stream.seek(0)
    return stream, length, boundary
142
143
def encode_multipart(
    values: t.Mapping[str, t.Any], boundary: str | None = None
) -> tuple[str, bytes]:
    """Encode ``values`` as multipart data held entirely in memory.

    Works like :func:`stream_encode_multipart` with ``use_tempfile``
    disabled, but returns a ``(boundary, data)`` tuple where ``data`` is
    the encoded body as bytes.

    .. versionchanged:: 3.0
        The ``charset`` parameter was removed.
    """
    encoded = stream_encode_multipart(values, use_tempfile=False, boundary=boundary)
    body_stream, used_boundary = encoded[0], encoded[2]
    return used_boundary, body_stream.read()
157
158
def _iter_data(data: t.Mapping[str, t.Any]) -> t.Iterator[tuple[str, t.Any]]:
    """Yield every ``(key, value)`` pair of a mapping whose values may be
    lists of values.

    Similar to iter_multi_items, except that only lists are expanded.
    Tuples are yielded unchanged so they can be used to describe files.
    """
    if isinstance(data, MultiDict):
        yield from data.items(multi=True)
        return

    for name, item in data.items():
        # Treat a single value as a one-element list.
        members = item if isinstance(item, list) else [item]

        for member in members:
            yield name, member
173
174
# Type variable for "MultiDict or a subclass of it", used to type the storage
# class handled by ``EnvironBuilder._get_form``.
_TAnyMultiDict = t.TypeVar("_TAnyMultiDict", bound=MultiDict)
176
177
class EnvironBuilder:
    """This class can be used to conveniently create a WSGI environment
    for testing purposes. It can be used to quickly create WSGI environments
    or request objects from arbitrary data.

    The signature of this class is also used in some other places as of
    Werkzeug 0.5 (:func:`create_environ`, :meth:`Response.from_values`,
    :meth:`Client.open`). Because of this most of the functionality is
    available through the constructor alone.

    Files and regular form data can be manipulated independently of each
    other with the :attr:`form` and :attr:`files` attributes, but are
    passed with the same argument to the constructor: `data`.

    `data` can be any of these values:

    -   a `str` or `bytes` object: The object is converted into an
        :attr:`input_stream`, the :attr:`content_length` is set and you have to
        provide a :attr:`content_type`.
    -   a `dict` or :class:`MultiDict`: The keys have to be strings. The values
        have to be either any of the following objects, or a list of any of the
        following objects:

        -   a :class:`file`-like object: These are converted into
            :class:`FileStorage` objects automatically.
        -   a `tuple`: The :meth:`~FileMultiDict.add_file` method is called
            with the key and the unpacked `tuple` items as positional
            arguments.
        -   a `str`: The string is set as form data for the associated key.
    -   a file-like object: The object content is loaded in memory and then
        handled like a regular `str` or a `bytes`.

    :param path: the path of the request. In the WSGI environment this will
                 end up as `PATH_INFO`. If the `query_string` is not defined
                 and there is a question mark in the `path` everything after
                 it is used as query string.
    :param base_url: the base URL is a URL that is used to extract the WSGI
                     URL scheme, host (server name + server port) and the
                     script root (`SCRIPT_NAME`).
    :param query_string: an optional string or dict with URL parameters.
    :param method: the HTTP method to use, defaults to `GET`.
    :param input_stream: an optional input stream. Do not specify this and
                         `data`. As soon as an input stream is set you can't
                         modify :attr:`args` and :attr:`files` unless you
                         set the :attr:`input_stream` to `None` again.
    :param content_type: The content type for the request. As of 0.5 you
                         don't have to provide this when specifying files
                         and form data via `data`.
    :param content_length: The content length for the request. You don't
                           have to specify this when providing data via
                           `data`.
    :param errors_stream: an optional error stream that is used for
                          `wsgi.errors`. Defaults to :data:`stderr`.
    :param multithread: controls `wsgi.multithread`. Defaults to `False`.
    :param multiprocess: controls `wsgi.multiprocess`. Defaults to `False`.
    :param run_once: controls `wsgi.run_once`. Defaults to `False`.
    :param headers: an optional list or :class:`Headers` object of headers.
    :param data: a string or dict of form data or a file-object.
                 See explanation above.
    :param json: An object to be serialized and assigned to ``data``.
        Defaults the content type to ``"application/json"``.
        Serialized with the function assigned to :attr:`json_dumps`.
    :param environ_base: an optional dict of environment defaults.
    :param environ_overrides: an optional dict of environment overrides.
    :param auth: An authorization object to use for the
        ``Authorization`` header value. A ``(username, password)`` tuple
        is a shortcut for ``Basic`` authorization.

    .. versionchanged:: 3.0
        The ``charset`` parameter was removed.

    .. versionchanged:: 2.1
        ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` are not duplicated as
        header keys in the environ.

    .. versionchanged:: 2.0
        ``REQUEST_URI`` and ``RAW_URI`` is the full raw URI including
        the query string, not only the path.

    .. versionchanged:: 2.0
        The default :attr:`request_class` is ``Request`` instead of
        ``BaseRequest``.

    .. versionadded:: 2.0
       Added the ``auth`` parameter.

    .. versionadded:: 0.15
        The ``json`` param and :meth:`json_dumps` method.

    .. versionadded:: 0.15
        The environ has keys ``REQUEST_URI`` and ``RAW_URI`` containing
        the path before percent-decoding. This is not part of the WSGI
        PEP, but many WSGI servers include it.

    .. versionchanged:: 0.6
       ``path`` and ``base_url`` can now be unicode strings that are
       encoded with :func:`iri_to_uri`.
    """

    #: the server protocol to use.  defaults to HTTP/1.1
    server_protocol = "HTTP/1.1"

    #: the wsgi version to use.  defaults to (1, 0)
    wsgi_version = (1, 0)

    #: The default request class used by :meth:`get_request`.
    request_class = Request

    # Imported inside the class body and deleted again below so that the
    # module name does not remain as a class attribute.
    import json

    #: The serialization function used when ``json`` is passed.
    json_dumps = staticmethod(json.dumps)
    del json

    _args: MultiDict | None
    _query_string: str | None
    _input_stream: t.IO[bytes] | None
    _form: MultiDict | None
    _files: FileMultiDict | None

    def __init__(
        self,
        path: str = "/",
        base_url: str | None = None,
        query_string: t.Mapping[str, str] | str | None = None,
        method: str = "GET",
        input_stream: t.IO[bytes] | None = None,
        content_type: str | None = None,
        content_length: int | None = None,
        errors_stream: t.IO[str] | None = None,
        multithread: bool = False,
        multiprocess: bool = False,
        run_once: bool = False,
        headers: Headers | t.Iterable[tuple[str, str]] | None = None,
        data: None | (t.IO[bytes] | str | bytes | t.Mapping[str, t.Any]) = None,
        environ_base: t.Mapping[str, t.Any] | None = None,
        environ_overrides: t.Mapping[str, t.Any] | None = None,
        mimetype: str | None = None,
        json: t.Mapping[str, t.Any] | None = None,
        auth: Authorization | tuple[str, str] | None = None,
    ) -> None:
        if query_string is not None and "?" in path:
            raise ValueError("Query string is defined in the path and as an argument")
        request_uri = urlsplit(path)
        if query_string is None and "?" in path:
            query_string = request_uri.query

        self.path = iri_to_uri(request_uri.path)
        self.request_uri = path
        if base_url is not None:
            base_url = iri_to_uri(base_url)
        self.base_url = base_url  # type: ignore
        if isinstance(query_string, str):
            self.query_string = query_string
        else:
            if query_string is None:
                query_string = MultiDict()
            elif not isinstance(query_string, MultiDict):
                query_string = MultiDict(query_string)
            self.args = query_string
        self.method = method
        if headers is None:
            headers = Headers()
        elif not isinstance(headers, Headers):
            headers = Headers(headers)
        self.headers = headers
        if content_type is not None:
            self.content_type = content_type
        if errors_stream is None:
            errors_stream = sys.stderr
        self.errors_stream = errors_stream
        self.multithread = multithread
        self.multiprocess = multiprocess
        self.run_once = run_once
        self.environ_base = environ_base
        self.environ_overrides = environ_overrides
        self.input_stream = input_stream
        self.content_length = content_length
        self.closed = False

        if auth is not None:
            if isinstance(auth, tuple):
                auth = Authorization(
                    "basic", {"username": auth[0], "password": auth[1]}
                )

            self.headers.set("Authorization", auth.to_header())

        if json is not None:
            if data is not None:
                raise TypeError("can't provide both json and data")

            data = self.json_dumps(json)

            if self.content_type is None:
                self.content_type = "application/json"

        if data:
            if input_stream is not None:
                raise TypeError("can't provide input stream and data")
            if hasattr(data, "read"):
                data = data.read()
            if isinstance(data, str):
                data = data.encode()
            if isinstance(data, bytes):
                self.input_stream = BytesIO(data)
                if self.content_length is None:
                    self.content_length = len(data)
            else:
                # A mapping: tuples, dicts and file-like values become
                # files, everything else becomes a form value.
                for key, value in _iter_data(data):
                    if isinstance(value, (tuple, dict)) or hasattr(value, "read"):
                        self._add_file_from_data(key, value)
                    else:
                        self.form.setlistdefault(key).append(value)

        if mimetype is not None:
            self.mimetype = mimetype

    @classmethod
    def from_environ(cls, environ: WSGIEnvironment, **kwargs: t.Any) -> EnvironBuilder:
        """Turn an environ dict back into a builder. Any extra kwargs
        override the args extracted from the environ.

        .. versionchanged:: 2.0
            Path and query values are passed through the WSGI decoding
            dance to avoid double encoding.

        .. versionadded:: 0.15
        """
        headers = Headers(EnvironHeaders(environ))
        out = {
            "path": _wsgi_decoding_dance(environ["PATH_INFO"]),
            "base_url": cls._make_base_url(
                environ["wsgi.url_scheme"],
                headers.pop("Host"),
                _wsgi_decoding_dance(environ["SCRIPT_NAME"]),
            ),
            "query_string": _wsgi_decoding_dance(environ["QUERY_STRING"]),
            "method": environ["REQUEST_METHOD"],
            "input_stream": environ["wsgi.input"],
            "content_type": headers.pop("Content-Type", None),
            "content_length": headers.pop("Content-Length", None),
            "errors_stream": environ["wsgi.errors"],
            "multithread": environ["wsgi.multithread"],
            "multiprocess": environ["wsgi.multiprocess"],
            "run_once": environ["wsgi.run_once"],
            "headers": headers,
        }
        out.update(kwargs)
        return cls(**out)

    def _add_file_from_data(
        self,
        key: str,
        value: (t.IO[bytes] | tuple[t.IO[bytes], str] | tuple[t.IO[bytes], str, str]),
    ) -> None:
        """Called in the EnvironBuilder to add files from the data dict.

        A tuple is unpacked into positional arguments for
        ``files.add_file``; any other value is passed as a single argument.
        """
        if isinstance(value, tuple):
            self.files.add_file(key, *value)
        else:
            self.files.add_file(key, value)

    @staticmethod
    def _make_base_url(scheme: str, host: str, script_root: str) -> str:
        """Join scheme, host and script root into a URL that always ends
        with exactly one trailing slash.
        """
        return urlunsplit((scheme, host, script_root, "", "")).rstrip("/") + "/"

    @property
    def base_url(self) -> str:
        """The base URL is used to extract the URL scheme, host name,
        port, and root path.
        """
        return self._make_base_url(self.url_scheme, self.host, self.script_root)

    @base_url.setter
    def base_url(self, value: str | None) -> None:
        if value is None:
            scheme = "http"
            netloc = "localhost"
            script_root = ""
        else:
            scheme, netloc, script_root, qs, anchor = urlsplit(value)
            if qs or anchor:
                raise ValueError("base url must not contain a query string or fragment")
        self.script_root = script_root.rstrip("/")
        self.host = netloc
        self.url_scheme = scheme

    @property
    def content_type(self) -> str | None:
        """The content type for the request.  Reflected from and to
        the :attr:`headers`.  Do not set if you set :attr:`files` or
        :attr:`form` for auto detection.
        """
        ct = self.headers.get("Content-Type")
        if ct is None and not self._input_stream:
            # No explicit header and no raw body: detect from files/form.
            if self._files:
                return "multipart/form-data"
            if self._form:
                return "application/x-www-form-urlencoded"
            return None
        return ct

    @content_type.setter
    def content_type(self, value: str | None) -> None:
        if value is None:
            self.headers.pop("Content-Type", None)
        else:
            self.headers["Content-Type"] = value

    @property
    def mimetype(self) -> str | None:
        """The mimetype (content type without charset etc.)

        .. versionadded:: 0.14
        """
        ct = self.content_type
        return ct.split(";")[0].strip() if ct else None

    @mimetype.setter
    def mimetype(self, value: str) -> None:
        self.content_type = get_content_type(value, "utf-8")

    @property
    def mimetype_params(self) -> t.Mapping[str, str]:
        """The mimetype parameters as dict.  For example if the
        content type is ``text/html; charset=utf-8`` the params would be
        ``{'charset': 'utf-8'}``.

        .. versionadded:: 0.14
        """

        def on_update(d: CallbackDict) -> None:
            # Write changes made to the returned dict back to the header.
            self.headers["Content-Type"] = dump_options_header(self.mimetype, d)

        d = parse_options_header(self.headers.get("content-type", ""))[1]
        return CallbackDict(d, on_update)

    @property
    def content_length(self) -> int | None:
        """The content length as integer.  Reflected from and to the
        :attr:`headers`.  Do not set if you set :attr:`files` or
        :attr:`form` for auto detection.
        """
        return self.headers.get("Content-Length", type=int)

    @content_length.setter
    def content_length(self, value: int | None) -> None:
        if value is None:
            self.headers.pop("Content-Length", None)
        else:
            self.headers["Content-Length"] = str(value)

    def _get_form(self, name: str, storage: type[_TAnyMultiDict]) -> _TAnyMultiDict:
        """Common behavior for getting the :attr:`form` and
        :attr:`files` properties.

        :param name: Name of the internal cached attribute.
        :param storage: Storage class used for the data.
        :raises AttributeError: If an input stream is defined.
        """
        if self.input_stream is not None:
            raise AttributeError("an input stream is defined")

        rv = getattr(self, name)

        if rv is None:
            rv = storage()
            setattr(self, name, rv)

        return rv  # type: ignore

    def _set_form(self, name: str, value: MultiDict) -> None:
        """Common behavior for setting the :attr:`form` and
        :attr:`files` properties.

        :param name: Name of the internal cached attribute.
        :param value: Value to assign to the attribute.
        """
        self._input_stream = None
        setattr(self, name, value)

    @property
    def form(self) -> MultiDict:
        """A :class:`MultiDict` of form values."""
        return self._get_form("_form", MultiDict)

    @form.setter
    def form(self, value: MultiDict) -> None:
        self._set_form("_form", value)

    @property
    def files(self) -> FileMultiDict:
        """A :class:`FileMultiDict` of uploaded files. Use
        :meth:`~FileMultiDict.add_file` to add new files.
        """
        return self._get_form("_files", FileMultiDict)

    @files.setter
    def files(self, value: FileMultiDict) -> None:
        self._set_form("_files", value)

    @property
    def input_stream(self) -> t.IO[bytes] | None:
        """An optional input stream. This is mutually exclusive with
        setting :attr:`form` and :attr:`files`, setting it will clear
        those. Do not provide this if the method is not ``POST`` or
        another method that has a body.
        """
        return self._input_stream

    @input_stream.setter
    def input_stream(self, value: t.IO[bytes] | None) -> None:
        self._input_stream = value
        self._form = None
        self._files = None

    @property
    def query_string(self) -> str:
        """The query string.  If you set this to a string
        :attr:`args` will no longer be available.
        """
        if self._query_string is None:
            if self._args is not None:
                return _urlencode(self._args)
            return ""
        return self._query_string

    @query_string.setter
    def query_string(self, value: str | None) -> None:
        self._query_string = value
        self._args = None

    @property
    def args(self) -> MultiDict:
        """The URL arguments as :class:`MultiDict`."""
        if self._query_string is not None:
            raise AttributeError("a query string is defined")
        if self._args is None:
            self._args = MultiDict()
        return self._args

    @args.setter
    def args(self, value: MultiDict | None) -> None:
        self._query_string = None
        self._args = value

    @property
    def server_name(self) -> str:
        """The server name (read-only, use :attr:`host` to set)"""
        return self.host.split(":", 1)[0]

    @property
    def server_port(self) -> int:
        """The server port as integer (read-only, use :attr:`host` to set)"""
        pieces = self.host.split(":", 1)

        if len(pieces) == 2:
            try:
                return int(pieces[1])
            except ValueError:
                # Not a numeric port; fall back to the scheme default.
                pass

        if self.url_scheme == "https":
            return 443
        return 80

    def __del__(self) -> None:
        # Best-effort cleanup; errors during finalization are ignored.
        try:
            self.close()
        except Exception:
            pass

    def close(self) -> None:
        """Closes all files.  If you put real :class:`file` objects into the
        :attr:`files` dict you can call this method to automatically close
        them all in one go.

        Calling this more than once has no further effect. Errors raised
        while closing an individual file are ignored.
        """
        if self.closed:
            return
        try:
            # ``values()`` on a MultiDict only yields the first value of
            # each key, so walk every value list to also close additional
            # files stored under the same key.
            files = chain.from_iterable(self.files.listvalues())
        except AttributeError:
            # Raised by the ``files`` property when an input stream is set.
            files = ()  # type: ignore
        for f in files:
            try:
                f.close()
            except Exception:
                pass
        self.closed = True

    def get_environ(self) -> WSGIEnvironment:
        """Return the built environ.

        .. versionchanged:: 0.15
            The content type and length headers are set based on
            input stream detection. Previously this only set the WSGI
            keys.
        """
        input_stream = self.input_stream
        content_length = self.content_length

        mimetype = self.mimetype
        content_type = self.content_type

        if input_stream is not None:
            # Measure the bytes remaining from the current position
            # without consuming the stream.
            start_pos = input_stream.tell()
            input_stream.seek(0, 2)
            end_pos = input_stream.tell()
            input_stream.seek(start_pos)
            content_length = end_pos - start_pos
        elif mimetype == "multipart/form-data":
            input_stream, content_length, boundary = stream_encode_multipart(
                CombinedMultiDict([self.form, self.files])
            )
            content_type = f'{mimetype}; boundary="{boundary}"'
        elif mimetype == "application/x-www-form-urlencoded":
            form_encoded = _urlencode(self.form).encode("ascii")
            content_length = len(form_encoded)
            input_stream = BytesIO(form_encoded)
        else:
            input_stream = BytesIO()

        result: WSGIEnvironment = {}
        if self.environ_base:
            result.update(self.environ_base)

        def _path_encode(x: str) -> str:
            return _wsgi_encoding_dance(unquote(x))

        raw_uri = _wsgi_encoding_dance(self.request_uri)
        result.update(
            {
                "REQUEST_METHOD": self.method,
                "SCRIPT_NAME": _path_encode(self.script_root),
                "PATH_INFO": _path_encode(self.path),
                "QUERY_STRING": _wsgi_encoding_dance(self.query_string),
                # Non-standard, added by mod_wsgi, uWSGI
                "REQUEST_URI": raw_uri,
                # Non-standard, added by gunicorn
                "RAW_URI": raw_uri,
                "SERVER_NAME": self.server_name,
                "SERVER_PORT": str(self.server_port),
                "HTTP_HOST": self.host,
                "SERVER_PROTOCOL": self.server_protocol,
                "wsgi.version": self.wsgi_version,
                "wsgi.url_scheme": self.url_scheme,
                "wsgi.input": input_stream,
                "wsgi.errors": self.errors_stream,
                "wsgi.multithread": self.multithread,
                "wsgi.multiprocess": self.multiprocess,
                "wsgi.run_once": self.run_once,
            }
        )

        headers = self.headers.copy()
        # Don't send these as headers, they're part of the environ.
        headers.remove("Content-Type")
        headers.remove("Content-Length")

        if content_type is not None:
            result["CONTENT_TYPE"] = content_type

        if content_length is not None:
            result["CONTENT_LENGTH"] = str(content_length)

        # Headers that occur more than once are joined into a single
        # comma-separated environ value.
        combined_headers = defaultdict(list)

        for key, value in headers.to_wsgi_list():
            combined_headers[f"HTTP_{key.upper().replace('-', '_')}"].append(value)

        for key, values in combined_headers.items():
            result[key] = ", ".join(values)

        if self.environ_overrides:
            result.update(self.environ_overrides)

        return result

    def get_request(self, cls: type[Request] | None = None) -> Request:
        """Returns a request with the data.  If the request class is not
        specified :attr:`request_class` is used.

        :param cls: The request wrapper to use.
        """
        if cls is None:
            cls = self.request_class

        return cls(self.get_environ())
765
766
class ClientRedirectError(Exception):
    """If a redirect loop is detected when using follow_redirects=True with
    the :class:`Client`, then this exception is raised.
    """
771
772
773 class Client:
774 """Simulate sending requests to a WSGI application without running a WSGI or HTTP
775 server.
776
777 :param application: The WSGI application to make requests to.
778 :param response_wrapper: A :class:`.Response` class to wrap response data with.
779 Defaults to :class:`.TestResponse`. If it's not a subclass of ``TestResponse``,
780 one will be created.
781 :param use_cookies: Persist cookies from ``Set-Cookie`` response headers to the
782 ``Cookie`` header in subsequent requests. Domain and path matching is supported,
783 but other cookie parameters are ignored.
784 :param allow_subdomain_redirects: Allow requests to follow redirects to subdomains.
785 Enable this if the application handles subdomains and redirects between them.
786
787 .. versionchanged:: 2.3
788 Simplify cookie implementation, support domain and path matching.
789
790 .. versionchanged:: 2.1
791 All data is available as properties on the returned response object. The
792 response cannot be returned as a tuple.
793
794 .. versionchanged:: 2.0
795 ``response_wrapper`` is always a subclass of :class:`TestResponse`.
796
797 .. versionchanged:: 0.5
798 Added the ``use_cookies`` parameter.
799 """
800
def __init__(
    self,
    application: WSGIApplication,
    response_wrapper: type[Response] | None = None,
    use_cookies: bool = True,
    allow_subdomain_redirects: bool = False,
) -> None:
    """Create a test client for a WSGI application.

    :param application: The WSGI application to make requests to.
    :param response_wrapper: Response class to wrap response data with.
        ``None`` or ``Response`` selects ``TestResponse``. A subclass of
        ``TestResponse`` is used as given. Any other class is combined
        with ``TestResponse`` in a generated ``WrapperTestResponse`` class.
    :param use_cookies: Whether to keep a cookie store on the client.
        When false, the store is ``None``.
    :param allow_subdomain_redirects: Stored on the client as
        ``allow_subdomain_redirects``.
    """
    self.application = application

    if response_wrapper in {None, Response}:
        response_wrapper = TestResponse
    elif response_wrapper is not None and not issubclass(
        response_wrapper, TestResponse
    ):
        # ``response_wrapper`` is a class, so this must be a subclass
        # check. Re-wrapping a class that already derives from
        # TestResponse would put TestResponse before its own subclass in
        # the bases and fail with an inconsistent MRO.
        response_wrapper = type(
            "WrapperTestResponse",
            (TestResponse, response_wrapper),  # type: ignore
            {},
        )

    self.response_wrapper = t.cast(t.Type["TestResponse"], response_wrapper)

    if use_cookies:
        self._cookies: dict[tuple[str, str, str], Cookie] | None = {}
    else:
        self._cookies = None

    self.allow_subdomain_redirects = allow_subdomain_redirects
827
def get_cookie(
    self, key: str, domain: str = "localhost", path: str = "/"
) -> Cookie | None:
    """Look up a stored :class:`.Cookie`, or return ``None`` if there is
    no match. A cookie is identified by ``(domain, path, key)``.

    :param key: The decoded form of the key for the cookie.
    :param domain: The domain the cookie was set for.
    :param path: The path the cookie was set for.
    :raises TypeError: If cookies are disabled on this client.

    .. versionadded:: 2.3
    """
    jar = self._cookies

    if jar is None:
        raise TypeError(
            "Cookies are disabled. Create a client with 'use_cookies=True'."
        )

    storage_key = (domain, path, key)
    return jar.get(storage_key)
846
def set_cookie(
    self,
    key: str,
    value: str = "",
    *,
    domain: str = "localhost",
    origin_only: bool = True,
    path: str = "/",
    **kwargs: t.Any,
) -> None:
    """Set a cookie to be sent in subsequent requests.

    This is a convenience to skip making a test request to a route that would set
    the cookie. To test the cookie, make a test request to a route that uses the
    cookie value.

    The client uses ``domain``, ``origin_only``, and ``path`` to determine which
    cookies to send with a request. It does not use other cookie parameters that
    browsers use, since they're not applicable in tests.

    :param key: The key part of the cookie.
    :param value: The value part of the cookie.
    :param domain: Send this cookie with requests that match this domain. If
        ``origin_only`` is true, it must be an exact match, otherwise it may be a
        suffix match.
    :param origin_only: Whether the domain must be an exact match to the request.
    :param path: Send this cookie with requests that match this path either exactly
        or as a prefix.
    :param kwargs: Passed to :func:`.dump_cookie`.
    :raises TypeError: If cookies are disabled on this client.

    .. versionchanged:: 3.0
        The parameter ``server_name`` is removed. The first parameter is
        ``key``. Use the ``domain`` and ``origin_only`` parameters instead.

    .. versionchanged:: 2.3
        The ``origin_only`` parameter was added.

    .. versionchanged:: 2.3
        The ``domain`` parameter defaults to ``localhost``.
    """
    if self._cookies is None:
        raise TypeError(
            "Cookies are disabled. Create a client with 'use_cookies=True'."
        )

    # Build the header a response would have sent, then parse it back the
    # same way real ``Set-Cookie`` headers are handled.
    header = dump_cookie(key, value, domain=domain, path=path, **kwargs)
    cookie = Cookie._from_response_header(domain, "/", header)
    cookie.origin_only = origin_only

    if not cookie._should_delete:
        self._cookies[cookie._storage_key] = cookie
        return

    self._cookies.pop(cookie._storage_key, None)
901
def delete_cookie(
    self,
    key: str,
    *,
    domain: str = "localhost",
    path: str = "/",
) -> None:
    """Remove a stored cookie; do nothing if it is not present. A cookie
    is identified by ``(domain, path, key)``.

    :param key: The decoded form of the key for the cookie.
    :param domain: The domain the cookie was set for.
    :param path: The path the cookie was set for.
    :raises TypeError: If cookies are disabled on this client.

    .. versionchanged:: 3.0
        The ``server_name`` parameter is removed. The first parameter is
        ``key``. Use the ``domain`` parameter instead.

    .. versionchanged:: 3.0
        The ``secure``, ``httponly`` and ``samesite`` parameters are removed.

    .. versionchanged:: 2.3
        The ``domain`` parameter defaults to ``localhost``.
    """
    jar = self._cookies

    if jar is None:
        raise TypeError(
            "Cookies are disabled. Create a client with 'use_cookies=True'."
        )

    storage_key = (domain, path, key)
    jar.pop(storage_key, None)
932
933 def _add_cookies_to_wsgi(self, environ: WSGIEnvironment) -> None:
934 """If cookies are enabled, set the ``Cookie`` header in the environ to the
935 cookies that are applicable to the request host and path.
936
937 :meta private:
938
939 .. versionadded:: 2.3
940 """
941 if self._cookies is None:
942 return
943
944 url = urlsplit(get_current_url(environ))
945 server_name = url.hostname or "localhost"
946 value = "; ".join(
947 c._to_request_header()
948 for c in self._cookies.values()
949 if c._matches_request(server_name, url.path)
950 )
951
952 if value:
953 environ["HTTP_COOKIE"] = value
954 else:
955 environ.pop("HTTP_COOKIE", None)
956
def _update_cookies_from_response(
    self, server_name: str, path: str, headers: list[str]
) -> None:
    """Apply the ``Set-Cookie`` headers of a response to the stored cookies.

    Does nothing when cookies are disabled (``self._cookies`` is ``None``).
    Each header is parsed into a :class:`Cookie`; a cookie flagged for
    deletion is removed from storage, any other cookie is stored under its
    storage key, replacing a previous entry with the same key.

    :param server_name: Host of the request that produced the response.
    :param path: Path of the request that produced the response.
    :param headers: Raw ``Set-Cookie`` header values.

    :meta private:

    .. versionadded:: 2.3
    """
    jar = self._cookies

    if jar is None:
        return

    for raw_header in headers:
        parsed = Cookie._from_response_header(server_name, path, raw_header)

        if not parsed._should_delete:
            jar[parsed._storage_key] = parsed
        else:
            # Removing a cookie that was never stored is not an error.
            jar.pop(parsed._storage_key, None)
977
def run_wsgi_app(
    self, environ: WSGIEnvironment, buffered: bool = False
) -> tuple[t.Iterable[bytes], str, Headers]:
    """Run the wrapped WSGI app with the given environ, handling cookies.

    Stored cookies are added to the environ before the call, and the
    ``Set-Cookie`` headers of the result are applied to the stored cookies
    afterwards.

    :param environ: The WSGI environ for the request.
    :param buffered: Passed through to the module-level ``run_wsgi_app``.
    :return: The ``(app_iter, status, headers)`` tuple produced by the
        module-level ``run_wsgi_app``.

    :meta private:
    """
    self._add_cookies_to_wsgi(environ)
    result = run_wsgi_app(self.application, environ, buffered=buffered)

    request_url = urlsplit(get_current_url(environ))
    # Fall back to "localhost" when the URL has no host component.
    host = request_url.hostname or "localhost"
    request_path = request_url.path
    set_cookie_headers = result[2].getlist("Set-Cookie")
    self._update_cookies_from_response(host, request_path, set_cookie_headers)
    return result
992
def resolve_redirect(
    self, response: TestResponse, buffered: bool = False
) -> TestResponse:
    """Make the follow-up request described by a redirect response.

    The new request starts from the environ of the request that produced
    ``response``; its path and query string come from the response's
    ``Location``.

    :param response: The redirect response to follow.
    :param buffered: Passed through to :meth:`open`.
    :return: The response to the follow-up request.
    :raises RuntimeError: If the redirect points at a subdomain while
        ``allow_subdomain_redirects`` is false, or at any other server name.

    :meta private:
    """
    target = urlsplit(response.location)
    redirect_path = target.path
    builder = EnvironBuilder.from_environ(
        response.request.environ, path=redirect_path, query_string=target.query
    )

    # Compare host names label by label, ignoring any port.
    target_labels = target.netloc.split(":", 1)[0].split(".")
    origin_labels = builder.server_name.split(".")

    if target_labels == [""]:
        # A local redirect with autocorrect_location_header=False
        # doesn't have a host, so use the request's host.
        target_labels = origin_labels
    else:
        # The new location has a host, use it for the base URL.
        builder.url_scheme = target.scheme
        builder.host = target.netloc

    # Explain why a redirect to a different server name won't be followed.
    if target_labels != origin_labels:
        is_subdomain = target_labels[-len(origin_labels) :] == origin_labels

        if not is_subdomain:
            raise RuntimeError("Following external redirects is not supported.")

        if not self.allow_subdomain_redirects:
            raise RuntimeError("Following subdomain redirects is not enabled.")

    path_segments = redirect_path.split("/")
    root_segments = builder.script_root.split("/")

    if path_segments[: len(root_segments)] != root_segments:
        # The new location is not under the script root, so use the
        # whole path and clear the previous root.
        builder.path = redirect_path
        builder.script_root = ""
    else:
        # Strip the script root from the path.
        builder.path = redirect_path[len(builder.script_root) :]

    # Only 307 and 308 preserve all of the original request.
    if response.status_code in {307, 308}:
        return self.open(builder, buffered=buffered)

    # HEAD is preserved, everything else becomes GET.
    if builder.method != "HEAD":
        builder.method = "GET"

    # Clear the body and the headers that describe it.
    body_stream = builder.input_stream

    if body_stream is not None:
        body_stream.close()
        builder.input_stream = None

    builder.content_type = None
    builder.content_length = None
    builder.headers.pop("Transfer-Encoding", None)

    return self.open(builder, buffered=buffered)
1055
def open(
    self,
    *args: t.Any,
    buffered: bool = False,
    follow_redirects: bool = False,
    **kwargs: t.Any,
) -> TestResponse:
    """Build a request from the arguments, run the application with it,
    and return the wrapped response.

    :param args: Passed to :class:`EnvironBuilder` to create the
        environ for the request. If a single arg is passed, it can
        be an existing :class:`EnvironBuilder` or an environ dict.
    :param buffered: Convert the iterator returned by the app into
        a list. If the iterator has a ``close()`` method, it is
        called automatically.
    :param follow_redirects: Make additional requests to follow HTTP
        redirects until a non-redirect status is returned.
        :attr:`TestResponse.history` lists the intermediate
        responses.
    :raises ClientRedirectError: If the same ``(location, status)``
        redirect is encountered twice while following redirects.

    .. versionchanged:: 2.1
        Removed the ``as_tuple`` parameter.

    .. versionchanged:: 2.0
        The request input stream is closed when calling
        ``response.close()``. Input streams for redirects are
        automatically closed.

    .. versionchanged:: 0.5
        If a dict is provided as file in the dict for the ``data``
        parameter the content type has to be called ``content_type``
        instead of ``mimetype``. This change was made for
        consistency with :class:`werkzeug.FileWrapper`.

    .. versionchanged:: 0.5
        Added the ``follow_redirects`` parameter.
    """
    request: Request | None = None

    # A lone positional argument may already describe the whole request.
    if len(args) == 1 and not kwargs:
        sole_arg = args[0]

        if isinstance(sole_arg, EnvironBuilder):
            request = sole_arg.get_request()
        elif isinstance(sole_arg, dict):
            request = EnvironBuilder.from_environ(sole_arg).get_request()
        elif isinstance(sole_arg, Request):
            request = sole_arg

    if request is None:
        builder = EnvironBuilder(*args, **kwargs)

        try:
            request = builder.get_request()
        finally:
            builder.close()

    response_parts = self.run_wsgi_app(request.environ, buffered=buffered)
    response = self.response_wrapper(*response_parts, request=request)

    if not follow_redirects:
        return response

    redirect_statuses = {301, 302, 303, 305, 307, 308}
    seen_redirects = set()
    history: list[TestResponse] = []

    while response.status_code in redirect_statuses:
        # Exhaust intermediate response bodies to ensure middleware
        # that returns an iterator runs any cleanup code.
        if not buffered:
            response.make_sequence()
            response.close()

        hop = (response.location, response.status_code)

        if hop in seen_redirects:
            raise ClientRedirectError(
                f"Loop detected: A {response.status_code} redirect"
                f" to {response.location} was already made."
            )

        seen_redirects.add(hop)
        # Each intermediate response records the hops that preceded it.
        response.history = tuple(history)
        history.append(response)
        response = self.resolve_redirect(response, buffered=buffered)

    # This is the final request after redirects.
    response.history = tuple(history)
    # Close the input stream when closing the response, in case
    # the input is an open temporary file.
    response.call_on_close(request.input_stream.close)
    return response
1156
def get(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Make a ``GET`` request by calling :meth:`open`.

    All arguments are forwarded unchanged, except that ``method`` is
    always forced to ``GET``.
    """
    call_kwargs = dict(kw, method="GET")
    return self.open(*args, **call_kwargs)
1161
def post(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Make a ``POST`` request by calling :meth:`open`.

    All arguments are forwarded unchanged, except that ``method`` is
    always forced to ``POST``.
    """
    call_kwargs = dict(kw, method="POST")
    return self.open(*args, **call_kwargs)
1166
def put(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Make a ``PUT`` request by calling :meth:`open`.

    All arguments are forwarded unchanged, except that ``method`` is
    always forced to ``PUT``.
    """
    call_kwargs = dict(kw, method="PUT")
    return self.open(*args, **call_kwargs)
1171
def delete(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Make a ``DELETE`` request by calling :meth:`open`.

    All arguments are forwarded unchanged, except that ``method`` is
    always forced to ``DELETE``.
    """
    call_kwargs = dict(kw, method="DELETE")
    return self.open(*args, **call_kwargs)
1176
def patch(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Make a ``PATCH`` request by calling :meth:`open`.

    All arguments are forwarded unchanged, except that ``method`` is
    always forced to ``PATCH``.
    """
    call_kwargs = dict(kw, method="PATCH")
    return self.open(*args, **call_kwargs)
1181
def options(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Make an ``OPTIONS`` request by calling :meth:`open`.

    All arguments are forwarded unchanged, except that ``method`` is
    always forced to ``OPTIONS``.
    """
    call_kwargs = dict(kw, method="OPTIONS")
    return self.open(*args, **call_kwargs)
1186
def head(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Make a ``HEAD`` request by calling :meth:`open`.

    All arguments are forwarded unchanged, except that ``method`` is
    always forced to ``HEAD``.
    """
    call_kwargs = dict(kw, method="HEAD")
    return self.open(*args, **call_kwargs)
1191
def trace(self, *args: t.Any, **kw: t.Any) -> TestResponse:
    """Make a ``TRACE`` request by calling :meth:`open`.

    All arguments are forwarded unchanged, except that ``method`` is
    always forced to ``TRACE``.
    """
    call_kwargs = dict(kw, method="TRACE")
    return self.open(*args, **call_kwargs)
1196
1197 def __repr__(self) -> str:
1198 return f"<{type(self).__name__} {self.application!r}>"
1199
1200
def create_environ(*args: t.Any, **kwargs: t.Any) -> WSGIEnvironment:
    """Build a new WSGI environ dict from the given values.

    The first parameter should be the path of the request which defaults
    to '/'. The second one can either be an absolute path (in that case
    the host is localhost:80) or a full path to the request with scheme,
    netloc port and the path to the script.

    All arguments are handed to the :class:`EnvironBuilder` constructor,
    so the same arguments are accepted. The builder is always closed
    before this function returns or raises.

    .. versionchanged:: 0.5
        This function is now a thin wrapper over :class:`EnvironBuilder` which
        was added in 0.5. The `headers`, `environ_base`, `environ_overrides`
        and `charset` parameters were added.
    """
    env_builder = EnvironBuilder(*args, **kwargs)

    try:
        environ = env_builder.get_environ()
    finally:
        # Close the builder whether or not building the environ succeeded.
        env_builder.close()

    return environ
1222
1223
def run_wsgi_app(
    app: WSGIApplication, environ: WSGIEnvironment, buffered: bool = False
) -> tuple[t.Iterable[bytes], str, Headers]:
    """Return a tuple in the form (app_iter, status, headers) of the
    application output. This works best if you pass it an application that
    returns an iterator all the time.

    Sometimes applications may use the `write()` callable returned
    by the `start_response` function. This tries to resolve such edge
    cases automatically. But if you don't get the expected output you
    should set `buffered` to `True` which enforces buffering.

    If the application iterable has a ``close()`` method it is called when
    the output is buffered, and also when iterating it raises before a
    response has been started. Otherwise the returned ``app_iter`` is
    responsible for calling it.

    If passed an invalid WSGI application the behavior of this function is
    undefined. Never pass non-conforming WSGI applications to this function.

    :param app: the application to execute.
    :param environ: the WSGI environ to call the application with. The
        application receives a copy, not this object.
    :param buffered: set to `True` to enforce buffering.
    :return: tuple in the form ``(app_iter, status, headers)``
    """
    # Copy environ to ensure any mutations by the app (ProxyFix, for
    # example) don't affect subsequent requests (such as redirects).
    environ = _get_environ(environ).copy()
    status: str
    response: tuple[str, list[tuple[str, str]]] | None = None
    buffer: list[bytes] = []

    def start_response(status, headers, exc_info=None):  # type: ignore
        nonlocal response

        if exc_info:
            try:
                raise exc_info[1].with_traceback(exc_info[2])
            finally:
                # Drop the reference so the traceback is not kept alive.
                exc_info = None

        response = (status, headers)
        # Data passed to the ``write()`` callable lands in ``buffer``.
        return buffer.append

    app_rv = app(environ, start_response)
    close_func = getattr(app_rv, "close", None)
    app_iter: t.Iterable[bytes] = iter(app_rv)

    # when buffering we emit the close call early and convert the
    # application iterator into a regular list
    if buffered:
        try:
            app_iter = list(app_iter)
        finally:
            if close_func is not None:
                close_func()

    # otherwise we iterate the application iter until we have a response, chain
    # the already received data with the already collected data and wrap it in
    # a new `ClosingIterator` if we need to restore a `close` callable from the
    # original return value.
    else:
        try:
            for item in app_iter:
                buffer.append(item)

                if response is not None:
                    break
        except BaseException:
            # The caller never receives an iterator in this case, so nobody
            # else can call ``close()``. Call it here, as the buffered
            # branch does, then re-raise the original error.
            if close_func is not None:
                close_func()

            raise

        if buffer:
            app_iter = chain(buffer, app_iter)

        if close_func is not None and app_iter is not app_rv:
            app_iter = ClosingIterator(app_iter, close_func)

    status, headers = response  # type: ignore
    return app_iter, status, Headers(headers)
1294
1295
class TestResponse(Response):
    """:class:`~werkzeug.wrappers.Response` subclass that provides extra
    information about requests made with the test :class:`Client`.

    Test client requests will always return an instance of this class.
    If a custom response class is passed to the client, it is
    subclassed along with this to support test information.

    If the test request included large files, or if the application is
    serving a file, call :meth:`close` to close any open files and
    prevent Python showing a ``ResourceWarning``.

    .. versionchanged:: 2.2
        Set the ``default_mimetype`` to None to prevent a mimetype being
        assumed if missing.

    .. versionchanged:: 2.1
        Response instances cannot be treated as tuples.

    .. versionadded:: 2.0
        Test client methods always return instances of this class.
    """

    default_mimetype = None
    # Don't assume a mimetype, instead use whatever the response provides

    request: Request
    """A request object with the environ used to make the request that
    resulted in this response.
    """

    history: tuple[TestResponse, ...]
    """A tuple of intermediate responses. Populated when the test request
    is made with ``follow_redirects`` enabled.
    """

    # Tell Pytest to ignore this, it's not a test class.
    __test__ = False

    def __init__(
        self,
        response: t.Iterable[bytes],
        status: str,
        headers: Headers,
        request: Request,
        history: tuple[TestResponse, ...] = (),
        **kwargs: t.Any,
    ) -> None:
        """Wrap the application output together with the request that
        produced it.

        :param response: The response body iterable.
        :param status: The response status.
        :param headers: The response headers.
        :param request: The request that resulted in this response.
        :param history: The intermediate responses; any number of them,
            empty by default.
        :param kwargs: Passed on to the parent class constructor.
        """
        super().__init__(response, status, headers, **kwargs)
        self.request = request
        self.history = history
        # NOTE(review): not read anywhere in the visible code; presumably
        # retained for compatibility. Confirm before removing.
        self._compat_tuple = response, status, headers

    @cached_property
    def text(self) -> str:
        """The response data as text. A shortcut for
        ``response.get_data(as_text=True)``.

        .. versionadded:: 2.1
        """
        return self.get_data(as_text=True)
1357
1358
@dataclasses.dataclass
class Cookie:
    """A cookie key, value, and parameters.

    The class itself is not a public API. Its attributes are documented for inspection
    with :meth:`.Client.get_cookie` only.

    .. versionadded:: 2.3
    """

    key: str
    """The cookie key, encoded as a client would see it."""

    value: str
    """The cookie value, encoded as a client would see it."""

    decoded_key: str
    """The cookie key, decoded as the application would set and see it."""

    decoded_value: str
    """The cookie value, decoded as the application would set and see it."""

    expires: datetime | None
    """The time at which the cookie is no longer valid."""

    max_age: int | None
    """The number of seconds from when the cookie was set at which it is
    no longer valid.
    """

    domain: str
    """The domain that the cookie was set for, or the request domain if not set."""

    origin_only: bool
    """Whether the cookie will be sent for exact domain matches only. This is ``True``
    if the ``Domain`` parameter was not present.
    """

    path: str
    """The path that the cookie was set for."""

    secure: bool | None
    """The ``Secure`` parameter."""

    http_only: bool | None
    """The ``HttpOnly`` parameter."""

    same_site: str | None
    """The ``SameSite`` parameter."""

    def _matches_request(self, server_name: str, path: str) -> bool:
        """Tell whether this cookie applies to a request for the given
        host and path.

        The host matches if it equals the cookie domain or, unless the
        cookie is origin-only, if it is a subdomain of it. The path matches
        if it equals the cookie path or lies below it on a ``/`` boundary.
        """
        domain_matches = server_name == self.domain or (
            not self.origin_only
            and server_name.endswith(self.domain)
            # The part before the domain must end with a dot, so that
            # "badexample.com" does not match "example.com".
            and server_name[: -len(self.domain)].endswith(".")
        )

        if not domain_matches:
            return False

        if path == self.path:
            return True

        # Subtracting the bool keeps a trailing slash of the cookie path
        # in the remainder, so the remainder starts with "/" exactly when
        # the request path continues on a segment boundary.
        remainder_start = len(self.path) - self.path.endswith("/")
        return path.startswith(self.path) and path[remainder_start:].startswith("/")

    def _to_request_header(self) -> str:
        """Return the ``key=value`` pair sent in the ``Cookie`` header."""
        return f"{self.key}={self.value}"

    @classmethod
    def _from_response_header(cls, server_name: str, path: str, header: str) -> te.Self:
        """Build a cookie from one ``Set-Cookie`` header value.

        :param server_name: Host of the request; used as the domain when
            the header has no ``Domain`` parameter.
        :param path: Path of the request; its directory part (or ``/``) is
            used when the header has no ``Path`` parameter.
        :param header: The raw ``Set-Cookie`` header value.
        """
        header, _, parameters_str = header.partition(";")
        key, _, value = header.partition("=")
        decoded_key, decoded_value = next(parse_cookie(header).items())
        params = {}

        # Parameter names are case-insensitive; flags without "=" map to None.
        for item in parameters_str.split(";"):
            k, sep, v = item.partition("=")
            params[k.strip().lower()] = v.strip() if sep else None

        return cls(
            key=key.strip(),
            value=value.strip(),
            decoded_key=decoded_key,
            decoded_value=decoded_value,
            expires=parse_date(params.get("expires")),
            max_age=int(params["max-age"] or 0) if "max-age" in params else None,
            domain=params.get("domain") or server_name,
            origin_only="domain" not in params,
            path=params.get("path") or path.rpartition("/")[0] or "/",
            secure="secure" in params,
            http_only="httponly" in params,
            same_site=params.get("samesite"),
        )

    @property
    def _storage_key(self) -> tuple[str, str, str]:
        """The ``(domain, path, decoded_key)`` triple identifying the cookie."""
        return self.domain, self.path, self.decoded_key

    @property
    def _should_delete(self) -> bool:
        """Whether the cookie should be removed instead of stored.

        True when ``max_age`` is zero or negative, or when ``expires`` is
        exactly the Unix epoch.
        """
        # A non-positive Max-Age expires the cookie immediately, so
        # negative values must delete it just like zero does.
        if self.max_age is not None and self.max_age <= 0:
            return True

        return self.expires is not None and self.expires.timestamp() == 0