1 from __future__
import annotations
13 from contextlib
import ExitStack
14 from io
import BytesIO
15 from itertools
import chain
16 from os
.path
import basename
17 from os
.path
import join
18 from zlib
import adler32
20 from .._internal
import _log
21 from ..exceptions
import NotFound
22 from ..http
import parse_cookie
23 from ..security
import gen_salt
24 from ..utils
import send_file
25 from ..wrappers
.request
import Request
26 from ..wrappers
.response
import Response
27 from .console
import Console
28 from .tbtools
import DebugFrameSummary
29 from .tbtools
import DebugTraceback
30 from .tbtools
import render_console_html
33 from _typeshed
.wsgi
import StartResponse
34 from _typeshed
.wsgi
import WSGIApplication
35 from _typeshed
.wsgi
import WSGIEnvironment
# How long a pin cookie stays trusted, in seconds:
# 60 s * 60 min * 24 h * 7 days = one week.
PIN_TIME = 60 * 60 * 24 * 7
def hash_pin(pin: str) -> str:
    """Return a short salted SHA-1 digest of ``pin``.

    The fixed suffix ``" added salt"`` is appended to the pin, the result
    is encoded as UTF-8 (characters that cannot be encoded are replaced
    rather than raising), and the first 12 hexadecimal characters of the
    SHA-1 digest are returned.

    :param pin: the pin string to hash.
    :return: a 12 character lowercase hex string.
    """
    salted = f"{pin} added salt".encode("utf-8", "replace")
    return hashlib.sha1(salted).hexdigest()[:12]
