]> jfr.im git - dlqueue.git/blob - venv/lib/python3.11/site-packages/werkzeug/debug/__init__.py
init: venv and flask
[dlqueue.git] / venv / lib / python3.11 / site-packages / werkzeug / debug / __init__.py
1 from __future__ import annotations
2
3 import getpass
4 import hashlib
5 import json
6 import os
7 import pkgutil
8 import re
9 import sys
10 import time
11 import typing as t
12 import uuid
13 from contextlib import ExitStack
14 from io import BytesIO
15 from itertools import chain
16 from os.path import basename
17 from os.path import join
18 from zlib import adler32
19
20 from .._internal import _log
21 from ..exceptions import NotFound
22 from ..http import parse_cookie
23 from ..security import gen_salt
24 from ..utils import send_file
25 from ..wrappers.request import Request
26 from ..wrappers.response import Response
27 from .console import Console
28 from .tbtools import DebugFrameSummary
29 from .tbtools import DebugTraceback
30 from .tbtools import render_console_html
31
32 if t.TYPE_CHECKING:
33 from _typeshed.wsgi import StartResponse
34 from _typeshed.wsgi import WSGIApplication
35 from _typeshed.wsgi import WSGIEnvironment
36
# Lifetime of a pin cookie, in seconds: 7 days. A cookie whose timestamp
# is older than this is no longer trusted by ``check_pin_trust``.
PIN_TIME = 7 * 24 * 60 * 60
39
40
def hash_pin(pin: str) -> str:
    """Return a short, salted digest of ``pin``.

    The pin is suffixed with the fixed text ``" added salt"``, encoded as
    UTF-8 (unencodable characters are replaced), hashed with SHA-1, and the
    first 12 hexadecimal characters of the digest are returned.
    """
    salted = f"{pin} added salt".encode("utf-8", "replace")
    digest = hashlib.sha1(salted).hexdigest()
    return digest[:12]
43
44
# Process-wide cache for the value computed by ``get_machine_id``.
_machine_id: str | bytes | None = None


def get_machine_id() -> str | bytes | None:
    """Return an identifier for the current machine, or ``None``.

    The first non-``None`` result is cached in the module-level
    ``_machine_id`` and returned on later calls. Sources are tried in this
    order: Linux id files (plus cgroup information), the macOS ``ioreg``
    serial number, and the Windows ``MachineGuid`` registry value.
    """
    global _machine_id

    if _machine_id is not None:
        return _machine_id

    def _first_line(path: str) -> bytes | None:
        # Stripped first line of ``path``, or None if it cannot be read.
        try:
            with open(path, "rb") as fh:
                return fh.readline().strip()
        except OSError:
            return None

    def _generate() -> str | bytes | None:
        linux_id = b""

        # machine-id is stable across boots, boot_id is not, so prefer the
        # former and stop at the first file that yields a value.
        for candidate in ("/etc/machine-id", "/proc/sys/kernel/random/boot_id"):
            content = _first_line(candidate)

            if content:
                linux_id += content
                break

        # Containers share the same machine id, so mix in the last path
        # component of the first cgroup line. This is used outside
        # containers too but should be relatively stable across boots.
        cgroup_line = _first_line("/proc/self/cgroup")

        if cgroup_line is not None:
            linux_id += cgroup_line.rpartition(b"/")[2]

        if linux_id:
            return linux_id

        # On OS X, use ioreg to get the computer's serial number.
        try:
            # subprocess may not be available, e.g. Google App Engine
            # https://github.com/pallets/werkzeug/issues/925
            from subprocess import PIPE
            from subprocess import Popen

            process = Popen(
                ["ioreg", "-c", "IOPlatformExpertDevice", "-d", "2"], stdout=PIPE
            )
            output = process.communicate()[0]
            found = re.search(b'"serial-number" = <([^>]+)', output)

            if found is not None:
                return found.group(1)
        except (OSError, ImportError):
            pass

        # On Windows, use winreg to get the machine guid.
        if sys.platform == "win32":
            import winreg

            try:
                with winreg.OpenKey(
                    winreg.HKEY_LOCAL_MACHINE,
                    "SOFTWARE\\Microsoft\\Cryptography",
                    0,
                    winreg.KEY_READ | winreg.KEY_WOW64_64KEY,
                ) as registry_key:
                    guid: str | bytes
                    guid_type: int
                    guid, guid_type = winreg.QueryValueEx(registry_key, "MachineGuid")

                    # String values are returned as bytes; anything else is
                    # passed through unchanged.
                    if guid_type == winreg.REG_SZ:
                        return guid.encode("utf-8")

                    return guid
            except OSError:
                pass

        return None

    _machine_id = _generate()
    return _machine_id
