]>
Commit | Line | Data |
---|---|---|
e0df8241 JR |
1 | """ |
2 | Tagged JSON | |
3 | ~~~~~~~~~~~ | |
4 | ||
5 | A compact representation for lossless serialization of non-standard JSON | |
6 | types. :class:`~flask.sessions.SecureCookieSessionInterface` uses this | |
7 | to serialize the session data, but it may be useful in other places. It | |
8 | can be extended to support other types. | |
9 | ||
10 | .. autoclass:: TaggedJSONSerializer | |
11 | :members: | |
12 | ||
13 | .. autoclass:: JSONTag | |
14 | :members: | |
15 | ||
16 | Let's see an example that adds support for | |
17 | :class:`~collections.OrderedDict`. Dicts don't have an order in JSON, so | |
18 | to handle this we will dump the items as a list of ``[key, value]`` | |
19 | pairs. Subclass :class:`JSONTag` and give it the new key ``' od'`` to | |
20 | identify the type. The session serializer processes dicts first, so | |
21 | insert the new tag at the front of the order since ``OrderedDict`` must | |
22 | be processed before ``dict``. | |
23 | ||
24 | .. code-block:: python | |
25 | ||
26 | from flask.json.tag import JSONTag | |
27 | ||
28 | class TagOrderedDict(JSONTag): | |
29 | __slots__ = ('serializer',) | |
30 | key = ' od' | |
31 | ||
32 | def check(self, value): | |
33 | return isinstance(value, OrderedDict) | |
34 | ||
35 | def to_json(self, value): | |
36 | return [[k, self.serializer.tag(v)] for k, v in iteritems(value)] | |
37 | ||
38 | def to_python(self, value): | |
39 | return OrderedDict(value) | |
40 | ||
41 | app.session_interface.serializer.register(TagOrderedDict, index=0) | |
42 | """ | |
43 | from __future__ import annotations | |
44 | ||
45 | import typing as t | |
46 | from base64 import b64decode | |
47 | from base64 import b64encode | |
48 | from datetime import datetime | |
49 | from uuid import UUID | |
50 | ||
51 | from markupsafe import Markup | |
52 | from werkzeug.http import http_date | |
53 | from werkzeug.http import parse_date | |
54 | ||
55 | from ..json import dumps | |
56 | from ..json import loads | |
57 | ||
58 | ||
59 | class JSONTag: | |
60 | """Base class for defining type tags for :class:`TaggedJSONSerializer`.""" | |
61 | ||
62 | __slots__ = ("serializer",) | |
63 | ||
64 | #: The tag to mark the serialized object with. If ``None``, this tag is | |
65 | #: only used as an intermediate step during tagging. | |
66 | key: str | None = None | |
67 | ||
68 | def __init__(self, serializer: TaggedJSONSerializer) -> None: | |
69 | """Create a tagger for the given serializer.""" | |
70 | self.serializer = serializer | |
71 | ||
72 | def check(self, value: t.Any) -> bool: | |
73 | """Check if the given value should be tagged by this tag.""" | |
74 | raise NotImplementedError | |
75 | ||
76 | def to_json(self, value: t.Any) -> t.Any: | |
77 | """Convert the Python object to an object that is a valid JSON type. | |
78 | The tag will be added later.""" | |
79 | raise NotImplementedError | |
80 | ||
81 | def to_python(self, value: t.Any) -> t.Any: | |
82 | """Convert the JSON representation back to the correct type. The tag | |
83 | will already be removed.""" | |
84 | raise NotImplementedError | |
85 | ||
86 | def tag(self, value: t.Any) -> t.Any: | |
87 | """Convert the value to a valid JSON type and add the tag structure | |
88 | around it.""" | |
89 | return {self.key: self.to_json(value)} | |
90 | ||
91 | ||
92 | class TagDict(JSONTag): | |
93 | """Tag for 1-item dicts whose only key matches a registered tag. | |
94 | ||
95 | Internally, the dict key is suffixed with `__`, and the suffix is removed | |
96 | when deserializing. | |
97 | """ | |
98 | ||
99 | __slots__ = () | |
100 | key = " di" | |
101 | ||
102 | def check(self, value: t.Any) -> bool: | |
103 | return ( | |
104 | isinstance(value, dict) | |
105 | and len(value) == 1 | |
106 | and next(iter(value)) in self.serializer.tags | |
107 | ) | |
108 | ||
109 | def to_json(self, value: t.Any) -> t.Any: | |
110 | key = next(iter(value)) | |
111 | return {f"{key}__": self.serializer.tag(value[key])} | |
112 | ||
113 | def to_python(self, value: t.Any) -> t.Any: | |
114 | key = next(iter(value)) | |
115 | return {key[:-2]: value[key]} | |
116 | ||
117 | ||
118 | class PassDict(JSONTag): | |
119 | __slots__ = () | |
120 | ||
121 | def check(self, value: t.Any) -> bool: | |
122 | return isinstance(value, dict) | |
123 | ||
124 | def to_json(self, value: t.Any) -> t.Any: | |
125 | # JSON objects may only have string keys, so don't bother tagging the | |
126 | # key here. | |
127 | return {k: self.serializer.tag(v) for k, v in value.items()} | |
128 | ||
129 | tag = to_json | |
130 | ||
131 | ||
132 | class TagTuple(JSONTag): | |
133 | __slots__ = () | |
134 | key = " t" | |
135 | ||
136 | def check(self, value: t.Any) -> bool: | |
137 | return isinstance(value, tuple) | |
138 | ||
139 | def to_json(self, value: t.Any) -> t.Any: | |
140 | return [self.serializer.tag(item) for item in value] | |
141 | ||
142 | def to_python(self, value: t.Any) -> t.Any: | |
143 | return tuple(value) | |
144 | ||
145 | ||
146 | class PassList(JSONTag): | |
147 | __slots__ = () | |
148 | ||
149 | def check(self, value: t.Any) -> bool: | |
150 | return isinstance(value, list) | |
151 | ||
152 | def to_json(self, value: t.Any) -> t.Any: | |
153 | return [self.serializer.tag(item) for item in value] | |
154 | ||
155 | tag = to_json | |
156 | ||
157 | ||
158 | class TagBytes(JSONTag): | |
159 | __slots__ = () | |
160 | key = " b" | |
161 | ||
162 | def check(self, value: t.Any) -> bool: | |
163 | return isinstance(value, bytes) | |
164 | ||
165 | def to_json(self, value: t.Any) -> t.Any: | |
166 | return b64encode(value).decode("ascii") | |
167 | ||
168 | def to_python(self, value: t.Any) -> t.Any: | |
169 | return b64decode(value) | |
170 | ||
171 | ||
172 | class TagMarkup(JSONTag): | |
173 | """Serialize anything matching the :class:`~markupsafe.Markup` API by | |
174 | having a ``__html__`` method to the result of that method. Always | |
175 | deserializes to an instance of :class:`~markupsafe.Markup`.""" | |
176 | ||
177 | __slots__ = () | |
178 | key = " m" | |
179 | ||
180 | def check(self, value: t.Any) -> bool: | |
181 | return callable(getattr(value, "__html__", None)) | |
182 | ||
183 | def to_json(self, value: t.Any) -> t.Any: | |
184 | return str(value.__html__()) | |
185 | ||
186 | def to_python(self, value: t.Any) -> t.Any: | |
187 | return Markup(value) | |
188 | ||
189 | ||
190 | class TagUUID(JSONTag): | |
191 | __slots__ = () | |
192 | key = " u" | |
193 | ||
194 | def check(self, value: t.Any) -> bool: | |
195 | return isinstance(value, UUID) | |
196 | ||
197 | def to_json(self, value: t.Any) -> t.Any: | |
198 | return value.hex | |
199 | ||
200 | def to_python(self, value: t.Any) -> t.Any: | |
201 | return UUID(value) | |
202 | ||
203 | ||
204 | class TagDateTime(JSONTag): | |
205 | __slots__ = () | |
206 | key = " d" | |
207 | ||
208 | def check(self, value: t.Any) -> bool: | |
209 | return isinstance(value, datetime) | |
210 | ||
211 | def to_json(self, value: t.Any) -> t.Any: | |
212 | return http_date(value) | |
213 | ||
214 | def to_python(self, value: t.Any) -> t.Any: | |
215 | return parse_date(value) | |
216 | ||
217 | ||
218 | class TaggedJSONSerializer: | |
219 | """Serializer that uses a tag system to compactly represent objects that | |
220 | are not JSON types. Passed as the intermediate serializer to | |
221 | :class:`itsdangerous.Serializer`. | |
222 | ||
223 | The following extra types are supported: | |
224 | ||
225 | * :class:`dict` | |
226 | * :class:`tuple` | |
227 | * :class:`bytes` | |
228 | * :class:`~markupsafe.Markup` | |
229 | * :class:`~uuid.UUID` | |
230 | * :class:`~datetime.datetime` | |
231 | """ | |
232 | ||
233 | __slots__ = ("tags", "order") | |
234 | ||
235 | #: Tag classes to bind when creating the serializer. Other tags can be | |
236 | #: added later using :meth:`~register`. | |
237 | default_tags = [ | |
238 | TagDict, | |
239 | PassDict, | |
240 | TagTuple, | |
241 | PassList, | |
242 | TagBytes, | |
243 | TagMarkup, | |
244 | TagUUID, | |
245 | TagDateTime, | |
246 | ] | |
247 | ||
248 | def __init__(self) -> None: | |
249 | self.tags: dict[str, JSONTag] = {} | |
250 | self.order: list[JSONTag] = [] | |
251 | ||
252 | for cls in self.default_tags: | |
253 | self.register(cls) | |
254 | ||
255 | def register( | |
256 | self, | |
257 | tag_class: type[JSONTag], | |
258 | force: bool = False, | |
259 | index: int | None = None, | |
260 | ) -> None: | |
261 | """Register a new tag with this serializer. | |
262 | ||
263 | :param tag_class: tag class to register. Will be instantiated with this | |
264 | serializer instance. | |
265 | :param force: overwrite an existing tag. If false (default), a | |
266 | :exc:`KeyError` is raised. | |
267 | :param index: index to insert the new tag in the tag order. Useful when | |
268 | the new tag is a special case of an existing tag. If ``None`` | |
269 | (default), the tag is appended to the end of the order. | |
270 | ||
271 | :raise KeyError: if the tag key is already registered and ``force`` is | |
272 | not true. | |
273 | """ | |
274 | tag = tag_class(self) | |
275 | key = tag.key | |
276 | ||
277 | if key is not None: | |
278 | if not force and key in self.tags: | |
279 | raise KeyError(f"Tag '{key}' is already registered.") | |
280 | ||
281 | self.tags[key] = tag | |
282 | ||
283 | if index is None: | |
284 | self.order.append(tag) | |
285 | else: | |
286 | self.order.insert(index, tag) | |
287 | ||
288 | def tag(self, value: t.Any) -> dict[str, t.Any]: | |
289 | """Convert a value to a tagged representation if necessary.""" | |
290 | for tag in self.order: | |
291 | if tag.check(value): | |
292 | return tag.tag(value) | |
293 | ||
294 | return value | |
295 | ||
296 | def untag(self, value: dict[str, t.Any]) -> t.Any: | |
297 | """Convert a tagged representation back to the original type.""" | |
298 | if len(value) != 1: | |
299 | return value | |
300 | ||
301 | key = next(iter(value)) | |
302 | ||
303 | if key not in self.tags: | |
304 | return value | |
305 | ||
306 | return self.tags[key].to_python(value[key]) | |
307 | ||
308 | def dumps(self, value: t.Any) -> str: | |
309 | """Tag the value and dump it to a compact JSON string.""" | |
310 | return dumps(self.tag(value), separators=(",", ":")) | |
311 | ||
312 | def loads(self, value: str) -> t.Any: | |
313 | """Load data from a JSON string and deserialized any tagged objects.""" | |
314 | return loads(value, object_hook=self.untag) |