1 from __future__
import annotations
10 from itertools
import chain
11 from pathlib
import PurePath
13 from ._internal
import _log
15 # The various system prefixes where imports are found. Base values are
16 # different when running in a virtualenv. All reloaders will ignore the
17 # base paths (usually the system installation). The stat reloader won't
18 # scan the virtualenv paths, it will only include modules that are
20 _ignore_always
= tuple({sys.base_prefix, sys.base_exec_prefix}
)
21 prefix
= {*_ignore_always, sys.prefix, sys.exec_prefix}
23 if hasattr(sys
, "real_prefix"):
25 prefix
.add(sys
.real_prefix
)
27 _stat_ignore_scan
= tuple(prefix
)
29 _ignore_common_dirs
= {
40 def _iter_module_paths() -> t
.Iterator
[str]:
41 """Find the filesystem paths associated with imported modules."""
42 # List is in case the value is modified by the app while updating.
43 for module
in list(sys
.modules
.values()):
44 name
= getattr(module
, "__file__", None)
46 if name
is None or name
.startswith(_ignore_always
):
49 while not os
.path
.isfile(name
):
50 # Zip file, find the base file without the module path.
52 name
= os
.path
.dirname(name
)
54 if name
== old
: # skip if it was all directories somehow
60 def _remove_by_pattern(paths
: set[str], exclude_patterns
: set[str]) -> None:
61 for pattern
in exclude_patterns
:
62 paths
.difference_update(fnmatch
.filter(paths
, pattern
))
66 extra_files
: set[str], exclude_patterns
: set[str]
68 """Find paths for the stat reloader to watch. Returns imported
69 module files, Python files under non-system paths. Extra files and
70 Python files under extra directories can also be scanned.
72 System paths have to be excluded for efficiency. Non-system paths,
73 such as a project root or ``sys.path.insert``, should be the paths
74 of interest to the user anyway.
78 for path
in chain(list(sys
.path
), extra_files
):
79 path
= os
.path
.abspath(path
)
81 if os
.path
.isfile(path
):
82 # zip file on sys.path, or extra file
86 parent_has_py
= {os.path.dirname(path): True}
88 for root
, dirs
, files
in os
.walk(path
):
89 # Optimizations: ignore system prefixes, __pycache__ will
90 # have a py or pyc module at the import path, ignore some
91 # common known dirs such as version control and tool caches.
93 root
.startswith(_stat_ignore_scan
)
94 or os
.path
.basename(root
) in _ignore_common_dirs
102 if name
.endswith((".py", ".pyc")):
104 paths
.add(os
.path
.join(root
, name
))
106 # Optimization: stop scanning a directory if neither it nor
107 # its parent contained Python files.
108 if not (has_py
or parent_has_py
[os
.path
.dirname(root
)]):
112 parent_has_py
[root
] = has_py
114 paths
.update(_iter_module_paths())
115 _remove_by_pattern(paths
, exclude_patterns
)
119 def _find_watchdog_paths(
120 extra_files
: set[str], exclude_patterns
: set[str]
121 ) -> t
.Iterable
[str]:
122 """Find paths for the stat reloader to watch. Looks at the same
123 sources as the stat reloader, but watches everything under
124 directories instead of individual files.
128 for name
in chain(list(sys
.path
), extra_files
):
129 name
= os
.path
.abspath(name
)
131 if os
.path
.isfile(name
):
132 name
= os
.path
.dirname(name
)
136 for name
in _iter_module_paths():
137 dirs
.add(os
.path
.dirname(name
))
139 _remove_by_pattern(dirs
, exclude_patterns
)
140 return _find_common_roots(dirs
)
143 def _find_common_roots(paths
: t
.Iterable
[str]) -> t
.Iterable
[str]:
144 root
: dict[str, dict] = {}
146 for chunks
in sorted((PurePath(x
).parts
for x
in paths
), key
=len, reverse
=True):
150 node
= node
.setdefault(chunk
, {})
156 def _walk(node
: t
.Mapping
[str, dict], path
: tuple[str, ...]) -> None:
157 for prefix
, child
in node
.items():
158 _walk(child
, path
+ (prefix
,))
161 rv
.add(os
.path
.join(*path
))
167 def _get_args_for_reloading() -> list[str]:
168 """Determine how the script was executed, and return the args needed
169 to execute it again in a new process.
171 if sys
.version_info
>= (3, 10):
172 # sys.orig_argv, added in Python 3.10, contains the exact args used to invoke
173 # Python. Still replace argv[0] with sys.executable for accuracy.
174 return [sys
.executable
, *sys
.orig_argv
[1:]]
176 rv
= [sys
.executable
]
177 py_script
= sys
.argv
[0]
179 # Need to look at main module to determine how it was executed.
180 __main__
= sys
.modules
["__main__"]
182 # The value of __package__ indicates how Python was called. It may
183 # not exist if a setuptools script is installed as an egg. It may be
184 # set incorrectly for entry points created with pip on Windows.
185 if getattr(__main__
, "__package__", None) is None or (
187 and __main__
.__package
__ == ""
188 and not os
.path
.exists(py_script
)
189 and os
.path
.exists(f
"{py_script}.exe")
191 # Executed a file, like "python app.py".
192 py_script
= os
.path
.abspath(py_script
)
195 # Windows entry points have ".exe" extension and should be
197 if not os
.path
.exists(py_script
) and os
.path
.exists(f
"{py_script}.exe"):
201 os
.path
.splitext(sys
.executable
)[1] == ".exe"
202 and os
.path
.splitext(py_script
)[1] == ".exe"
208 # Executed a module, like "python -m werkzeug.serving".
209 if os
.path
.isfile(py_script
):
210 # Rewritten by Python from "-m script" to "/path/to/script.py".
211 py_module
= t
.cast(str, __main__
.__package
__)
212 name
= os
.path
.splitext(os
.path
.basename(py_script
))[0]
214 if name
!= "__main__":
215 py_module
+= f
".{name}"
217 # Incorrectly rewritten by pydevd debugger from "-m script" to "script".
218 py_module
= py_script
220 rv
.extend(("-m", py_module
.lstrip(".")))
231 extra_files
: t
.Iterable
[str] |
None = None,
232 exclude_patterns
: t
.Iterable
[str] |
None = None,
233 interval
: int |
float = 1,
235 self
.extra_files
: set[str] = {os.path.abspath(x) for x in extra_files or ()}
236 self
.exclude_patterns
: set[str] = set(exclude_patterns
or ())
237 self
.interval
= interval
239 def __enter__(self
) -> ReloaderLoop
:
240 """Do any setup, then run one step of the watch to populate the
241 initial filesystem state.
246 def __exit__(self
, exc_type
, exc_val
, exc_tb
): # type: ignore
247 """Clean up any resources associated with the reloader."""
250 def run(self
) -> None:
251 """Continually run the watch step, sleeping for the configured
252 interval after each step.
256 time
.sleep(self
.interval
)
258 def run_step(self
) -> None:
259 """Run one step for watching the filesystem. Called once to set
260 up initial state, then repeatedly to update it.
264 def restart_with_reloader(self
) -> int:
265 """Spawn a new Python interpreter with the same arguments as the
266 current one, but running the reloader thread.
269 _log("info", f
" * Restarting with {self.name}")
270 args
= _get_args_for_reloading()
271 new_environ
= os
.environ
.copy()
272 new_environ
["WERKZEUG_RUN_MAIN"] = "true"
273 exit_code
= subprocess
.call(args
, env
=new_environ
, close_fds
=False)
278 def trigger_reload(self
, filename
: str) -> None:
279 self
.log_reload(filename
)
282 def log_reload(self
, filename
: str) -> None:
283 filename
= os
.path
.abspath(filename
)
284 _log("info", f
" * Detected change in {filename!r}, reloading")
287 class StatReloaderLoop(ReloaderLoop
):
290 def __enter__(self
) -> ReloaderLoop
:
291 self
.mtimes
: dict[str, float] = {}
292 return super().__enter
__()
294 def run_step(self
) -> None:
295 for name
in _find_stat_paths(self
.extra_files
, self
.exclude_patterns
):
297 mtime
= os
.stat(name
).st_mtime
301 old_time
= self
.mtimes
.get(name
)
304 self
.mtimes
[name
] = mtime
308 self
.trigger_reload(name
)
311 class WatchdogReloaderLoop(ReloaderLoop
):
312 def __init__(self
, *args
: t
.Any
, **kwargs
: t
.Any
) -> None:
313 from watchdog
.observers
import Observer
314 from watchdog
.events
import PatternMatchingEventHandler
315 from watchdog
.events
import EVENT_TYPE_OPENED
316 from watchdog
.events
import FileModifiedEvent
318 super().__init
__(*args
, **kwargs
)
319 trigger_reload
= self
.trigger_reload
321 class EventHandler(PatternMatchingEventHandler
):
322 def on_any_event(self
, event
: FileModifiedEvent
): # type: ignore
323 if event
.event_type
== EVENT_TYPE_OPENED
:
326 trigger_reload(event
.src_path
)
328 reloader_name
= Observer
.__name
__.lower() # type: ignore[attr-defined]
330 if reloader_name
.endswith("observer"):
331 reloader_name
= reloader_name
[:-8]
333 self
.name
= f
"watchdog ({reloader_name})"
334 self
.observer
= Observer()
335 # Extra patterns can be non-Python files, match them in addition
336 # to all Python files in default and extra directories. Ignore
337 # __pycache__ since a change there will always have a change to
338 # the source file (or initial pyc file) as well. Ignore Git and
339 # Mercurial internal changes.
340 extra_patterns
= [p
for p
in self
.extra_files
if not os
.path
.isdir(p
)]
341 self
.event_handler
= EventHandler(
342 patterns
=["*.py", "*.pyc", "*.zip", *extra_patterns
],
344 *[f
"*/{d}/*" for d
in _ignore_common_dirs
],
345 *self
.exclude_patterns
,
348 self
.should_reload
= False
350 def trigger_reload(self
, filename
: str) -> None:
351 # This is called inside an event handler, which means throwing
352 # SystemExit has no effect.
353 # https://github.com/gorakhargosh/watchdog/issues/294
354 self
.should_reload
= True
355 self
.log_reload(filename
)
357 def __enter__(self
) -> ReloaderLoop
:
358 self
.watches
: dict[str, t
.Any
] = {}
359 self
.observer
.start()
360 return super().__enter
__()
362 def __exit__(self
, exc_type
, exc_val
, exc_tb
): # type: ignore
366 def run(self
) -> None:
367 while not self
.should_reload
:
369 time
.sleep(self
.interval
)
373 def run_step(self
) -> None:
374 to_delete
= set(self
.watches
)
376 for path
in _find_watchdog_paths(self
.extra_files
, self
.exclude_patterns
):
377 if path
not in self
.watches
:
379 self
.watches
[path
] = self
.observer
.schedule(
380 self
.event_handler
, path
, recursive
=True
383 # Clear this path from list of watches We don't want
384 # the same error message showing again in the next
386 self
.watches
[path
] = None
388 to_delete
.discard(path
)
390 for path
in to_delete
:
391 watch
= self
.watches
.pop(path
, None)
393 if watch
is not None:
394 self
.observer
.unschedule(watch
)
397 reloader_loops
: dict[str, type[ReloaderLoop
]] = {
398 "stat": StatReloaderLoop
,
399 "watchdog": WatchdogReloaderLoop
,
403 __import__("watchdog.observers")
405 reloader_loops
["auto"] = reloader_loops
["stat"]
407 reloader_loops
["auto"] = reloader_loops
["watchdog"]
410 def ensure_echo_on() -> None:
411 """Ensure that echo mode is enabled. Some tools such as PDB disable
412 it which causes usability issues after a reload."""
413 # tcgetattr will fail if stdin isn't a tty
414 if sys
.stdin
is None or not sys
.stdin
.isatty():
422 attributes
= termios
.tcgetattr(sys
.stdin
)
424 if not attributes
[3] & termios
.ECHO
:
425 attributes
[3] |
= termios
.ECHO
426 termios
.tcsetattr(sys
.stdin
, termios
.TCSANOW
, attributes
)
429 def run_with_reloader(
430 main_func
: t
.Callable
[[], None],
431 extra_files
: t
.Iterable
[str] |
None = None,
432 exclude_patterns
: t
.Iterable
[str] |
None = None,
433 interval
: int |
float = 1,
434 reloader_type
: str = "auto",
436 """Run the given function in an independent Python interpreter."""
439 signal
.signal(signal
.SIGTERM
, lambda *args
: sys
.exit(0))
440 reloader
= reloader_loops
[reloader_type
](
441 extra_files
=extra_files
, exclude_patterns
=exclude_patterns
, interval
=interval
445 if os
.environ
.get("WERKZEUG_RUN_MAIN") == "true":
447 t
= threading
.Thread(target
=main_func
, args
=())
450 # Enter the reloader to set up initial state, then start
451 # the app thread and reloader update loop.
456 sys
.exit(reloader
.restart_with_reloader())
457 except KeyboardInterrupt: