]>
Commit | Line | Data |
---|---|---|
e0df8241 JR |
1 | import time |
2 | import typing | |
3 | import typing as _t | |
4 | from datetime import datetime | |
5 | from datetime import timezone | |
6 | ||
7 | from .encoding import base64_decode | |
8 | from .encoding import base64_encode | |
9 | from .encoding import bytes_to_int | |
10 | from .encoding import int_to_bytes | |
11 | from .encoding import want_bytes | |
12 | from .exc import BadSignature | |
13 | from .exc import BadTimeSignature | |
14 | from .exc import SignatureExpired | |
15 | from .serializer import Serializer | |
16 | from .signer import Signer | |
17 | ||
18 | _t_str_bytes = _t.Union[str, bytes] | |
19 | _t_opt_str_bytes = _t.Optional[_t_str_bytes] | |
20 | _t_opt_int = _t.Optional[int] | |
21 | ||
22 | if _t.TYPE_CHECKING: | |
23 | import typing_extensions as _te | |
24 | ||
25 | ||
26 | class TimestampSigner(Signer): | |
27 | """Works like the regular :class:`.Signer` but also records the time | |
28 | of the signing and can be used to expire signatures. The | |
29 | :meth:`unsign` method can raise :exc:`.SignatureExpired` if the | |
30 | unsigning failed because the signature is expired. | |
31 | """ | |
32 | ||
33 | def get_timestamp(self) -> int: | |
34 | """Returns the current timestamp. The function must return an | |
35 | integer. | |
36 | """ | |
37 | return int(time.time()) | |
38 | ||
39 | def timestamp_to_datetime(self, ts: int) -> datetime: | |
40 | """Convert the timestamp from :meth:`get_timestamp` into an | |
41 | aware :class`datetime.datetime` in UTC. | |
42 | ||
43 | .. versionchanged:: 2.0 | |
44 | The timestamp is returned as a timezone-aware ``datetime`` | |
45 | in UTC rather than a naive ``datetime`` assumed to be UTC. | |
46 | """ | |
47 | return datetime.fromtimestamp(ts, tz=timezone.utc) | |
48 | ||
49 | def sign(self, value: _t_str_bytes) -> bytes: | |
50 | """Signs the given string and also attaches time information.""" | |
51 | value = want_bytes(value) | |
52 | timestamp = base64_encode(int_to_bytes(self.get_timestamp())) | |
53 | sep = want_bytes(self.sep) | |
54 | value = value + sep + timestamp | |
55 | return value + sep + self.get_signature(value) | |
56 | ||
57 | # Ignore overlapping signatures check, return_timestamp is the only | |
58 | # parameter that affects the return type. | |
59 | ||
60 | @typing.overload | |
61 | def unsign( # type: ignore | |
62 | self, | |
63 | signed_value: _t_str_bytes, | |
64 | max_age: _t_opt_int = None, | |
65 | return_timestamp: "_te.Literal[False]" = False, | |
66 | ) -> bytes: | |
67 | ... | |
68 | ||
69 | @typing.overload | |
70 | def unsign( | |
71 | self, | |
72 | signed_value: _t_str_bytes, | |
73 | max_age: _t_opt_int = None, | |
74 | return_timestamp: "_te.Literal[True]" = True, | |
75 | ) -> _t.Tuple[bytes, datetime]: | |
76 | ... | |
77 | ||
78 | def unsign( | |
79 | self, | |
80 | signed_value: _t_str_bytes, | |
81 | max_age: _t_opt_int = None, | |
82 | return_timestamp: bool = False, | |
83 | ) -> _t.Union[_t.Tuple[bytes, datetime], bytes]: | |
84 | """Works like the regular :meth:`.Signer.unsign` but can also | |
85 | validate the time. See the base docstring of the class for | |
86 | the general behavior. If ``return_timestamp`` is ``True`` the | |
87 | timestamp of the signature will be returned as an aware | |
88 | :class:`datetime.datetime` object in UTC. | |
89 | ||
90 | .. versionchanged:: 2.0 | |
91 | The timestamp is returned as a timezone-aware ``datetime`` | |
92 | in UTC rather than a naive ``datetime`` assumed to be UTC. | |
93 | """ | |
94 | try: | |
95 | result = super().unsign(signed_value) | |
96 | sig_error = None | |
97 | except BadSignature as e: | |
98 | sig_error = e | |
99 | result = e.payload or b"" | |
100 | ||
101 | sep = want_bytes(self.sep) | |
102 | ||
103 | # If there is no timestamp in the result there is something | |
104 | # seriously wrong. In case there was a signature error, we raise | |
105 | # that one directly, otherwise we have a weird situation in | |
106 | # which we shouldn't have come except someone uses a time-based | |
107 | # serializer on non-timestamp data, so catch that. | |
108 | if sep not in result: | |
109 | if sig_error: | |
110 | raise sig_error | |
111 | ||
112 | raise BadTimeSignature("timestamp missing", payload=result) | |
113 | ||
114 | value, ts_bytes = result.rsplit(sep, 1) | |
115 | ts_int: _t_opt_int = None | |
116 | ts_dt: _t.Optional[datetime] = None | |
117 | ||
118 | try: | |
119 | ts_int = bytes_to_int(base64_decode(ts_bytes)) | |
120 | except Exception: | |
121 | pass | |
122 | ||
123 | # Signature is *not* okay. Raise a proper error now that we have | |
124 | # split the value and the timestamp. | |
125 | if sig_error is not None: | |
126 | if ts_int is not None: | |
127 | try: | |
128 | ts_dt = self.timestamp_to_datetime(ts_int) | |
129 | except (ValueError, OSError, OverflowError) as exc: | |
130 | # Windows raises OSError | |
131 | # 32-bit raises OverflowError | |
132 | raise BadTimeSignature( | |
133 | "Malformed timestamp", payload=value | |
134 | ) from exc | |
135 | ||
136 | raise BadTimeSignature(str(sig_error), payload=value, date_signed=ts_dt) | |
137 | ||
138 | # Signature was okay but the timestamp is actually not there or | |
139 | # malformed. Should not happen, but we handle it anyway. | |
140 | if ts_int is None: | |
141 | raise BadTimeSignature("Malformed timestamp", payload=value) | |
142 | ||
143 | # Check timestamp is not older than max_age | |
144 | if max_age is not None: | |
145 | age = self.get_timestamp() - ts_int | |
146 | ||
147 | if age > max_age: | |
148 | raise SignatureExpired( | |
149 | f"Signature age {age} > {max_age} seconds", | |
150 | payload=value, | |
151 | date_signed=self.timestamp_to_datetime(ts_int), | |
152 | ) | |
153 | ||
154 | if age < 0: | |
155 | raise SignatureExpired( | |
156 | f"Signature age {age} < 0 seconds", | |
157 | payload=value, | |
158 | date_signed=self.timestamp_to_datetime(ts_int), | |
159 | ) | |
160 | ||
161 | if return_timestamp: | |
162 | return value, self.timestamp_to_datetime(ts_int) | |
163 | ||
164 | return value | |
165 | ||
166 | def validate(self, signed_value: _t_str_bytes, max_age: _t_opt_int = None) -> bool: | |
167 | """Only validates the given signed value. Returns ``True`` if | |
168 | the signature exists and is valid.""" | |
169 | try: | |
170 | self.unsign(signed_value, max_age=max_age) | |
171 | return True | |
172 | except BadSignature: | |
173 | return False | |
174 | ||
175 | ||
176 | class TimedSerializer(Serializer): | |
177 | """Uses :class:`TimestampSigner` instead of the default | |
178 | :class:`.Signer`. | |
179 | """ | |
180 | ||
181 | default_signer: _t.Type[TimestampSigner] = TimestampSigner | |
182 | ||
183 | def iter_unsigners( | |
184 | self, salt: _t_opt_str_bytes = None | |
185 | ) -> _t.Iterator[TimestampSigner]: | |
186 | return _t.cast("_t.Iterator[TimestampSigner]", super().iter_unsigners(salt)) | |
187 | ||
188 | # TODO: Signature is incompatible because parameters were added | |
189 | # before salt. | |
190 | ||
191 | def loads( # type: ignore | |
192 | self, | |
193 | s: _t_str_bytes, | |
194 | max_age: _t_opt_int = None, | |
195 | return_timestamp: bool = False, | |
196 | salt: _t_opt_str_bytes = None, | |
197 | ) -> _t.Any: | |
198 | """Reverse of :meth:`dumps`, raises :exc:`.BadSignature` if the | |
199 | signature validation fails. If a ``max_age`` is provided it will | |
200 | ensure the signature is not older than that time in seconds. In | |
201 | case the signature is outdated, :exc:`.SignatureExpired` is | |
202 | raised. All arguments are forwarded to the signer's | |
203 | :meth:`~TimestampSigner.unsign` method. | |
204 | """ | |
205 | s = want_bytes(s) | |
206 | last_exception = None | |
207 | ||
208 | for signer in self.iter_unsigners(salt): | |
209 | try: | |
210 | base64d, timestamp = signer.unsign( | |
211 | s, max_age=max_age, return_timestamp=True | |
212 | ) | |
213 | payload = self.load_payload(base64d) | |
214 | ||
215 | if return_timestamp: | |
216 | return payload, timestamp | |
217 | ||
218 | return payload | |
219 | except SignatureExpired: | |
220 | # The signature was unsigned successfully but was | |
221 | # expired. Do not try the next signer. | |
222 | raise | |
223 | except BadSignature as err: | |
224 | last_exception = err | |
225 | ||
226 | raise _t.cast(BadSignature, last_exception) | |
227 | ||
228 | def loads_unsafe( # type: ignore | |
229 | self, | |
230 | s: _t_str_bytes, | |
231 | max_age: _t_opt_int = None, | |
232 | salt: _t_opt_str_bytes = None, | |
233 | ) -> _t.Tuple[bool, _t.Any]: | |
234 | return self._loads_unsafe_impl(s, salt, load_kwargs={"max_age": max_age}) |