]>
jfr.im git - dlqueue.git/blob - venv/lib/python3.11/site-packages/werkzeug/routing/matcher.py
1 from __future__
import annotations
5 from dataclasses
import dataclass
6 from dataclasses
import field
8 from .converters
import ValidationError
9 from .exceptions
import NoMatch
10 from .exceptions
import RequestAliasRedirect
11 from .exceptions
import RequestPath
12 from .rules
import Rule
13 from .rules
import RulePart
16 class SlashRequired(Exception):
22 """A representation of a rule state.
24 This includes the *rules* that correspond to the state and the
25 possible *static* and *dynamic* transitions to the next state.
28 dynamic
: list[tuple[RulePart
, State
]] = field(default_factory
=list)
29 rules
: list[Rule
] = field(default_factory
=list)
30 static
: dict[str, State
] = field(default_factory
=dict)
33 class StateMachineMatcher
:
34 def __init__(self
, merge_slashes
: bool) -> None:
36 self
.merge_slashes
= merge_slashes
38 def add(self
, rule
: Rule
) -> None:
40 for part
in rule
._parts
:
42 state
.static
.setdefault(part
.content
, State())
43 state
= state
.static
[part
.content
]
45 for test_part
, new_state
in state
.dynamic
:
51 state
.dynamic
.append((part
, new_state
))
53 state
.rules
.append(rule
)
55 def update(self
) -> None:
56 # For every state the dynamic transitions should be sorted by
57 # the weight of the transition
60 def _update_state(state
: State
) -> None:
61 state
.dynamic
.sort(key
=lambda entry
: entry
[0].weight
)
62 for new_state
in state
.static
.values():
63 _update_state(new_state
)
64 for _
, new_state
in state
.dynamic
:
65 _update_state(new_state
)
70 self
, domain
: str, path
: str, method
: str, websocket
: bool
71 ) -> tuple[Rule
, t
.MutableMapping
[str, t
.Any
]]:
72 # To match to a rule we need to start at the root state and
73 # try to follow the transitions until we find a match, or find
74 # there is no transition to follow.
76 have_match_for
= set()
77 websocket_mismatch
= False
80 state
: State
, parts
: list[str], values
: list[str]
81 ) -> tuple[Rule
, list[str]] |
None:
82 # This function is meant to be called recursively, and will attempt
83 # to match the head part to the state's transitions.
84 nonlocal have_match_for
, websocket_mismatch
86 # The base case is when all parts have been matched via
87 # transitions. Hence if there is a rule with methods &
88 # websocket that work return it and the dynamic values
91 for rule
in state
.rules
:
92 if rule
.methods
is not None and method
not in rule
.methods
:
93 have_match_for
.update(rule
.methods
)
94 elif rule
.websocket
!= websocket
:
95 websocket_mismatch
= True
99 # Test if there is a match with this path with a
100 # trailing slash, if so raise an exception to report
101 # that matching is possible with an additional slash
102 if "" in state
.static
:
103 for rule
in state
.static
[""].rules
:
104 if websocket
== rule
.websocket
and (
105 rule
.methods
is None or method
in rule
.methods
107 if rule
.strict_slashes
:
108 raise SlashRequired()
114 # To match this part try the static transitions first
115 if part
in state
.static
:
116 rv
= _match(state
.static
[part
], parts
[1:], values
)
119 # No match via the static transitions, so try the dynamic
121 for test_part
, new_state
in state
.dynamic
:
123 remaining
= parts
[1:]
124 # A final part indicates a transition that always
125 # consumes the remaining parts i.e. transitions to a
128 target
= "/".join(parts
)
130 match
= re
.compile(test_part
.content
).match(target
)
131 if match
is not None:
132 if test_part
.suffixed
:
133 # If a part_isolating=False part has a slash suffix, remove the
134 # suffix from the match and check for the slash redirect next.
135 suffix
= match
.groups()[-1]
139 converter_groups
= sorted(
140 match
.groupdict().items(), key
=lambda entry
: entry
[0]
144 for key
, value
in converter_groups
145 if key
[:11] == "__werkzeug_"
147 rv
= _match(new_state
, remaining
, values
+ groups
)
151 # If there is no match and the only part left is a
152 # trailing slash ("") consider rules that aren't
153 # strict-slashes as these should match if there is a final
156 for rule
in state
.rules
:
157 if rule
.strict_slashes
:
159 if rule
.methods
is not None and method
not in rule
.methods
:
160 have_match_for
.update(rule
.methods
)
161 elif rule
.websocket
!= websocket
:
162 websocket_mismatch
= True
169 rv
= _match(self
._root
, [domain
, *path
.split("/")], [])
170 except SlashRequired
:
171 raise RequestPath(f
"{path}/") from None
173 if self
.merge_slashes
and rv
is None:
174 # Try to match again, but with slashes merged
175 path
= re
.sub("/{2,}?", "/", path
)
177 rv
= _match(self
._root
, [domain
, *path
.split("/")], [])
178 except SlashRequired
:
179 raise RequestPath(f
"{path}/") from None
181 raise NoMatch(have_match_for
, websocket_mismatch
)
183 raise RequestPath(f
"{path}")
188 for name
, value
in zip(rule
._converters
.keys(), values
):
190 value
= rule
._converters
[name
].to_python(value
)
191 except ValidationError
:
192 raise NoMatch(have_match_for
, websocket_mismatch
) from None
193 result
[str(name
)] = value
195 result
.update(rule
.defaults
)
197 if rule
.alias
and rule
.map.redirect_defaults
:
198 raise RequestAliasRedirect(result
, rule
.endpoint
)
202 raise NoMatch(have_match_for
, websocket_mismatch
)