]> jfr.im git - dlqueue.git/blob - venv/lib/python3.11/site-packages/werkzeug/http.py
init: venv aand flask
[dlqueue.git] / venv / lib / python3.11 / site-packages / werkzeug / http.py
1 from __future__ import annotations
2
3 import email.utils
4 import re
5 import typing as t
6 import warnings
7 from datetime import date
8 from datetime import datetime
9 from datetime import time
10 from datetime import timedelta
11 from datetime import timezone
12 from enum import Enum
13 from hashlib import sha1
14 from time import mktime
15 from time import struct_time
16 from urllib.parse import quote
17 from urllib.parse import unquote
18 from urllib.request import parse_http_list as _parse_list_header
19
20 from ._internal import _dt_as_utc
21 from ._internal import _plain_int
22
23 if t.TYPE_CHECKING:
24 from _typeshed.wsgi import WSGIEnvironment
25
26 _token_chars = frozenset(
27 "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~"
28 )
29 _etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)')
30 _entity_headers = frozenset(
31 [
32 "allow",
33 "content-encoding",
34 "content-language",
35 "content-length",
36 "content-location",
37 "content-md5",
38 "content-range",
39 "content-type",
40 "expires",
41 "last-modified",
42 ]
43 )
44 _hop_by_hop_headers = frozenset(
45 [
46 "connection",
47 "keep-alive",
48 "proxy-authenticate",
49 "proxy-authorization",
50 "te",
51 "trailer",
52 "transfer-encoding",
53 "upgrade",
54 ]
55 )
56 HTTP_STATUS_CODES = {
57 100: "Continue",
58 101: "Switching Protocols",
59 102: "Processing",
60 103: "Early Hints", # see RFC 8297
61 200: "OK",
62 201: "Created",
63 202: "Accepted",
64 203: "Non Authoritative Information",
65 204: "No Content",
66 205: "Reset Content",
67 206: "Partial Content",
68 207: "Multi Status",
69 208: "Already Reported", # see RFC 5842
70 226: "IM Used", # see RFC 3229
71 300: "Multiple Choices",
72 301: "Moved Permanently",
73 302: "Found",
74 303: "See Other",
75 304: "Not Modified",
76 305: "Use Proxy",
77 306: "Switch Proxy", # unused
78 307: "Temporary Redirect",
79 308: "Permanent Redirect",
80 400: "Bad Request",
81 401: "Unauthorized",
82 402: "Payment Required", # unused
83 403: "Forbidden",
84 404: "Not Found",
85 405: "Method Not Allowed",
86 406: "Not Acceptable",
87 407: "Proxy Authentication Required",
88 408: "Request Timeout",
89 409: "Conflict",
90 410: "Gone",
91 411: "Length Required",
92 412: "Precondition Failed",
93 413: "Request Entity Too Large",
94 414: "Request URI Too Long",
95 415: "Unsupported Media Type",
96 416: "Requested Range Not Satisfiable",
97 417: "Expectation Failed",
98 418: "I'm a teapot", # see RFC 2324
99 421: "Misdirected Request", # see RFC 7540
100 422: "Unprocessable Entity",
101 423: "Locked",
102 424: "Failed Dependency",
103 425: "Too Early", # see RFC 8470
104 426: "Upgrade Required",
105 428: "Precondition Required", # see RFC 6585
106 429: "Too Many Requests",
107 431: "Request Header Fields Too Large",
108 449: "Retry With", # proprietary MS extension
109 451: "Unavailable For Legal Reasons",
110 500: "Internal Server Error",
111 501: "Not Implemented",
112 502: "Bad Gateway",
113 503: "Service Unavailable",
114 504: "Gateway Timeout",
115 505: "HTTP Version Not Supported",
116 506: "Variant Also Negotiates", # see RFC 2295
117 507: "Insufficient Storage",
118 508: "Loop Detected", # see RFC 5842
119 510: "Not Extended",
120 511: "Network Authentication Failed",
121 }
122
123
124 class COEP(Enum):
125 """Cross Origin Embedder Policies"""
126
127 UNSAFE_NONE = "unsafe-none"
128 REQUIRE_CORP = "require-corp"
129
130
131 class COOP(Enum):
132 """Cross Origin Opener Policies"""
133
134 UNSAFE_NONE = "unsafe-none"
135 SAME_ORIGIN_ALLOW_POPUPS = "same-origin-allow-popups"
136 SAME_ORIGIN = "same-origin"
137
138
139 def quote_header_value(value: t.Any, allow_token: bool = True) -> str:
140 """Add double quotes around a header value. If the header contains only ASCII token
141 characters, it will be returned unchanged. If the header contains ``"`` or ``\\``
142 characters, they will be escaped with an additional ``\\`` character.
143
144 This is the reverse of :func:`unquote_header_value`.
145
146 :param value: The value to quote. Will be converted to a string.
147 :param allow_token: Disable to quote the value even if it only has token characters.
148
149 .. versionchanged:: 3.0
150 Passing bytes is not supported.
151
152 .. versionchanged:: 3.0
153 The ``extra_chars`` parameter is removed.
154
155 .. versionchanged:: 2.3
156 The value is quoted if it is the empty string.
157
158 .. versionadded:: 0.5
159 """
160 value = str(value)
161
162 if not value:
163 return '""'
164
165 if allow_token:
166 token_chars = _token_chars
167
168 if token_chars.issuperset(value):
169 return value
170
171 value = value.replace("\\", "\\\\").replace('"', '\\"')
172 return f'"{value}"'
173
174
175 def unquote_header_value(value: str) -> str:
176 """Remove double quotes and decode slash-escaped ``"`` and ``\\`` characters in a
177 header value.
178
179 This is the reverse of :func:`quote_header_value`.
180
181 :param value: The header value to unquote.
182
183 .. versionchanged:: 3.0
184 The ``is_filename`` parameter is removed.
185 """
186 if len(value) >= 2 and value[0] == value[-1] == '"':
187 value = value[1:-1]
188 return value.replace("\\\\", "\\").replace('\\"', '"')
189
190 return value
191
192
193 def dump_options_header(header: str | None, options: t.Mapping[str, t.Any]) -> str:
194 """Produce a header value and ``key=value`` parameters separated by semicolons
195 ``;``. For example, the ``Content-Type`` header.
196
197 .. code-block:: python
198
199 dump_options_header("text/html", {"charset": "UTF-8"})
200 'text/html; charset=UTF-8'
201
202 This is the reverse of :func:`parse_options_header`.
203
204 If a value contains non-token characters, it will be quoted.
205
206 If a value is ``None``, the parameter is skipped.
207
208 In some keys for some headers, a UTF-8 value can be encoded using a special
209 ``key*=UTF-8''value`` form, where ``value`` is percent encoded. This function will
210 not produce that format automatically, but if a given key ends with an asterisk
211 ``*``, the value is assumed to have that form and will not be quoted further.
212
213 :param header: The primary header value.
214 :param options: Parameters to encode as ``key=value`` pairs.
215
216 .. versionchanged:: 2.3
217 Keys with ``None`` values are skipped rather than treated as a bare key.
218
219 .. versionchanged:: 2.2.3
220 If a key ends with ``*``, its value will not be quoted.
221 """
222 segments = []
223
224 if header is not None:
225 segments.append(header)
226
227 for key, value in options.items():
228 if value is None:
229 continue
230
231 if key[-1] == "*":
232 segments.append(f"{key}={value}")
233 else:
234 segments.append(f"{key}={quote_header_value(value)}")
235
236 return "; ".join(segments)
237
238
239 def dump_header(iterable: dict[str, t.Any] | t.Iterable[t.Any]) -> str:
240 """Produce a header value from a list of items or ``key=value`` pairs, separated by
241 commas ``,``.
242
243 This is the reverse of :func:`parse_list_header`, :func:`parse_dict_header`, and
244 :func:`parse_set_header`.
245
246 If a value contains non-token characters, it will be quoted.
247
248 If a value is ``None``, the key is output alone.
249
250 In some keys for some headers, a UTF-8 value can be encoded using a special
251 ``key*=UTF-8''value`` form, where ``value`` is percent encoded. This function will
252 not produce that format automatically, but if a given key ends with an asterisk
253 ``*``, the value is assumed to have that form and will not be quoted further.
254
255 .. code-block:: python
256
257 dump_header(["foo", "bar baz"])
258 'foo, "bar baz"'
259
260 dump_header({"foo": "bar baz"})
261 'foo="bar baz"'
262
263 :param iterable: The items to create a header from.
264
265 .. versionchanged:: 3.0
266 The ``allow_token`` parameter is removed.
267
268 .. versionchanged:: 2.2.3
269 If a key ends with ``*``, its value will not be quoted.
270 """
271 if isinstance(iterable, dict):
272 items = []
273
274 for key, value in iterable.items():
275 if value is None:
276 items.append(key)
277 elif key[-1] == "*":
278 items.append(f"{key}={value}")
279 else:
280 items.append(f"{key}={quote_header_value(value)}")
281 else:
282 items = [quote_header_value(x) for x in iterable]
283
284 return ", ".join(items)
285
286
287 def dump_csp_header(header: ds.ContentSecurityPolicy) -> str:
288 """Dump a Content Security Policy header.
289
290 These are structured into policies such as "default-src 'self';
291 script-src 'self'".
292
293 .. versionadded:: 1.0.0
294 Support for Content Security Policy headers was added.
295
296 """
297 return "; ".join(f"{key} {value}" for key, value in header.items())
298
299
300 def parse_list_header(value: str) -> list[str]:
301 """Parse a header value that consists of a list of comma separated items according
302 to `RFC 9110 <https://httpwg.org/specs/rfc9110.html#abnf.extension>`__.
303
304 This extends :func:`urllib.request.parse_http_list` to remove surrounding quotes
305 from values.
306
307 .. code-block:: python
308
309 parse_list_header('token, "quoted value"')
310 ['token', 'quoted value']
311
312 This is the reverse of :func:`dump_header`.
313
314 :param value: The header value to parse.
315 """
316 result = []
317
318 for item in _parse_list_header(value):
319 if len(item) >= 2 and item[0] == item[-1] == '"':
320 item = item[1:-1]
321
322 result.append(item)
323
324 return result
325
326
327 def parse_dict_header(value: str) -> dict[str, str | None]:
328 """Parse a list header using :func:`parse_list_header`, then parse each item as a
329 ``key=value`` pair.
330
331 .. code-block:: python
332
333 parse_dict_header('a=b, c="d, e", f')
334 {"a": "b", "c": "d, e", "f": None}
335
336 This is the reverse of :func:`dump_header`.
337
338 If a key does not have a value, it is ``None``.
339
340 This handles charsets for values as described in
341 `RFC 2231 <https://www.rfc-editor.org/rfc/rfc2231#section-3>`__. Only ASCII, UTF-8,
342 and ISO-8859-1 charsets are accepted, otherwise the value remains quoted.
343
344 :param value: The header value to parse.
345
346 .. versionchanged:: 3.0
347 Passing bytes is not supported.
348
349 .. versionchanged:: 3.0
350 The ``cls`` argument is removed.
351
352 .. versionchanged:: 2.3
353 Added support for ``key*=charset''value`` encoded items.
354
355 .. versionchanged:: 0.9
356 The ``cls`` argument was added.
357 """
358 result: dict[str, str | None] = {}
359
360 for item in parse_list_header(value):
361 key, has_value, value = item.partition("=")
362 key = key.strip()
363
364 if not has_value:
365 result[key] = None
366 continue
367
368 value = value.strip()
369 encoding: str | None = None
370
371 if key[-1] == "*":
372 # key*=charset''value becomes key=value, where value is percent encoded
373 # adapted from parse_options_header, without the continuation handling
374 key = key[:-1]
375 match = _charset_value_re.match(value)
376
377 if match:
378 # If there is a charset marker in the value, split it off.
379 encoding, value = match.groups()
380 encoding = encoding.lower()
381
382 # A safe list of encodings. Modern clients should only send ASCII or UTF-8.
383 # This list will not be extended further. An invalid encoding will leave the
384 # value quoted.
385 if encoding in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
386 # invalid bytes are replaced during unquoting
387 value = unquote(value, encoding=encoding)
388
389 if len(value) >= 2 and value[0] == value[-1] == '"':
390 value = value[1:-1]
391
392 result[key] = value
393
394 return result
395
396
397 # https://httpwg.org/specs/rfc9110.html#parameter
398 _parameter_re = re.compile(
399 r"""
400 # don't match multiple empty parts, that causes backtracking
401 \s*;\s* # find the part delimiter
402 (?:
403 ([\w!#$%&'*+\-.^`|~]+) # key, one or more token chars
404 = # equals, with no space on either side
405 ( # value, token or quoted string
406 [\w!#$%&'*+\-.^`|~]+ # one or more token chars
407 |
408 "(?:\\\\|\\"|.)*?" # quoted string, consuming slash escapes
409 )
410 )? # optionally match key=value, to account for empty parts
411 """,
412 re.ASCII | re.VERBOSE,
413 )
414 # https://www.rfc-editor.org/rfc/rfc2231#section-4
415 _charset_value_re = re.compile(
416 r"""
417 ([\w!#$%&*+\-.^`|~]*)' # charset part, could be empty
418 [\w!#$%&*+\-.^`|~]*' # don't care about language part, usually empty
419 ([\w!#$%&'*+\-.^`|~]+) # one or more token chars with percent encoding
420 """,
421 re.ASCII | re.VERBOSE,
422 )
423 # https://www.rfc-editor.org/rfc/rfc2231#section-3
424 _continuation_re = re.compile(r"\*(\d+)$", re.ASCII)
425
426
427 def parse_options_header(value: str | None) -> tuple[str, dict[str, str]]:
428 """Parse a header that consists of a value with ``key=value`` parameters separated
429 by semicolons ``;``. For example, the ``Content-Type`` header.
430
431 .. code-block:: python
432
433 parse_options_header("text/html; charset=UTF-8")
434 ('text/html', {'charset': 'UTF-8'})
435
436 parse_options_header("")
437 ("", {})
438
439 This is the reverse of :func:`dump_options_header`.
440
441 This parses valid parameter parts as described in
442 `RFC 9110 <https://httpwg.org/specs/rfc9110.html#parameter>`__. Invalid parts are
443 skipped.
444
445 This handles continuations and charsets as described in
446 `RFC 2231 <https://www.rfc-editor.org/rfc/rfc2231#section-3>`__, although not as
447 strictly as the RFC. Only ASCII, UTF-8, and ISO-8859-1 charsets are accepted,
448 otherwise the value remains quoted.
449
450 Clients may not be consistent in how they handle a quote character within a quoted
451 value. The `HTML Standard <https://html.spec.whatwg.org/#multipart-form-data>`__
452 replaces it with ``%22`` in multipart form data.
453 `RFC 9110 <https://httpwg.org/specs/rfc9110.html#quoted.strings>`__ uses backslash
454 escapes in HTTP headers. Both are decoded to the ``"`` character.
455
456 Clients may not be consistent in how they handle non-ASCII characters. HTML
457 documents must declare ``<meta charset=UTF-8>``, otherwise browsers may replace with
458 HTML character references, which can be decoded using :func:`html.unescape`.
459
460 :param value: The header value to parse.
461 :return: ``(value, options)``, where ``options`` is a dict
462
463 .. versionchanged:: 2.3
464 Invalid parts, such as keys with no value, quoted keys, and incorrectly quoted
465 values, are discarded instead of treating as ``None``.
466
467 .. versionchanged:: 2.3
468 Only ASCII, UTF-8, and ISO-8859-1 are accepted for charset values.
469
470 .. versionchanged:: 2.3
471 Escaped quotes in quoted values, like ``%22`` and ``\\"``, are handled.
472
473 .. versionchanged:: 2.2
474 Option names are always converted to lowercase.
475
476 .. versionchanged:: 2.2
477 The ``multiple`` parameter was removed.
478
479 .. versionchanged:: 0.15
480 :rfc:`2231` parameter continuations are handled.
481
482 .. versionadded:: 0.5
483 """
484 if value is None:
485 return "", {}
486
487 value, _, rest = value.partition(";")
488 value = value.strip()
489 rest = rest.strip()
490
491 if not value or not rest:
492 # empty (invalid) value, or value without options
493 return value, {}
494
495 rest = f";{rest}"
496 options: dict[str, str] = {}
497 encoding: str | None = None
498 continued_encoding: str | None = None
499
500 for pk, pv in _parameter_re.findall(rest):
501 if not pk:
502 # empty or invalid part
503 continue
504
505 pk = pk.lower()
506
507 if pk[-1] == "*":
508 # key*=charset''value becomes key=value, where value is percent encoded
509 pk = pk[:-1]
510 match = _charset_value_re.match(pv)
511
512 if match:
513 # If there is a valid charset marker in the value, split it off.
514 encoding, pv = match.groups()
515 # This might be the empty string, handled next.
516 encoding = encoding.lower()
517
518 # No charset marker, or marker with empty charset value.
519 if not encoding:
520 encoding = continued_encoding
521
522 # A safe list of encodings. Modern clients should only send ASCII or UTF-8.
523 # This list will not be extended further. An invalid encoding will leave the
524 # value quoted.
525 if encoding in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
526 # Continuation parts don't require their own charset marker. This is
527 # looser than the RFC, it will persist across different keys and allows
528 # changing the charset during a continuation. But this implementation is
529 # much simpler than tracking the full state.
530 continued_encoding = encoding
531 # invalid bytes are replaced during unquoting
532 pv = unquote(pv, encoding=encoding)
533
534 # Remove quotes. At this point the value cannot be empty or a single quote.
535 if pv[0] == pv[-1] == '"':
536 # HTTP headers use slash, multipart form data uses percent
537 pv = pv[1:-1].replace("\\\\", "\\").replace('\\"', '"').replace("%22", '"')
538
539 match = _continuation_re.search(pk)
540
541 if match:
542 # key*0=a; key*1=b becomes key=ab
543 pk = pk[: match.start()]
544 options[pk] = options.get(pk, "") + pv
545 else:
546 options[pk] = pv
547
548 return value, options
549
550
551 _q_value_re = re.compile(r"-?\d+(\.\d+)?", re.ASCII)
552 _TAnyAccept = t.TypeVar("_TAnyAccept", bound="ds.Accept")
553
554
555 @t.overload
556 def parse_accept_header(value: str | None) -> ds.Accept:
557 ...
558
559
560 @t.overload
561 def parse_accept_header(value: str | None, cls: type[_TAnyAccept]) -> _TAnyAccept:
562 ...
563
564
565 def parse_accept_header(
566 value: str | None, cls: type[_TAnyAccept] | None = None
567 ) -> _TAnyAccept:
568 """Parse an ``Accept`` header according to
569 `RFC 9110 <https://httpwg.org/specs/rfc9110.html#field.accept>`__.
570
571 Returns an :class:`.Accept` instance, which can sort and inspect items based on
572 their quality parameter. When parsing ``Accept-Charset``, ``Accept-Encoding``, or
573 ``Accept-Language``, pass the appropriate :class:`.Accept` subclass.
574
575 :param value: The header value to parse.
576 :param cls: The :class:`.Accept` class to wrap the result in.
577 :return: An instance of ``cls``.
578
579 .. versionchanged:: 2.3
580 Parse according to RFC 9110. Items with invalid ``q`` values are skipped.
581 """
582 if cls is None:
583 cls = t.cast(t.Type[_TAnyAccept], ds.Accept)
584
585 if not value:
586 return cls(None)
587
588 result = []
589
590 for item in parse_list_header(value):
591 item, options = parse_options_header(item)
592
593 if "q" in options:
594 # pop q, remaining options are reconstructed
595 q_str = options.pop("q").strip()
596
597 if _q_value_re.fullmatch(q_str) is None:
598 # ignore an invalid q
599 continue
600
601 q = float(q_str)
602
603 if q < 0 or q > 1:
604 # ignore an invalid q
605 continue
606 else:
607 q = 1
608
609 if options:
610 # reconstruct the media type with any options
611 item = dump_options_header(item, options)
612
613 result.append((item, q))
614
615 return cls(result)
616
617
618 _TAnyCC = t.TypeVar("_TAnyCC", bound="ds.cache_control._CacheControl")
619 _t_cc_update = t.Optional[t.Callable[[_TAnyCC], None]]
620
621
622 @t.overload
623 def parse_cache_control_header(
624 value: str | None, on_update: _t_cc_update, cls: None = None
625 ) -> ds.RequestCacheControl:
626 ...
627
628
629 @t.overload
630 def parse_cache_control_header(
631 value: str | None, on_update: _t_cc_update, cls: type[_TAnyCC]
632 ) -> _TAnyCC:
633 ...
634
635
636 def parse_cache_control_header(
637 value: str | None,
638 on_update: _t_cc_update = None,
639 cls: type[_TAnyCC] | None = None,
640 ) -> _TAnyCC:
641 """Parse a cache control header. The RFC differs between response and
642 request cache control, this method does not. It's your responsibility
643 to not use the wrong control statements.
644
645 .. versionadded:: 0.5
646 The `cls` was added. If not specified an immutable
647 :class:`~werkzeug.datastructures.RequestCacheControl` is returned.
648
649 :param value: a cache control header to be parsed.
650 :param on_update: an optional callable that is called every time a value
651 on the :class:`~werkzeug.datastructures.CacheControl`
652 object is changed.
653 :param cls: the class for the returned object. By default
654 :class:`~werkzeug.datastructures.RequestCacheControl` is used.
655 :return: a `cls` object.
656 """
657 if cls is None:
658 cls = t.cast(t.Type[_TAnyCC], ds.RequestCacheControl)
659
660 if not value:
661 return cls((), on_update)
662
663 return cls(parse_dict_header(value), on_update)
664
665
666 _TAnyCSP = t.TypeVar("_TAnyCSP", bound="ds.ContentSecurityPolicy")
667 _t_csp_update = t.Optional[t.Callable[[_TAnyCSP], None]]
668
669
670 @t.overload
671 def parse_csp_header(
672 value: str | None, on_update: _t_csp_update, cls: None = None
673 ) -> ds.ContentSecurityPolicy:
674 ...
675
676
677 @t.overload
678 def parse_csp_header(
679 value: str | None, on_update: _t_csp_update, cls: type[_TAnyCSP]
680 ) -> _TAnyCSP:
681 ...
682
683
684 def parse_csp_header(
685 value: str | None,
686 on_update: _t_csp_update = None,
687 cls: type[_TAnyCSP] | None = None,
688 ) -> _TAnyCSP:
689 """Parse a Content Security Policy header.
690
691 .. versionadded:: 1.0.0
692 Support for Content Security Policy headers was added.
693
694 :param value: a csp header to be parsed.
695 :param on_update: an optional callable that is called every time a value
696 on the object is changed.
697 :param cls: the class for the returned object. By default
698 :class:`~werkzeug.datastructures.ContentSecurityPolicy` is used.
699 :return: a `cls` object.
700 """
701 if cls is None:
702 cls = t.cast(t.Type[_TAnyCSP], ds.ContentSecurityPolicy)
703
704 if value is None:
705 return cls((), on_update)
706
707 items = []
708
709 for policy in value.split(";"):
710 policy = policy.strip()
711
712 # Ignore badly formatted policies (no space)
713 if " " in policy:
714 directive, value = policy.strip().split(" ", 1)
715 items.append((directive.strip(), value.strip()))
716
717 return cls(items, on_update)
718
719
720 def parse_set_header(
721 value: str | None,
722 on_update: t.Callable[[ds.HeaderSet], None] | None = None,
723 ) -> ds.HeaderSet:
724 """Parse a set-like header and return a
725 :class:`~werkzeug.datastructures.HeaderSet` object:
726
727 >>> hs = parse_set_header('token, "quoted value"')
728
729 The return value is an object that treats the items case-insensitively
730 and keeps the order of the items:
731
732 >>> 'TOKEN' in hs
733 True
734 >>> hs.index('quoted value')
735 1
736 >>> hs
737 HeaderSet(['token', 'quoted value'])
738
739 To create a header from the :class:`HeaderSet` again, use the
740 :func:`dump_header` function.
741
742 :param value: a set header to be parsed.
743 :param on_update: an optional callable that is called every time a
744 value on the :class:`~werkzeug.datastructures.HeaderSet`
745 object is changed.
746 :return: a :class:`~werkzeug.datastructures.HeaderSet`
747 """
748 if not value:
749 return ds.HeaderSet(None, on_update)
750 return ds.HeaderSet(parse_list_header(value), on_update)
751
752
753 def parse_if_range_header(value: str | None) -> ds.IfRange:
754 """Parses an if-range header which can be an etag or a date. Returns
755 a :class:`~werkzeug.datastructures.IfRange` object.
756
757 .. versionchanged:: 2.0
758 If the value represents a datetime, it is timezone-aware.
759
760 .. versionadded:: 0.7
761 """
762 if not value:
763 return ds.IfRange()
764 date = parse_date(value)
765 if date is not None:
766 return ds.IfRange(date=date)
767 # drop weakness information
768 return ds.IfRange(unquote_etag(value)[0])
769
770
771 def parse_range_header(
772 value: str | None, make_inclusive: bool = True
773 ) -> ds.Range | None:
774 """Parses a range header into a :class:`~werkzeug.datastructures.Range`
775 object. If the header is missing or malformed `None` is returned.
776 `ranges` is a list of ``(start, stop)`` tuples where the ranges are
777 non-inclusive.
778
779 .. versionadded:: 0.7
780 """
781 if not value or "=" not in value:
782 return None
783
784 ranges = []
785 last_end = 0
786 units, rng = value.split("=", 1)
787 units = units.strip().lower()
788
789 for item in rng.split(","):
790 item = item.strip()
791 if "-" not in item:
792 return None
793 if item.startswith("-"):
794 if last_end < 0:
795 return None
796 try:
797 begin = _plain_int(item)
798 except ValueError:
799 return None
800 end = None
801 last_end = -1
802 elif "-" in item:
803 begin_str, end_str = item.split("-", 1)
804 begin_str = begin_str.strip()
805 end_str = end_str.strip()
806
807 try:
808 begin = _plain_int(begin_str)
809 except ValueError:
810 return None
811
812 if begin < last_end or last_end < 0:
813 return None
814 if end_str:
815 try:
816 end = _plain_int(end_str) + 1
817 except ValueError:
818 return None
819
820 if begin >= end:
821 return None
822 else:
823 end = None
824 last_end = end if end is not None else -1
825 ranges.append((begin, end))
826
827 return ds.Range(units, ranges)
828
829
830 def parse_content_range_header(
831 value: str | None,
832 on_update: t.Callable[[ds.ContentRange], None] | None = None,
833 ) -> ds.ContentRange | None:
834 """Parses a range header into a
835 :class:`~werkzeug.datastructures.ContentRange` object or `None` if
836 parsing is not possible.
837
838 .. versionadded:: 0.7
839
840 :param value: a content range header to be parsed.
841 :param on_update: an optional callable that is called every time a value
842 on the :class:`~werkzeug.datastructures.ContentRange`
843 object is changed.
844 """
845 if value is None:
846 return None
847 try:
848 units, rangedef = (value or "").strip().split(None, 1)
849 except ValueError:
850 return None
851
852 if "/" not in rangedef:
853 return None
854 rng, length_str = rangedef.split("/", 1)
855 if length_str == "*":
856 length = None
857 else:
858 try:
859 length = _plain_int(length_str)
860 except ValueError:
861 return None
862
863 if rng == "*":
864 if not is_byte_range_valid(None, None, length):
865 return None
866
867 return ds.ContentRange(units, None, None, length, on_update=on_update)
868 elif "-" not in rng:
869 return None
870
871 start_str, stop_str = rng.split("-", 1)
872 try:
873 start = _plain_int(start_str)
874 stop = _plain_int(stop_str) + 1
875 except ValueError:
876 return None
877
878 if is_byte_range_valid(start, stop, length):
879 return ds.ContentRange(units, start, stop, length, on_update=on_update)
880
881 return None
882
883
884 def quote_etag(etag: str, weak: bool = False) -> str:
885 """Quote an etag.
886
887 :param etag: the etag to quote.
888 :param weak: set to `True` to tag it "weak".
889 """
890 if '"' in etag:
891 raise ValueError("invalid etag")
892 etag = f'"{etag}"'
893 if weak:
894 etag = f"W/{etag}"
895 return etag
896
897
898 def unquote_etag(
899 etag: str | None,
900 ) -> tuple[str, bool] | tuple[None, None]:
901 """Unquote a single etag:
902
903 >>> unquote_etag('W/"bar"')
904 ('bar', True)
905 >>> unquote_etag('"bar"')
906 ('bar', False)
907
908 :param etag: the etag identifier to unquote.
909 :return: a ``(etag, weak)`` tuple.
910 """
911 if not etag:
912 return None, None
913 etag = etag.strip()
914 weak = False
915 if etag.startswith(("W/", "w/")):
916 weak = True
917 etag = etag[2:]
918 if etag[:1] == etag[-1:] == '"':
919 etag = etag[1:-1]
920 return etag, weak
921
922
923 def parse_etags(value: str | None) -> ds.ETags:
924 """Parse an etag header.
925
926 :param value: the tag header to parse
927 :return: an :class:`~werkzeug.datastructures.ETags` object.
928 """
929 if not value:
930 return ds.ETags()
931 strong = []
932 weak = []
933 end = len(value)
934 pos = 0
935 while pos < end:
936 match = _etag_re.match(value, pos)
937 if match is None:
938 break
939 is_weak, quoted, raw = match.groups()
940 if raw == "*":
941 return ds.ETags(star_tag=True)
942 elif quoted:
943 raw = quoted
944 if is_weak:
945 weak.append(raw)
946 else:
947 strong.append(raw)
948 pos = match.end()
949 return ds.ETags(strong, weak)
950
951
952 def generate_etag(data: bytes) -> str:
953 """Generate an etag for some data.
954
955 .. versionchanged:: 2.0
956 Use SHA-1. MD5 may not be available in some environments.
957 """
958 return sha1(data).hexdigest()
959
960
961 def parse_date(value: str | None) -> datetime | None:
962 """Parse an :rfc:`2822` date into a timezone-aware
963 :class:`datetime.datetime` object, or ``None`` if parsing fails.
964
965 This is a wrapper for :func:`email.utils.parsedate_to_datetime`. It
966 returns ``None`` if parsing fails instead of raising an exception,
967 and always returns a timezone-aware datetime object. If the string
968 doesn't have timezone information, it is assumed to be UTC.
969
970 :param value: A string with a supported date format.
971
972 .. versionchanged:: 2.0
973 Return a timezone-aware datetime object. Use
974 ``email.utils.parsedate_to_datetime``.
975 """
976 if value is None:
977 return None
978
979 try:
980 dt = email.utils.parsedate_to_datetime(value)
981 except (TypeError, ValueError):
982 return None
983
984 if dt.tzinfo is None:
985 return dt.replace(tzinfo=timezone.utc)
986
987 return dt
988
989
990 def http_date(
991 timestamp: datetime | date | int | float | struct_time | None = None,
992 ) -> str:
993 """Format a datetime object or timestamp into an :rfc:`2822` date
994 string.
995
996 This is a wrapper for :func:`email.utils.format_datetime`. It
997 assumes naive datetime objects are in UTC instead of raising an
998 exception.
999
1000 :param timestamp: The datetime or timestamp to format. Defaults to
1001 the current time.
1002
1003 .. versionchanged:: 2.0
1004 Use ``email.utils.format_datetime``. Accept ``date`` objects.
1005 """
1006 if isinstance(timestamp, date):
1007 if not isinstance(timestamp, datetime):
1008 # Assume plain date is midnight UTC.
1009 timestamp = datetime.combine(timestamp, time(), tzinfo=timezone.utc)
1010 else:
1011 # Ensure datetime is timezone-aware.
1012 timestamp = _dt_as_utc(timestamp)
1013
1014 return email.utils.format_datetime(timestamp, usegmt=True)
1015
1016 if isinstance(timestamp, struct_time):
1017 timestamp = mktime(timestamp)
1018
1019 return email.utils.formatdate(timestamp, usegmt=True)
1020
1021
1022 def parse_age(value: str | None = None) -> timedelta | None:
1023 """Parses a base-10 integer count of seconds into a timedelta.
1024
1025 If parsing fails, the return value is `None`.
1026
1027 :param value: a string consisting of an integer represented in base-10
1028 :return: a :class:`datetime.timedelta` object or `None`.
1029 """
1030 if not value:
1031 return None
1032 try:
1033 seconds = int(value)
1034 except ValueError:
1035 return None
1036 if seconds < 0:
1037 return None
1038 try:
1039 return timedelta(seconds=seconds)
1040 except OverflowError:
1041 return None
1042
1043
1044 def dump_age(age: timedelta | int | None = None) -> str | None:
1045 """Formats the duration as a base-10 integer.
1046
1047 :param age: should be an integer number of seconds,
1048 a :class:`datetime.timedelta` object, or,
1049 if the age is unknown, `None` (default).
1050 """
1051 if age is None:
1052 return None
1053 if isinstance(age, timedelta):
1054 age = int(age.total_seconds())
1055 else:
1056 age = int(age)
1057
1058 if age < 0:
1059 raise ValueError("age cannot be negative")
1060
1061 return str(age)
1062
1063
1064 def is_resource_modified(
1065 environ: WSGIEnvironment,
1066 etag: str | None = None,
1067 data: bytes | None = None,
1068 last_modified: datetime | str | None = None,
1069 ignore_if_range: bool = True,
1070 ) -> bool:
1071 """Convenience method for conditional requests.
1072
1073 :param environ: the WSGI environment of the request to be checked.
1074 :param etag: the etag for the response for comparison.
1075 :param data: or alternatively the data of the response to automatically
1076 generate an etag using :func:`generate_etag`.
1077 :param last_modified: an optional date of the last modification.
1078 :param ignore_if_range: If `False`, `If-Range` header will be taken into
1079 account.
1080 :return: `True` if the resource was modified, otherwise `False`.
1081
1082 .. versionchanged:: 2.0
1083 SHA-1 is used to generate an etag value for the data. MD5 may
1084 not be available in some environments.
1085
1086 .. versionchanged:: 1.0.0
1087 The check is run for methods other than ``GET`` and ``HEAD``.
1088 """
1089 return _sansio_http.is_resource_modified(
1090 http_range=environ.get("HTTP_RANGE"),
1091 http_if_range=environ.get("HTTP_IF_RANGE"),
1092 http_if_modified_since=environ.get("HTTP_IF_MODIFIED_SINCE"),
1093 http_if_none_match=environ.get("HTTP_IF_NONE_MATCH"),
1094 http_if_match=environ.get("HTTP_IF_MATCH"),
1095 etag=etag,
1096 data=data,
1097 last_modified=last_modified,
1098 ignore_if_range=ignore_if_range,
1099 )
1100
1101
1102 def remove_entity_headers(
1103 headers: ds.Headers | list[tuple[str, str]],
1104 allowed: t.Iterable[str] = ("expires", "content-location"),
1105 ) -> None:
1106 """Remove all entity headers from a list or :class:`Headers` object. This
1107 operation works in-place. `Expires` and `Content-Location` headers are
1108 by default not removed. The reason for this is :rfc:`2616` section
1109 10.3.5 which specifies some entity headers that should be sent.
1110
1111 .. versionchanged:: 0.5
1112 added `allowed` parameter.
1113
1114 :param headers: a list or :class:`Headers` object.
1115 :param allowed: a list of headers that should still be allowed even though
1116 they are entity headers.
1117 """
1118 allowed = {x.lower() for x in allowed}
1119 headers[:] = [
1120 (key, value)
1121 for key, value in headers
1122 if not is_entity_header(key) or key.lower() in allowed
1123 ]
1124
1125
1126 def remove_hop_by_hop_headers(headers: ds.Headers | list[tuple[str, str]]) -> None:
1127 """Remove all HTTP/1.1 "Hop-by-Hop" headers from a list or
1128 :class:`Headers` object. This operation works in-place.
1129
1130 .. versionadded:: 0.5
1131
1132 :param headers: a list or :class:`Headers` object.
1133 """
1134 headers[:] = [
1135 (key, value) for key, value in headers if not is_hop_by_hop_header(key)
1136 ]
1137
1138
1139 def is_entity_header(header: str) -> bool:
1140 """Check if a header is an entity header.
1141
1142 .. versionadded:: 0.5
1143
1144 :param header: the header to test.
1145 :return: `True` if it's an entity header, `False` otherwise.
1146 """
1147 return header.lower() in _entity_headers
1148
1149
1150 def is_hop_by_hop_header(header: str) -> bool:
1151 """Check if a header is an HTTP/1.1 "Hop-by-Hop" header.
1152
1153 .. versionadded:: 0.5
1154
1155 :param header: the header to test.
1156 :return: `True` if it's an HTTP/1.1 "Hop-by-Hop" header, `False` otherwise.
1157 """
1158 return header.lower() in _hop_by_hop_headers
1159
1160
1161 def parse_cookie(
1162 header: WSGIEnvironment | str | None,
1163 cls: type[ds.MultiDict] | None = None,
1164 ) -> ds.MultiDict[str, str]:
1165 """Parse a cookie from a string or WSGI environ.
1166
1167 The same key can be provided multiple times, the values are stored
1168 in-order. The default :class:`MultiDict` will have the first value
1169 first, and all values can be retrieved with
1170 :meth:`MultiDict.getlist`.
1171
1172 :param header: The cookie header as a string, or a WSGI environ dict
1173 with a ``HTTP_COOKIE`` key.
1174 :param cls: A dict-like class to store the parsed cookies in.
1175 Defaults to :class:`MultiDict`.
1176
1177 .. versionchanged:: 3.0
1178 Passing bytes, and the ``charset`` and ``errors`` parameters, were removed.
1179
1180 .. versionchanged:: 1.0
1181 Returns a :class:`MultiDict` instead of a ``TypeConversionDict``.
1182
1183 .. versionchanged:: 0.5
1184 Returns a :class:`TypeConversionDict` instead of a regular dict. The ``cls``
1185 parameter was added.
1186 """
1187 if isinstance(header, dict):
1188 cookie = header.get("HTTP_COOKIE")
1189 else:
1190 cookie = header
1191
1192 if cookie:
1193 cookie = cookie.encode("latin1").decode()
1194
1195 return _sansio_http.parse_cookie(cookie=cookie, cls=cls)
1196
1197
1198 _cookie_no_quote_re = re.compile(r"[\w!#$%&'()*+\-./:<=>?@\[\]^`{|}~]*", re.A)
1199 _cookie_slash_re = re.compile(rb"[\x00-\x19\",;\\\x7f-\xff]", re.A)
1200 _cookie_slash_map = {b'"': b'\\"', b"\\": b"\\\\"}
1201 _cookie_slash_map.update(
1202 (v.to_bytes(1, "big"), b"\\%03o" % v)
1203 for v in [*range(0x20), *b",;", *range(0x7F, 256)]
1204 )
1205
1206
1207 def dump_cookie(
1208 key: str,
1209 value: str = "",
1210 max_age: timedelta | int | None = None,
1211 expires: str | datetime | int | float | None = None,
1212 path: str | None = "/",
1213 domain: str | None = None,
1214 secure: bool = False,
1215 httponly: bool = False,
1216 sync_expires: bool = True,
1217 max_size: int = 4093,
1218 samesite: str | None = None,
1219 ) -> str:
1220 """Create a Set-Cookie header without the ``Set-Cookie`` prefix.
1221
1222 The return value is usually restricted to ascii as the vast majority
1223 of values are properly escaped, but that is no guarantee. It's
1224 tunneled through latin1 as required by :pep:`3333`.
1225
1226 The return value is not ASCII safe if the key contains unicode
1227 characters. This is technically against the specification but
1228 happens in the wild. It's strongly recommended to not use
1229 non-ASCII values for the keys.
1230
1231 :param max_age: should be a number of seconds, or `None` (default) if
1232 the cookie should last only as long as the client's
1233 browser session. Additionally `timedelta` objects
1234 are accepted, too.
1235 :param expires: should be a `datetime` object or unix timestamp.
1236 :param path: limits the cookie to a given path, per default it will
1237 span the whole domain.
1238 :param domain: Use this if you want to set a cross-domain cookie. For
1239 example, ``domain="example.com"`` will set a cookie
1240 that is readable by the domain ``www.example.com``,
1241 ``foo.example.com`` etc. Otherwise, a cookie will only
1242 be readable by the domain that set it.
1243 :param secure: The cookie will only be available via HTTPS
1244 :param httponly: disallow JavaScript to access the cookie. This is an
1245 extension to the cookie standard and probably not
1246 supported by all browsers.
1247 :param charset: the encoding for string values.
1248 :param sync_expires: automatically set expires if max_age is defined
1249 but expires not.
1250 :param max_size: Warn if the final header value exceeds this size. The
1251 default, 4093, should be safely `supported by most browsers
1252 <cookie_>`_. Set to 0 to disable this check.
1253 :param samesite: Limits the scope of the cookie such that it will
1254 only be attached to requests if those requests are same-site.
1255
1256 .. _`cookie`: http://browsercookielimits.squawky.net/
1257
1258 .. versionchanged:: 3.0
1259 Passing bytes, and the ``charset`` parameter, were removed.
1260
1261 .. versionchanged:: 2.3.3
1262 The ``path`` parameter is ``/`` by default.
1263
1264 .. versionchanged:: 2.3.1
1265 The value allows more characters without quoting.
1266
1267 .. versionchanged:: 2.3
1268 ``localhost`` and other names without a dot are allowed for the domain. A
1269 leading dot is ignored.
1270
1271 .. versionchanged:: 2.3
1272 The ``path`` parameter is ``None`` by default.
1273
1274 .. versionchanged:: 1.0.0
1275 The string ``'None'`` is accepted for ``samesite``.
1276 """
1277 if path is not None:
1278 # safe = https://url.spec.whatwg.org/#url-path-segment-string
1279 # as well as percent for things that are already quoted
1280 # excluding semicolon since it's part of the header syntax
1281 path = quote(path, safe="%!$&'()*+,/:=@")
1282
1283 if domain:
1284 domain = domain.partition(":")[0].lstrip(".").encode("idna").decode("ascii")
1285
1286 if isinstance(max_age, timedelta):
1287 max_age = int(max_age.total_seconds())
1288
1289 if expires is not None:
1290 if not isinstance(expires, str):
1291 expires = http_date(expires)
1292 elif max_age is not None and sync_expires:
1293 expires = http_date(datetime.now(tz=timezone.utc).timestamp() + max_age)
1294
1295 if samesite is not None:
1296 samesite = samesite.title()
1297
1298 if samesite not in {"Strict", "Lax", "None"}:
1299 raise ValueError("SameSite must be 'Strict', 'Lax', or 'None'.")
1300
1301 # Quote value if it contains characters not allowed by RFC 6265. Slash-escape with
1302 # three octal digits, which matches http.cookies, although the RFC suggests base64.
1303 if not _cookie_no_quote_re.fullmatch(value):
1304 # Work with bytes here, since a UTF-8 character could be multiple bytes.
1305 value = _cookie_slash_re.sub(
1306 lambda m: _cookie_slash_map[m.group()], value.encode()
1307 ).decode("ascii")
1308 value = f'"{value}"'
1309
1310 # Send a non-ASCII key as mojibake. Everything else should already be ASCII.
1311 # TODO Remove encoding dance, it seems like clients accept UTF-8 keys
1312 buf = [f"{key.encode().decode('latin1')}={value}"]
1313
1314 for k, v in (
1315 ("Domain", domain),
1316 ("Expires", expires),
1317 ("Max-Age", max_age),
1318 ("Secure", secure),
1319 ("HttpOnly", httponly),
1320 ("Path", path),
1321 ("SameSite", samesite),
1322 ):
1323 if v is None or v is False:
1324 continue
1325
1326 if v is True:
1327 buf.append(k)
1328 continue
1329
1330 buf.append(f"{k}={v}")
1331
1332 rv = "; ".join(buf)
1333
1334 # Warn if the final value of the cookie is larger than the limit. If the cookie is
1335 # too large, then it may be silently ignored by the browser, which can be quite hard
1336 # to debug.
1337 cookie_size = len(rv)
1338
1339 if max_size and cookie_size > max_size:
1340 value_size = len(value)
1341 warnings.warn(
1342 f"The '{key}' cookie is too large: the value was {value_size} bytes but the"
1343 f" header required {cookie_size - value_size} extra bytes. The final size"
1344 f" was {cookie_size} bytes but the limit is {max_size} bytes. Browsers may"
1345 " silently ignore cookies larger than this.",
1346 stacklevel=2,
1347 )
1348
1349 return rv
1350
1351
1352 def is_byte_range_valid(
1353 start: int | None, stop: int | None, length: int | None
1354 ) -> bool:
1355 """Checks if a given byte content range is valid for the given length.
1356
1357 .. versionadded:: 0.7
1358 """
1359 if (start is None) != (stop is None):
1360 return False
1361 elif start is None:
1362 return length is None or length >= 0
1363 elif length is None:
1364 return 0 <= start < stop # type: ignore
1365 elif start >= stop: # type: ignore
1366 return False
1367 return 0 <= start < length
1368
1369
1370 # circular dependencies
1371 from . import datastructures as ds
1372 from .sansio import http as _sansio_http