# Extracted from the jfr.im gitweb blob view of dlqueue.git:
# venv/lib/python3.11/site-packages/werkzeug/sansio/multipart.py
from __future__ import annotations

import re
import typing as t
from dataclasses import dataclass
from enum import auto
from enum import Enum

from ..datastructures import Headers
from ..exceptions import RequestEntityTooLarge
from ..http import parse_options_header
class Event:
    """Base class for every event exchanged with the encoder/decoder."""


@dataclass(frozen=True)
class Preamble(Event):
    """Bytes that precede the first boundary of the message."""

    data: bytes


@dataclass(frozen=True)
class Field(Event):
    """Start of a part that carries no filename."""

    # Value of the ``name`` option of the Content-Disposition header.
    name: str
    # All headers of the part.
    headers: Headers


@dataclass(frozen=True)
class File(Event):
    """Start of a part that carries a filename."""

    # Value of the ``name`` option of the Content-Disposition header.
    name: str
    # Value of the ``filename`` option of the Content-Disposition header.
    filename: str
    # All headers of the part.
    headers: Headers


@dataclass(frozen=True)
class Data(Event):
    """A chunk of the body of the current part."""

    data: bytes
    # True while further Data events for the same part will follow.
    more_data: bool


@dataclass(frozen=True)
class Epilogue(Event):
    """Bytes that follow the closing boundary of the message."""

    data: bytes


class NeedData(Event):
    """Signals that no event can be produced from the buffered bytes."""


# Shared sentinel instance; compare with ``isinstance(event, NeedData)``.
NEED_DATA = NeedData()


class State(Enum):
    """Position of an encoder or decoder within a multipart message."""

    PREAMBLE = auto()
    PART = auto()
    DATA = auto()
    DATA_START = auto()
    EPILOGUE = auto()
    COMPLETE = auto()