45 _machine_id
: str |
bytes |
None = None
48 def get_machine_id() -> str |
bytes |
None:
51 if _machine_id
is not None:
54 def _generate() -> str |
bytes |
None:
57 # machine-id is stable across boots, boot_id is not.
58 for filename
in "/etc/machine-id", "/proc/sys/kernel/random/boot_id":
60 with open(filename
, "rb") as f
:
61 value
= f
.readline().strip()
69 # Containers share the same machine id, add some cgroup
70 # information. This is used outside containers too but should be
71 # relatively stable across boots.
73 with open("/proc/self/cgroup", "rb") as f
:
74 linux
+= f
.readline().strip().rpartition(b
"/")[2]
81 # On OS X, use ioreg to get the computer's serial number.
83 # subprocess may not be available, e.g. Google App Engine
84 # https://github.com/pallets/werkzeug/issues/925
85 from subprocess
import Popen
, PIPE
88 ["ioreg", "-c", "IOPlatformExpertDevice", "-d", "2"], stdout
=PIPE
90 match
= re
.search(b
'"serial-number" = <([^>]+)', dump
)
94 except (OSError, ImportError):
97 # On Windows, use winreg to get the machine guid.
98 if sys
.platform
== "win32":
103 winreg
.HKEY_LOCAL_MACHINE
,
104 "SOFTWARE\\Microsoft\\Cryptography",
106 winreg
.KEY_READ | winreg
.KEY_WOW64_64KEY
,
110 guid
, guid_type
= winreg
.QueryValueEx(rk
, "MachineGuid")
112 if guid_type
== winreg
.REG_SZ
:
113 return guid
.encode("utf-8")
121 _machine_id
= _generate()
126 """Helper class so that we can reuse the frame console code for the
130 def __init__(self
, namespace
: dict[str, t
.Any
]):
131 self
.console
= Console(namespace
)
def eval(self, code: str) -> t.Any:
    """Evaluate ``code`` in the wrapped console.

    Delegates to ``self.console.eval`` and returns whatever it returns.

    :param code: the source text to evaluate.
    """
    return self.console.eval(code)
138 def get_pin_and_cookie_name(
139 app
: WSGIApplication
,
140 ) -> tuple[str, str] |
tuple[None, None]:
141 """Given an application object this returns a semi-stable 9 digit pin
142 code and a random key. The hope is that this is stable between
143 restarts to not make debugging particularly frustrating. If the pin
144 was forcefully disabled this returns `None`.
146 Second item in the resulting tuple is the cookie name for remembering.
148 pin
= os
.environ
.get("WERKZEUG_DEBUG_PIN")
152 # Pin was explicitly disabled
156 # Pin was provided explicitly
157 if pin
is not None and pin
.replace("-", "").isdecimal():
158 # If there are separators in the pin, return it directly
164 modname
= getattr(app
, "__module__", t
.cast(object, app
).__class
__.__module
__)
168 # getuser imports the pwd module, which does not exist in Google
169 # App Engine. It may also raise a KeyError if the UID does not
170 # have a username, such as in Docker.
171 username
= getpass
.getuser()
172 except (ImportError, KeyError):
175 mod
= sys
.modules
.get(modname
)
177 # This information only exists to make the cookie unique on the
178 # computer, not as a security feature.
179 probably_public_bits
= [
182 getattr(app
, "__name__", type(app
).__name
__),
183 getattr(mod
, "__file__", None),
186 # This information is here to make it harder for an attacker to
187 # guess the cookie name. They are unlikely to be contained anywhere
188 # within the unauthenticated debug page.
189 private_bits
= [str(uuid
.getnode()), get_machine_id()]
192 for bit
in chain(probably_public_bits
, private_bits
):
195 if isinstance(bit
, str):
196 bit
= bit
.encode("utf-8")
198 h
.update(b
"cookiesalt")
200 cookie_name
= f
"__wzd{h.hexdigest()[:20]}"
202 # If we need to generate a pin we salt it a bit more so that we don't
203 # end up with the same value and generate out 9 digits
206 num
= f
"{int(h.hexdigest(), 16):09d}"[:9]
208 # Format the pincode in groups of digits for easier remembering if
209 # we don't have a result yet.
211 for group_size
in 5, 4, 3:
212 if len(num
) % group_size
== 0:
214 num
[x
: x
+ group_size
].rjust(group_size
, "0")
215 for x
in range(0, len(num
), group_size
)
221 return rv
, cookie_name
224 class DebuggedApplication
:
225 """Enables debugging support for a given application::
227 from werkzeug.debug import DebuggedApplication
228 from myapp import app
229 app = DebuggedApplication(app, evalex=True)
231 The ``evalex`` argument allows evaluating expressions in any frame
232 of a traceback. This works by preserving each frame with its local
233 state. Some state, such as context globals, cannot be restored with
234 the frame by default. When ``evalex`` is enabled,
235 ``environ["werkzeug.debug.preserve_context"]`` will be a callable
236 that takes a context manager, and can be called multiple times.
237 Each context manager will be entered before evaluating code in the
238 frame, then exited again, so they can perform setup and cleanup for
241 :param app: the WSGI application to run debugged.
242 :param evalex: enable exception evaluation feature (interactive
243 debugging). This requires a non-forking server.
244 :param request_key: The key that points to the request object in this
245 environment. This parameter is ignored in current
247 :param console_path: the URL for a general purpose console.
248 :param console_init_func: the function that is executed before starting
249 the general purpose console. The return value
250 is used as initial namespace.
251 :param show_hidden_frames: by default hidden traceback frames are skipped.
252 You can show them by setting this parameter
254 :param pin_security: can be used to disable the pin based security system.
255 :param pin_logging: enables the logging of the pin system.
257 .. versionchanged:: 2.2
258 Added the ``werkzeug.debug.preserve_context`` environ key.
266 app
: WSGIApplication
,
267 evalex
: bool = False,
268 request_key
: str = "werkzeug.request",
269 console_path
: str = "/console",
270 console_init_func
: t
.Callable
[[], dict[str, t
.Any
]] |
None = None,
271 show_hidden_frames
: bool = False,
272 pin_security
: bool = True,
273 pin_logging
: bool = True,
275 if not console_init_func
:
276 console_init_func
= None
279 self
.frames
: dict[int, DebugFrameSummary | _ConsoleFrame
] = {}
280 self
.frame_contexts
: dict[int, list[t
.ContextManager
[None]]] = {}
281 self
.request_key
= request_key
282 self
.console_path
= console_path
283 self
.console_init_func
= console_init_func
284 self
.show_hidden_frames
= show_hidden_frames
285 self
.secret
= gen_salt(20)
286 self
._failed
_pin
_auth
= 0
288 self
.pin_logging
= pin_logging
290 # Print out the pin for the debugger on standard out.
291 if os
.environ
.get("WERKZEUG_RUN_MAIN") == "true" and pin_logging
:
292 _log("warning", " * Debugger is active!")
294 _log("warning", " * Debugger PIN disabled. DEBUGGER UNSECURED!")
296 _log("info", " * Debugger PIN: %s", self
.pin
)
301 def pin(self
) -> str |
None:
302 if not hasattr(self
, "_pin"):
303 pin_cookie
= get_pin_and_cookie_name(self
.app
)
304 self
._pin
, self
._pin
_cookie
= pin_cookie
# type: ignore
308 def pin(self
, value
: str) -> None:
def pin_cookie_name(self) -> str:
    """The name of the pin cookie."""
    # NOTE(review): other code reads this as ``self.pin_cookie_name``
    # without calling it, so it is presumably wrapped in ``@property`` on
    # a line not shown here -- confirm before moving this definition.
    #
    # The pin and the cookie name are derived together by
    # ``get_pin_and_cookie_name`` and both cached on the instance, so the
    # derivation is skipped once ``_pin_cookie`` exists.
    if not hasattr(self, "_pin_cookie"):
        pin_cookie = get_pin_and_cookie_name(self.app)
        self._pin, self._pin_cookie = pin_cookie  # type: ignore
    return self._pin_cookie
319 def debug_application(
320 self
, environ
: WSGIEnvironment
, start_response
: StartResponse
321 ) -> t
.Iterator
[bytes]:
322 """Run the application and conserve the traceback frames."""
323 contexts
: list[t
.ContextManager
[t
.Any
]] = []
326 environ
["werkzeug.debug.preserve_context"] = contexts
.append
330 app_iter
= self
.app(environ
, start_response
)
332 if hasattr(app_iter
, "close"):
334 except Exception as e
:
335 if hasattr(app_iter
, "close"):
336 app_iter
.close() # type: ignore
338 tb
= DebugTraceback(e
, skip
=1, hide
=not self
.show_hidden_frames
)
340 for frame
in tb
.all_frames
:
341 self
.frames
[id(frame
)] = frame
342 self
.frame_contexts
[id(frame
)] = contexts
344 is_trusted
= bool(self
.check_pin_trust(environ
))
345 html
= tb
.render_debugger_html(
348 evalex_trusted
=is_trusted
,
350 response
= Response(html
, status
=500, mimetype
="text/html")
353 yield from response(environ
, start_response
)
355 # if we end up here there has been output but an error
356 # occurred. in that situation we can do nothing fancy any
357 # more, better log something into the error log and fall
359 environ
["wsgi.errors"].write(
360 "Debugging middleware caught exception in streamed "
361 "response at a point where response headers were already "
365 environ
["wsgi.errors"].write("".join(tb
.render_traceback_text()))
367 def execute_command( # type: ignore[return]
371 frame
: DebugFrameSummary | _ConsoleFrame
,
373 """Execute a command in a console."""
374 contexts
= self
.frame_contexts
.get(id(frame
), [])
376 with ExitStack() as exit_stack
:
378 exit_stack
.enter_context(cm
)
380 return Response(frame
.eval(command
), mimetype
="text/html")
382 def display_console(self
, request
: Request
) -> Response
:
383 """Display a standalone shell."""
384 if 0 not in self
.frames
:
385 if self
.console_init_func
is None:
388 ns
= dict(self
.console_init_func())
389 ns
.setdefault("app", self
.app
)
390 self
.frames
[0] = _ConsoleFrame(ns
)
391 is_trusted
= bool(self
.check_pin_trust(request
.environ
))
393 render_console_html(secret
=self
.secret
, evalex_trusted
=is_trusted
),
394 mimetype
="text/html",
397 def get_resource(self
, request
: Request
, filename
: str) -> Response
:
398 """Return a static resource from the shared folder."""
399 path
= join("shared", basename(filename
))
402 data
= pkgutil
.get_data(__package__
, path
)
404 return NotFound() # type: ignore[return-value]
407 return NotFound() # type: ignore[return-value]
409 etag
= str(adler32(data
) & 0xFFFFFFFF)
411 BytesIO(data
), request
.environ
, download_name
=filename
, etag
=etag
414 def check_pin_trust(self
, environ
: WSGIEnvironment
) -> bool |
None:
415 """Checks if the request passed the pin test. This returns `True` if the
416 request is trusted on a pin/cookie basis and returns `False` if not.
417 Additionally if the cookie's stored pin hash is wrong it will return
418 `None` so that appropriate action can be taken.
422 val
= parse_cookie(environ
).get(self
.pin_cookie_name
)
423 if not val
or "|" not in val
:
425 ts_str
, pin_hash
= val
.split("|", 1)
432 if pin_hash
!= hash_pin(self
.pin
):
434 return (time
.time() - PIN_TIME
) < ts
def _fail_pin_auth(self) -> None:
    """Record one failed pin attempt, stalling the caller first.

    Sleeps for 0.5 seconds, or for 5 seconds once more than five failures
    have already been counted, and then increments
    ``self._failed_pin_auth``.
    """
    delay = 5.0 if self._failed_pin_auth > 5 else 0.5
    time.sleep(delay)
    self._failed_pin_auth += 1
440 def pin_auth(self
, request
: Request
) -> Response
:
441 """Authenticates with the pin."""
444 trust
= self
.check_pin_trust(request
.environ
)
445 pin
= t
.cast(str, self
.pin
)
447 # If the trust return value is `None` it means that the cookie is
448 # set but the stored pin hash value is bad. This means that the
449 # pin was changed. In this case we count a bad auth and unset the
450 # cookie. This way it becomes harder to guess the cookie name
451 # instead of the pin as we still count up failures.
454 self
._fail
_pin
_auth
()
457 # If we're trusted, we're authenticated.
461 # If we failed too many times, then we're locked out.
462 elif self
._failed
_pin
_auth
> 10:
465 # Otherwise go through pin based authentication
467 entered_pin
= request
.args
["pin"]
469 if entered_pin
.strip().replace("-", "") == pin
.replace("-", ""):
470 self
._failed
_pin
_auth
= 0
473 self
._fail
_pin
_auth
()
476 json
.dumps({"auth": auth, "exhausted": exhausted}
),
477 mimetype
="application/json",
481 self
.pin_cookie_name
,
482 f
"{int(time.time())}|{hash_pin(pin)}",
485 secure
=request
.is_secure
,
488 rv
.delete_cookie(self
.pin_cookie_name
)
491 def log_pin_request(self
) -> Response
:
492 """Log the pin if needed."""
493 if self
.pin_logging
and self
.pin
is not None:
495 "info", " * To enable the debugger you need to enter the security pin:"
497 _log("info", " * Debugger pin code: %s", self
.pin
)
501 self
, environ
: WSGIEnvironment
, start_response
: StartResponse
502 ) -> t
.Iterable
[bytes]:
503 """Dispatch the requests."""
504 # important: don't ever access a function here that reads the incoming
505 # form data! Otherwise the application won't have access to that data
507 request
= Request(environ
)
508 response
= self
.debug_application
509 if request
.args
.get("__debugger__") == "yes":
510 cmd
= request
.args
.get("cmd")
511 arg
= request
.args
.get("f")
512 secret
= request
.args
.get("s")
513 frame
= self
.frames
.get(request
.args
.get("frm", type=int)) # type: ignore
514 if cmd
== "resource" and arg
:
515 response
= self
.get_resource(request
, arg
) # type: ignore
516 elif cmd
== "pinauth" and secret
== self
.secret
:
517 response
= self
.pin_auth(request
) # type: ignore
518 elif cmd
== "printpin" and secret
== self
.secret
:
519 response
= self
.log_pin_request() # type: ignore
523 and frame
is not None
524 and self
.secret
== secret
525 and self
.check_pin_trust(environ
)
527 response
= self
.execute_command(request
, cmd
, frame
) # type: ignore
530 and self
.console_path
is not None
531 and request
.path
== self
.console_path
533 response
= self
.display_console(request
) # type: ignore
534 return response(environ
, start_response
)