123
124
125 class _ConsoleFrame:
126 """Helper class so that we can reuse the frame console code for the
127 standalone console.
128 """
129
130 def __init__(self, namespace: dict[str, t.Any]):
131 self.console = Console(namespace)
132 self.id = 0
133
134 def eval(self, code: str) -> t.Any:
135 return self.console.eval(code)
136
137
def get_pin_and_cookie_name(
    app: WSGIApplication,
) -> tuple[str, str] | tuple[None, None]:
    """Given an application object this returns a semi-stable 9 digit pin
    code and a random key. The hope is that this is stable between
    restarts to not make debugging particularly frustrating.

    The ``WERKZEUG_DEBUG_PIN`` environment variable controls the result:

    - ``"off"`` disables the pin; ``(None, None)`` is returned.
    - A decimal value containing ``-`` separators is returned as the pin
      unchanged.
    - A decimal value without separators is used as the pin digits and
      grouped with ``-`` when its length is divisible by 5, 4 or 3.
    - Anything else (or unset) makes the pin be derived from a hash of
      information about the application and the machine.

    :param app: the WSGI application the pin is generated for.
    :return: a ``(pin, cookie_name)`` tuple, or ``(None, None)`` if the pin
        was explicitly disabled. The second item is the cookie name used
        for remembering a successful pin entry.
    """
    pin = os.environ.get("WERKZEUG_DEBUG_PIN")
    rv = None
    num = None

    # Pin was explicitly disabled
    if pin == "off":
        return None, None

    # Pin was provided explicitly
    if pin is not None and pin.replace("-", "").isdecimal():
        # If there are separators in the pin, return it directly
        if "-" in pin:
            rv = pin
        else:
            num = pin

    modname = getattr(app, "__module__", t.cast(object, app).__class__.__module__)
    username: str | None

    try:
        # getuser imports the pwd module, which does not exist in Google
        # App Engine. It may also raise a KeyError if the UID does not
        # have a username, such as in Docker. Python 3.13+ raises OSError
        # instead when no username can be determined, so catch that too.
        username = getpass.getuser()
    except (ImportError, KeyError, OSError):
        username = None

    mod = sys.modules.get(modname)

    # This information only exists to make the cookie unique on the
    # computer, not as a security feature.
    probably_public_bits = [
        username,
        modname,
        getattr(app, "__name__", type(app).__name__),
        getattr(mod, "__file__", None),
    ]

    # This information is here to make it harder for an attacker to
    # guess the cookie name. They are unlikely to be contained anywhere
    # within the unauthenticated debug page.
    private_bits = [str(uuid.getnode()), get_machine_id()]

    h = hashlib.sha1()
    for bit in chain(probably_public_bits, private_bits):
        # Skip values that could not be determined (None or empty).
        if not bit:
            continue
        if isinstance(bit, str):
            bit = bit.encode("utf-8")
        h.update(bit)
    h.update(b"cookiesalt")

    cookie_name = f"__wzd{h.hexdigest()[:20]}"

    # If we need to generate a pin we salt it a bit more so that we don't
    # end up with the same value and generate out 9 digits
    if num is None:
        h.update(b"pinsalt")
        num = f"{int(h.hexdigest(), 16):09d}"[:9]

    # Format the pincode in groups of digits for easier remembering if
    # we don't have a result yet.
    if rv is None:
        for group_size in 5, 4, 3:
            if len(num) % group_size == 0:
                rv = "-".join(
                    num[x : x + group_size].rjust(group_size, "0")
                    for x in range(0, len(num), group_size)
                )
                break
        else:
            # No group size divides the length evenly; use the digits as is.
            rv = num

    return rv, cookie_name