# Multipart line breaks MUST be CRLF (\r\n) by RFC-7578, except that
# many implementations break this and either use CR or LF alone.
LINE_BREAK = b"(?:\r\n|\n|\r)"
BLANK_LINE_RE = re.compile(b"(?:\r\n\r\n|\r\r|\n\n)", re.MULTILINE)
LINE_BREAK_RE = re.compile(LINE_BREAK, re.MULTILINE)
# Header values can be continued via a space or tab after the linebreak, as
# per RFC2231
HEADER_CONTINUATION_RE = re.compile(b"%s[ \t]" % LINE_BREAK, re.MULTILINE)
# This must be long enough to contain any line breaks plus any
# additional boundary markers (--) such that they will be found in a
# subsequent search
SEARCH_EXTRA_LENGTH = 8
class MultipartDecoder:
    """Decodes a multipart message as bytes into Python events.

    The part data is returned as available to allow the caller to save
    the data from memory to disk, if desired.

    Bytes are supplied with :meth:`receive_data` and events are pulled
    with :meth:`next_event`, which returns ``NEED_DATA`` whenever the
    buffered bytes are not enough to produce another event.
    """

    def __init__(
        self,
        boundary: bytes,
        max_form_memory_size: int | None = None,
        *,
        max_parts: int | None = None,
    ) -> None:
        """Create a decoder for messages delimited by ``boundary``.

        :param boundary: The boundary, without the leading ``--``.
        :param max_form_memory_size: Upper bound on the number of
            buffered bytes; ``None`` disables the check.
        :param max_parts: Upper bound on the number of parts decoded;
            ``None`` disables the check.
        """
        self.buffer = bytearray()
        self.complete = False
        self.max_form_memory_size = max_form_memory_size
        self.max_parts = max_parts
        self.state = State.PREAMBLE
        self.boundary = boundary

        # Note in the below \h i.e. horizontal whitespace is used
        # as [^\S\n\r] as \h isn't supported in python.

        # The preamble must end with a boundary where the boundary is
        # prefixed by a line break, RFC2046. Except that many
        # implementations including Werkzeug's tests omit the line
        # break prefix. In addition the first boundary could be the
        # epilogue boundary (for empty form-data) hence the matching
        # group to understand if it is an epilogue boundary.
        self.preamble_re = re.compile(
            rb"%s?--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # A boundary must include a line break prefix and suffix, and
        # may include trailing whitespace. In addition the boundary
        # could be the epilogue boundary hence the matching group to
        # understand if it is an epilogue boundary.
        self.boundary_re = re.compile(
            rb"%s--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        self._search_position = 0
        self._parts_decoded = 0

    def last_newline(self, data: bytes) -> int:
        """Return the smaller of the last LF and the last CR index.

        A line-break character that does not occur in ``data`` counts
        as ``len(data)``, so data without any line break yields
        ``len(data)`` instead of raising.
        """
        last_nl = data.rfind(b"\n")
        if last_nl == -1:
            last_nl = len(data)

        last_cr = data.rfind(b"\r")
        if last_cr == -1:
            last_cr = len(data)

        return min(last_nl, last_cr)

    def receive_data(self, data: bytes | None) -> None:
        """Append ``data`` to the buffer, or mark the input as finished.

        :param data: The next chunk of the message, or ``None`` once
            no more bytes will arrive.
        :raises RequestEntityTooLarge: If buffering ``data`` would
            exceed ``max_form_memory_size``.
        """
        if data is None:
            self.complete = True
        elif (
            self.max_form_memory_size is not None
            and len(self.buffer) + len(data) > self.max_form_memory_size
        ):
            raise RequestEntityTooLarge()
        else:
            self.buffer.extend(data)

    def next_event(self) -> Event:
        """Return the next event that can be decoded from the buffer.

        :return: A ``Preamble``, ``Field``, ``File``, ``Data`` or
            ``Epilogue`` event, or ``NEED_DATA`` when more bytes are
            required first.
        :raises ValueError: If a part has no Content-Disposition
            header, or if the input is complete but no further event
            can be produced.
        :raises RequestEntityTooLarge: If more than ``max_parts`` parts
            have been decoded.
        """
        event: Event = NEED_DATA

        if self.state == State.PREAMBLE:
            match = self.preamble_re.search(self.buffer, self._search_position)
            if match is not None:
                # Group 1 starts with "--" only for the closing boundary.
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data = bytes(self.buffer[: match.start()])
                del self.buffer[: match.end()]
                event = Preamble(data=data)
                self._search_position = 0
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(
                    0, len(self.buffer) - len(self.boundary) - SEARCH_EXTRA_LENGTH
                )

        elif self.state == State.PART:
            match = BLANK_LINE_RE.search(self.buffer, self._search_position)
            if match is not None:
                headers = self._parse_headers(self.buffer[: match.start()])
                # The final header ends with a single CRLF, however a
                # blank line indicates the start of the
                # body. Therefore the end is after the first CRLF.
                headers_end = (match.start() + match.end()) // 2
                del self.buffer[:headers_end]

                if "content-disposition" not in headers:
                    raise ValueError("Missing Content-Disposition header")

                _, extra = parse_options_header(headers["content-disposition"])
                name = t.cast(str, extra.get("name"))
                filename = extra.get("filename")
                if filename is not None:
                    event = File(
                        filename=filename,
                        headers=headers,
                        name=name,
                    )
                else:
                    event = Field(
                        headers=headers,
                        name=name,
                    )
                self.state = State.DATA_START
                self._search_position = 0
                self._parts_decoded += 1

                if self.max_parts is not None and self._parts_decoded > self.max_parts:
                    raise RequestEntityTooLarge()
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(0, len(self.buffer) - SEARCH_EXTRA_LENGTH)

        elif self.state == State.DATA_START:
            data, del_index, more_data = self._parse_data(self.buffer, start=True)
            del self.buffer[:del_index]
            event = Data(data=data, more_data=more_data)
            # When the boundary was already found, _parse_data has moved
            # the state on to PART or EPILOGUE; do not overwrite it.
            if more_data:
                self.state = State.DATA

        elif self.state == State.DATA:
            data, del_index, more_data = self._parse_data(self.buffer, start=False)
            del self.buffer[:del_index]
            # Skip empty chunks unless they terminate the part.
            if data or not more_data:
                event = Data(data=data, more_data=more_data)

        elif self.state == State.EPILOGUE and self.complete:
            event = Epilogue(data=bytes(self.buffer))
            del self.buffer[:]
            self.state = State.COMPLETE

        if self.complete and isinstance(event, NeedData):
            raise ValueError(f"Invalid form-data cannot parse beyond {self.state}")

        return event

    def _parse_headers(self, data: bytes) -> Headers:
        """Parse the raw header block of one part into ``Headers``."""
        headers: list[tuple[str, str]] = []
        # Merge the continued headers into one line
        data = HEADER_CONTINUATION_RE.sub(b" ", data)
        # Now there is one header per line
        for line in data.splitlines():
            line = line.strip()

            if line != b"":
                name, _, value = line.decode().partition(":")
                headers.append((name.strip(), value.strip()))
        return Headers(headers)

    def _parse_data(self, data: bytes, *, start: bool) -> tuple[bytes, int, bool]:
        """Extract the body bytes that can safely be emitted from ``data``.

        :param data: The buffered bytes of the current part.
        :param start: Whether this is the first chunk of the part, in
            which case the leading line break is skipped.
        :return: ``(chunk, del_index, more_data)`` where ``chunk`` is
            the body data, ``del_index`` is how many leading bytes of
            the buffer have been consumed and ``more_data`` is false
            once the terminating boundary has been matched.
        """
        # Body parts must start with CRLF (or CR or LF)
        if start:
            match = LINE_BREAK_RE.match(data)
            data_start = t.cast(t.Match[bytes], match).end()
        else:
            data_start = 0

        if self.buffer.find(b"--" + self.boundary) == -1:
            # No complete boundary in the buffer, but there may be
            # a partial boundary at the end. As the boundary
            # starts with either a nl or cr find the earliest and
            # return up to that as data.
            data_end = del_index = self.last_newline(data[data_start:]) + data_start
            # If the amount of data after the last line break is larger
            # than any partial boundary could be, there is no partial
            # boundary: release everything. Otherwise a lone CR/LF
            # followed by a long run without line breaks would keep
            # the buffer growing and be rescanned on every chunk.
            if (len(data) - data_end) > len(b"\n--" + self.boundary):
                data_end = del_index = len(data)
            more_data = True
        else:
            match = self.boundary_re.search(data)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data_end = match.start()
                del_index = match.end()
            else:
                data_end = del_index = self.last_newline(data[data_start:]) + data_start
            more_data = match is None

        return bytes(data[data_start:data_end]), del_index, more_data
class MultipartEncoder:
    """Encodes a sequence of events into the bytes of a multipart message."""

    def __init__(self, boundary: bytes) -> None:
        """Create an encoder that delimits parts with ``boundary``.

        :param boundary: The boundary, without the leading ``--``.
        """
        self.boundary = boundary
        self.state = State.PREAMBLE

    def send_event(self, event: Event) -> bytes:
        """Return the bytes that represent ``event`` and advance the state.

        :param event: The next event of the message.
        :return: The encoded bytes for ``event``.
        :raises ValueError: If ``event`` is not valid in the current
            state.
        """
        if isinstance(event, Preamble) and self.state == State.PREAMBLE:
            self.state = State.PART
            return event.data
        elif isinstance(event, (Field, File)) and self.state in {
            State.PREAMBLE,
            State.PART,
            State.DATA,
        }:
            data = b"\r\n--" + self.boundary + b"\r\n"
            data += b'Content-Disposition: form-data; name="%s"' % event.name.encode()
            if isinstance(event, File):
                data += b'; filename="%s"' % event.filename.encode()
            data += b"\r\n"
            # Content-Disposition has been written above from the
            # event's own attributes, so skip any copy in the headers.
            for name, value in t.cast(Field, event).headers:
                if name.lower() != "content-disposition":
                    data += f"{name}: {value}\r\n".encode()
            self.state = State.DATA_START
            return data
        elif isinstance(event, Data) and self.state == State.DATA_START:
            self.state = State.DATA
            # The line break that separates headers from body is only
            # emitted together with the first non-empty chunk.
            if len(event.data) > 0:
                return b"\r\n" + event.data
            else:
                return event.data
        elif isinstance(event, Data) and self.state == State.DATA:
            return event.data
        elif isinstance(event, Epilogue):
            self.state = State.COMPLETE
            return b"\r\n--" + self.boundary + b"--\r\n" + event.data
        else:
            raise ValueError(f"Cannot generate {event} in state: {self.state}")