222
223
class DebuggedApplication:
    """Enables debugging support for a given application::

        from werkzeug.debug import DebuggedApplication
        from myapp import app
        app = DebuggedApplication(app, evalex=True)

    The ``evalex`` argument allows evaluating expressions in any frame
    of a traceback. This works by preserving each frame with its local
    state. Some state, such as context globals, cannot be restored with
    the frame by default. When ``evalex`` is enabled,
    ``environ["werkzeug.debug.preserve_context"]`` will be a callable
    that takes a context manager, and can be called multiple times.
    Each context manager will be entered before evaluating code in the
    frame, then exited again, so they can perform setup and cleanup for
    each call.

    :param app: the WSGI application to run debugged.
    :param evalex: enable exception evaluation feature (interactive
                   debugging).  This requires a non-forking server.
    :param request_key: The key that points to the request object in this
                        environment.  This parameter is ignored in current
                        versions.
    :param console_path: the URL for a general purpose console.
    :param console_init_func: the function that is executed before starting
                              the general purpose console.  The return value
                              is used as initial namespace.
    :param show_hidden_frames: by default hidden traceback frames are skipped.
                               You can show them by setting this parameter
                               to `True`.
    :param pin_security: can be used to disable the pin based security system.
    :param pin_logging: enables the logging of the pin system.

    .. versionchanged:: 2.2
        Added the ``werkzeug.debug.preserve_context`` environ key.
    """

    # ``None`` means the pin system is disabled (see ``check_pin_trust``).
    _pin: str | None
    _pin_cookie: str

    def __init__(
        self,
        app: WSGIApplication,
        evalex: bool = False,
        request_key: str = "werkzeug.request",
        console_path: str = "/console",
        console_init_func: t.Callable[[], dict[str, t.Any]] | None = None,
        show_hidden_frames: bool = False,
        pin_security: bool = True,
        pin_logging: bool = True,
    ) -> None:
        # Normalize any falsy value to None.
        if not console_init_func:
            console_init_func = None
        self.app = app
        self.evalex = evalex
        # Preserved frames, keyed by ``id(frame)``; key 0 is the standalone
        # console frame created by ``display_console``.
        self.frames: dict[int, DebugFrameSummary | _ConsoleFrame] = {}
        # Context managers to enter around code evaluated in a given frame.
        self.frame_contexts: dict[int, list[t.ContextManager[None]]] = {}
        self.request_key = request_key
        self.console_path = console_path
        self.console_init_func = console_init_func
        self.show_hidden_frames = show_hidden_frames
        # Per-instance secret that debugger requests must echo back.
        self.secret = gen_salt(20)
        self._failed_pin_auth = 0

        self.pin_logging = pin_logging
        if pin_security:
            # Print out the pin for the debugger on standard out.
            if os.environ.get("WERKZEUG_RUN_MAIN") == "true" and pin_logging:
                _log("warning", " * Debugger is active!")
                if self.pin is None:
                    _log("warning", " * Debugger PIN disabled.  DEBUGGER UNSECURED!")
                else:
                    _log("info", " * Debugger PIN: %s", self.pin)
        else:
            self.pin = None

    @property
    def pin(self) -> str | None:
        """The debugger pin, or ``None`` if the pin system is disabled.

        Computed (together with the cookie name) on first access unless a
        value was assigned beforehand.
        """
        if not hasattr(self, "_pin"):
            pin_cookie = get_pin_and_cookie_name(self.app)
            self._pin, self._pin_cookie = pin_cookie  # type: ignore
        return self._pin

    @pin.setter
    def pin(self, value: str | None) -> None:
        self._pin = value

    @property
    def pin_cookie_name(self) -> str:
        """The name of the pin cookie."""
        if not hasattr(self, "_pin_cookie"):
            pin_cookie = get_pin_and_cookie_name(self.app)
            self._pin, self._pin_cookie = pin_cookie  # type: ignore
        return self._pin_cookie

    def debug_application(
        self, environ: WSGIEnvironment, start_response: StartResponse
    ) -> t.Iterator[bytes]:
        """Run the application and conserve the traceback frames.

        The wrapped application's output is streamed through unchanged. If
        it raises, the traceback frames are stored on this instance, an HTML
        debugger page is sent with status 500, and the traceback text is
        written to ``environ["wsgi.errors"]``.
        """
        contexts: list[t.ContextManager[t.Any]] = []

        if self.evalex:
            # Let the application register context managers to re-enter
            # when code is later evaluated in one of its frames.
            environ["werkzeug.debug.preserve_context"] = contexts.append

        app_iter = None
        try:
            app_iter = self.app(environ, start_response)
            yield from app_iter
            if hasattr(app_iter, "close"):
                app_iter.close()
        except Exception as e:
            if hasattr(app_iter, "close"):
                app_iter.close()  # type: ignore

            tb = DebugTraceback(e, skip=1, hide=not self.show_hidden_frames)

            for frame in tb.all_frames:
                self.frames[id(frame)] = frame
                self.frame_contexts[id(frame)] = contexts

            is_trusted = bool(self.check_pin_trust(environ))
            html = tb.render_debugger_html(
                evalex=self.evalex,
                secret=self.secret,
                evalex_trusted=is_trusted,
            )
            response = Response(html, status=500, mimetype="text/html")

            try:
                yield from response(environ, start_response)
            except Exception:
                # if we end up here there has been output but an error
                # occurred.  in that situation we can do nothing fancy any
                # more, better log something into the error log and fall
                # back gracefully.
                environ["wsgi.errors"].write(
                    "Debugging middleware caught exception in streamed "
                    "response at a point where response headers were already "
                    "sent.\n"
                )

            environ["wsgi.errors"].write("".join(tb.render_traceback_text()))

    def execute_command(  # type: ignore[return]
        self,
        request: Request,
        command: str,
        frame: DebugFrameSummary | _ConsoleFrame,
    ) -> Response:
        """Execute a command in a console.

        Any context managers preserved for ``frame`` are entered for the
        duration of the evaluation and exited afterwards.
        """
        contexts = self.frame_contexts.get(id(frame), [])

        with ExitStack() as exit_stack:
            for cm in contexts:
                exit_stack.enter_context(cm)

            return Response(frame.eval(command), mimetype="text/html")

    def display_console(self, request: Request) -> Response:
        """Display a standalone shell."""
        # Create the console frame lazily on first use; it lives under key 0.
        if 0 not in self.frames:
            if self.console_init_func is None:
                ns = {}
            else:
                ns = dict(self.console_init_func())
            ns.setdefault("app", self.app)
            self.frames[0] = _ConsoleFrame(ns)
        is_trusted = bool(self.check_pin_trust(request.environ))
        return Response(
            render_console_html(secret=self.secret, evalex_trusted=is_trusted),
            mimetype="text/html",
        )

    def get_resource(self, request: Request, filename: str) -> Response:
        """Return a static resource from the shared folder."""
        # Only the base name is used, so the lookup stays inside "shared".
        path = join("shared", basename(filename))

        try:
            data = pkgutil.get_data(__package__, path)
        except OSError:
            return NotFound()  # type: ignore[return-value]
        else:
            if data is None:
                return NotFound()  # type: ignore[return-value]

            etag = str(adler32(data) & 0xFFFFFFFF)
            return send_file(
                BytesIO(data), request.environ, download_name=filename, etag=etag
            )

    def check_pin_trust(self, environ: WSGIEnvironment) -> bool | None:
        """Checks if the request passed the pin test.  This returns `True` if the
        request is trusted on a pin/cookie basis and returns `False` if not.
        Additionally if the cookie's stored pin hash is wrong it will return
        `None` so that appropriate action can be taken.
        """
        # With the pin disabled every request is trusted.
        if self.pin is None:
            return True
        val = parse_cookie(environ).get(self.pin_cookie_name)
        if not val or "|" not in val:
            return False
        # Cookie format: "<unix timestamp>|<pin hash>".
        ts_str, pin_hash = val.split("|", 1)

        try:
            ts = int(ts_str)
        except ValueError:
            return False

        if pin_hash != hash_pin(self.pin):
            return None
        # Trusted only while the cookie is younger than PIN_TIME.
        return (time.time() - PIN_TIME) < ts

    def _fail_pin_auth(self) -> None:
        """Record a failed pin attempt, sleeping first to slow down guessing."""
        time.sleep(5.0 if self._failed_pin_auth > 5 else 0.5)
        self._failed_pin_auth += 1

    def pin_auth(self, request: Request) -> Response:
        """Authenticates with the pin.

        Responds with a JSON object ``{"auth": bool, "exhausted": bool}``.
        On success the pin cookie is set; if the request carried a cookie
        with a wrong pin hash, the cookie is deleted. A request without a
        ``pin`` query parameter is counted as a failed attempt.
        """
        exhausted = False
        auth = False
        trust = self.check_pin_trust(request.environ)
        pin = t.cast(str, self.pin)

        # If the trust return value is `None` it means that the cookie is
        # set but the stored pin hash value is bad.  This means that the
        # pin was changed.  In this case we count a bad auth and unset the
        # cookie.  This way it becomes harder to guess the cookie name
        # instead of the pin as we still count up failures.
        bad_cookie = False
        if trust is None:
            self._fail_pin_auth()
            bad_cookie = True

        # If we're trusted, we're authenticated.
        elif trust:
            auth = True

        # If we failed too many times, then we're locked out.
        elif self._failed_pin_auth > 10:
            exhausted = True

        # Otherwise go through pin based authentication
        else:
            # Use .get() so a request without a "pin" parameter is handled
            # as a wrong pin instead of raising out of the WSGI callable.
            entered_pin = request.args.get("pin")

            if (
                entered_pin is not None
                and entered_pin.strip().replace("-", "") == pin.replace("-", "")
            ):
                self._failed_pin_auth = 0
                auth = True
            else:
                self._fail_pin_auth()

        rv = Response(
            json.dumps({"auth": auth, "exhausted": exhausted}),
            mimetype="application/json",
        )
        if auth:
            rv.set_cookie(
                self.pin_cookie_name,
                f"{int(time.time())}|{hash_pin(pin)}",
                httponly=True,
                samesite="Strict",
                secure=request.is_secure,
            )
        elif bad_cookie:
            rv.delete_cookie(self.pin_cookie_name)
        return rv

    def log_pin_request(self) -> Response:
        """Log the pin if needed."""
        if self.pin_logging and self.pin is not None:
            _log(
                "info", " * To enable the debugger you need to enter the security pin:"
            )
            _log("info", " * Debugger pin code: %s", self.pin)
        return Response("")

    def __call__(
        self, environ: WSGIEnvironment, start_response: StartResponse
    ) -> t.Iterable[bytes]:
        """Dispatch the requests."""
        # important: don't ever access a function here that reads the incoming
        # form data!  Otherwise the application won't have access to that data
        # any more!
        request = Request(environ)
        # Default: run the wrapped application under the debugger.
        response = self.debug_application
        if request.args.get("__debugger__") == "yes":
            cmd = request.args.get("cmd")
            arg = request.args.get("f")
            secret = request.args.get("s")
            frame = self.frames.get(request.args.get("frm", type=int))  # type: ignore
            if cmd == "resource" and arg:
                response = self.get_resource(request, arg)  # type: ignore
            elif cmd == "pinauth" and secret == self.secret:
                response = self.pin_auth(request)  # type: ignore
            elif cmd == "printpin" and secret == self.secret:
                response = self.log_pin_request()  # type: ignore
            elif (
                self.evalex
                and cmd is not None
                and frame is not None
                and self.secret == secret
                and self.check_pin_trust(environ)
            ):
                response = self.execute_command(request, cmd, frame)  # type: ignore
        elif (
            self.evalex
            and self.console_path is not None
            and request.path == self.console_path
        ):
            response = self.display_console(request)  # type: ignore
        return response(environ, start_response)