]> jfr.im git - dlqueue.git/blob - venv/lib/python3.11/site-packages/blinker/base.py
init: venv and flask
[dlqueue.git] / venv / lib / python3.11 / site-packages / blinker / base.py
1 """Signals and events.
2
3 A small implementation of signals, inspired by a snippet of Django signal
4 API client code seen in a blog post. Signals are first-class objects and
5 each manages its own receivers and message emission.
6
7 The :func:`signal` function provides singleton behavior for named signals.
8
9 """
10 from __future__ import annotations
11
12 import typing as t
13 from collections import defaultdict
14 from contextlib import contextmanager
15 from warnings import warn
16 from weakref import WeakValueDictionary
17
18 from blinker._utilities import annotatable_weakref
19 from blinker._utilities import hashable_identity
20 from blinker._utilities import IdentityType
21 from blinker._utilities import is_coroutine_function
22 from blinker._utilities import lazy_property
23 from blinker._utilities import reference
24 from blinker._utilities import symbol
25 from blinker._utilities import WeakTypes
26
if t.TYPE_CHECKING:
    # Names in this block exist only for static type checkers.
    # NOTE(review): indentation was lost in this copy of the file; the
    # definitions below are assumed to live inside the guard because ``te``
    # is only imported here -- confirm against the packaged source.
    import typing_extensions as te

    # Type variable for receiver callables, used so that ``connect`` and
    # ``connect_via`` are annotated as returning the same callable type
    # they were given.
    T_callable = t.TypeVar("T_callable", bound=t.Callable[..., t.Any])

    T = t.TypeVar("T")
    P = te.ParamSpec("P")

    # Annotation for ``send(_async_wrapper=...)``: callable returning T ->
    # callable returning Awaitable[T].
    AsyncWrapperType = t.Callable[[t.Callable[P, T]], t.Callable[P, t.Awaitable[T]]]
    # Annotation for ``send_async(_sync_wrapper=...)``: callable returning
    # Awaitable[T] -> callable returning T.
    SyncWrapperType = t.Callable[[t.Callable[P, t.Awaitable[T]]], t.Callable[P, T]]
37
# Sentinel meaning "any sender"; the Signal methods test for it by identity
# (``sender is ANY``).
ANY = symbol("ANY")
ANY.__doc__ = 'Token for "any sender".'
# Key under which receivers connected to ANY are filed in the per-sender
# bookkeeping (``Signal._by_sender``).
ANY_ID = 0
41
42
class Signal:
    """A notification emitter.

    Receivers are registered with :meth:`connect` (optionally restricted to
    one sender) and notified with :meth:`send` or :meth:`send_async`.
    """

    #: An :obj:`ANY` convenience synonym, allows ``Signal.ANY``
    #: without an additional import.
    ANY = ANY

    @lazy_property
    def receiver_connected(self) -> Signal:
        """Emitted after each :meth:`connect`.

        The signal sender is the signal instance, and the :meth:`connect`
        arguments are passed through: *receiver*, *sender*, and *weak*.

        .. versionadded:: 1.2

        """
        return Signal(doc="Emitted after a receiver connects.")

    @lazy_property
    def receiver_disconnected(self) -> Signal:
        """Emitted after :meth:`disconnect`.

        The sender is the signal instance, and the :meth:`disconnect` arguments
        are passed through: *receiver* and *sender*.

        Note, this signal is emitted **only** when :meth:`disconnect` is
        called explicitly.

        The disconnect signal can not be emitted by an automatic disconnect
        (due to a weakly referenced receiver or sender going out of scope),
        as the receiver and/or sender instances are no longer available for
        use at the time this signal would be emitted.

        An alternative approach is available by subscribing to
        :attr:`receiver_connected` and setting up a custom weakref cleanup
        callback on weak receivers and senders.

        .. versionadded:: 1.2

        """
        return Signal(doc="Emitted after a receiver disconnects.")

    def __init__(self, doc: str | None = None) -> None:
        """
        :param doc: optional.  If provided, will be assigned to the signal's
          __doc__ attribute.

        """
        if doc:
            self.__doc__ = doc
        #: A mapping of connected receivers.
        #:
        #: The values of this mapping are not meaningful outside of the
        #: internal :class:`Signal` implementation, however the boolean value
        #: of the mapping is useful as an extremely efficient check to see if
        #: any receivers are connected to the signal.
        self.receivers: dict[IdentityType, t.Callable | annotatable_weakref] = {}
        #: When true, :meth:`send` and :meth:`send_async` return ``[]``
        #: without notifying anyone.  Toggled by :meth:`muted`.
        self.is_muted = False
        # receiver id -> ids of the senders it is connected to
        self._by_receiver: dict[IdentityType, set[IdentityType]] = defaultdict(set)
        # sender id (or ANY_ID) -> ids of the receivers connected to it
        self._by_sender: dict[IdentityType, set[IdentityType]] = defaultdict(set)
        # sender id -> weak reference whose callback is _cleanup_sender
        self._weak_senders: dict[IdentityType, annotatable_weakref] = {}

    def connect(
        self, receiver: T_callable, sender: t.Any = ANY, weak: bool = True
    ) -> T_callable:
        """Connect *receiver* to signal events sent by *sender*.

        :param receiver: A callable.  Will be invoked by :meth:`send` with
          `sender=` as a single positional argument and any ``kwargs`` that
          were provided to a call to :meth:`send`.

        :param sender: Any object or :obj:`ANY`, defaults to ``ANY``.
          Restricts notifications delivered to *receiver* to only those
          :meth:`send` emissions sent by *sender*.  If ``ANY``, the receiver
          will always be notified.  A *receiver* may be connected to
          multiple *sender* values on the same Signal through multiple calls
          to :meth:`connect`.

        :param weak: If true, the Signal will hold a weakref to *receiver*
          and automatically disconnect when *receiver* goes out of scope or
          is garbage collected.  Defaults to True.

        :returns: *receiver*, unchanged.

        :raises TypeError: re-raised from a ``receiver_connected`` listener;
          in that case the new connection is undone first.

        """
        receiver_id = hashable_identity(receiver)
        receiver_ref: T_callable | annotatable_weakref

        if weak:
            receiver_ref = reference(receiver, self._cleanup_receiver)
            # Stash the id on the reference so that _cleanup_receiver can
            # find the bookkeeping entry from the reference alone.
            receiver_ref.receiver_id = receiver_id
        else:
            receiver_ref = receiver
        sender_id: IdentityType
        if sender is ANY:
            sender_id = ANY_ID
        else:
            sender_id = hashable_identity(sender)

        # setdefault: an already registered receiver keeps its first
        # (weak or strong) reference.
        self.receivers.setdefault(receiver_id, receiver_ref)
        self._by_sender[sender_id].add(receiver_id)
        self._by_receiver[receiver_id].add(sender_id)
        del receiver_ref

        if sender is not ANY and sender_id not in self._weak_senders:
            # wire together a cleanup for weakref-able senders
            try:
                sender_ref = reference(sender, self._cleanup_sender)
                sender_ref.sender_id = sender_id
            except TypeError:
                # Deliberate best effort: when the reference cannot be
                # created the sender simply gets no automatic cleanup.
                pass
            else:
                self._weak_senders.setdefault(sender_id, sender_ref)
                del sender_ref

        # broadcast this connection.  if receivers raise, disconnect.
        # The __dict__ check avoids creating the per-signal
        # ``receiver_connected`` signal just to find it has no receivers
        # (assumes lazy_property caches its value in the instance __dict__).
        if "receiver_connected" in self.__dict__ and self.receiver_connected.receivers:
            try:
                self.receiver_connected.send(
                    self, receiver=receiver, sender=sender, weak=weak
                )
            except TypeError:
                self.disconnect(receiver, sender)
                raise
        # Module-level (deprecated) variant of the same broadcast; skipped
        # when connecting to that global signal itself.
        if receiver_connected.receivers and self is not receiver_connected:
            try:
                receiver_connected.send(
                    self, receiver_arg=receiver, sender_arg=sender, weak_arg=weak
                )
            except TypeError:
                self.disconnect(receiver, sender)
                raise
        return receiver

    def connect_via(
        self, sender: t.Any, weak: bool = False
    ) -> t.Callable[[T_callable], T_callable]:
        """Connect the decorated function as a receiver for *sender*.

        :param sender: Any object or :obj:`ANY`.  The decorated function
          will only receive :meth:`send` emissions sent by *sender*.  If
          ``ANY``, the receiver will always be notified.  A function may be
          decorated multiple times with differing *sender* values.

        :param weak: If true, the Signal will hold a weakref to the
          decorated function and automatically disconnect when *receiver*
          goes out of scope or is garbage collected.  Unlike
          :meth:`connect`, this defaults to False.

        The decorated function will be invoked by :meth:`send` with
          `sender=` as a single positional argument and any ``kwargs`` that
          were provided to the call to :meth:`send`.


        .. versionadded:: 1.1

        """

        def decorator(fn: T_callable) -> T_callable:
            self.connect(fn, sender, weak)
            return fn

        return decorator

    @contextmanager
    def connected_to(
        self, receiver: t.Callable, sender: t.Any = ANY
    ) -> t.Generator[None, None, None]:
        """Execute a block with the signal temporarily connected to *receiver*.

        :param receiver: a receiver callable
        :param sender: optional, a sender to filter on

        This is a context manager for use in the ``with`` statement.  It can
        be useful in unit tests.  *receiver* is connected to the signal for
        the duration of the ``with`` block, and will be disconnected
        automatically when exiting the block:

        .. code-block:: python

          with on_ready.connected_to(receiver):
             # do stuff
             on_ready.send(123)

        .. versionadded:: 1.1

        """
        self.connect(receiver, sender=sender, weak=False)
        try:
            yield None
        finally:
            # ``finally`` rather than ``except Exception`` so the receiver is
            # also disconnected when the block exits through a BaseException
            # (KeyboardInterrupt, SystemExit, GeneratorExit).
            self.disconnect(receiver)

    @contextmanager
    def muted(self) -> t.Generator[None, None, None]:
        """Context manager for temporarily disabling signal.
        Useful for test purposes.

        While the block runs :meth:`send` and :meth:`send_async` return an
        empty list; the signal is un-muted on exit, however the block ends.
        """
        self.is_muted = True
        try:
            yield None
        finally:
            self.is_muted = False

    def temporarily_connected_to(
        self, receiver: t.Callable, sender: t.Any = ANY
    ) -> t.ContextManager[None]:
        """An alias for :meth:`connected_to`.

        :param receiver: a receiver callable
        :param sender: optional, a sender to filter on

        .. versionadded:: 0.9

        .. versionchanged:: 1.1
          Renamed to :meth:`connected_to`.  ``temporarily_connected_to`` was
          deprecated in 1.2 and will be removed in a subsequent version.

        """
        warn(
            "temporarily_connected_to is deprecated; use connected_to instead.",
            DeprecationWarning,
            # Attribute the warning to the caller, not to this wrapper.
            stacklevel=2,
        )
        return self.connected_to(receiver, sender)

    def send(
        self,
        *sender: t.Any,
        _async_wrapper: AsyncWrapperType | None = None,
        **kwargs: t.Any,
    ) -> list[tuple[t.Callable, t.Any]]:
        """Emit this signal on behalf of *sender*, passing on ``kwargs``.

        Returns a list of 2-tuples, pairing receivers with their return
        value. The ordering of receiver notification is undefined.

        :param sender: Any object or ``None``.  If omitted, synonymous
          with ``None``.  Only accepts one positional argument.
        :param _async_wrapper: A callable that should wrap a coroutine
          receiver and run it when called synchronously.

        :param kwargs: Data to be sent to receivers.

        :raises TypeError: if more than one positional argument is given.
        :raises RuntimeError: if a coroutine-function receiver is connected
          and no *_async_wrapper* was supplied.
        """
        if self.is_muted:
            return []

        sender = self._extract_sender(sender)
        results = []
        for receiver in self.receivers_for(sender):
            if is_coroutine_function(receiver):
                if _async_wrapper is None:
                    raise RuntimeError("Cannot send to a coroutine function")
                receiver = _async_wrapper(receiver)
            result = receiver(sender, **kwargs)  # type: ignore[call-arg]
            results.append((receiver, result))
        return results

    async def send_async(
        self,
        *sender: t.Any,
        _sync_wrapper: SyncWrapperType | None = None,
        **kwargs: t.Any,
    ) -> list[tuple[t.Callable, t.Any]]:
        """Emit this signal on behalf of *sender*, passing on ``kwargs``.

        Returns a list of 2-tuples, pairing receivers with their return
        value. The ordering of receiver notification is undefined.

        :param sender: Any object or ``None``.  If omitted, synonymous
          with ``None``. Only accepts one positional argument.
        :param _sync_wrapper: A callable that should wrap a synchronous
          receiver and run it when awaited.

        :param kwargs: Data to be sent to receivers.

        :raises TypeError: if more than one positional argument is given.
        :raises RuntimeError: if a non-coroutine receiver is connected and
          no *_sync_wrapper* was supplied.
        """
        if self.is_muted:
            return []

        sender = self._extract_sender(sender)
        results = []
        # Receivers are awaited one after another, not concurrently.
        for receiver in self.receivers_for(sender):
            if not is_coroutine_function(receiver):
                if _sync_wrapper is None:
                    raise RuntimeError("Cannot send to a non-coroutine function")
                receiver = _sync_wrapper(receiver)  # type: ignore[arg-type]
            result = await receiver(sender, **kwargs)  # type: ignore[call-arg, misc]
            results.append((receiver, result))
        return results

    def _extract_sender(self, sender: t.Any) -> t.Any:
        """Unpack the ``*sender`` tuple collected by :meth:`send`.

        :param sender: the tuple of positional arguments given to
          :meth:`send` / :meth:`send_async`.
        :returns: the single sender, or ``None`` when none was given.
        :raises TypeError: if more than one positional argument was given
          (when no receivers are connected this check is skipped under
          ``-O``).
        """
        if not self.receivers:
            # Ensure correct signature even on no-op sends, disable with -O
            # for lowest possible cost.
            if __debug__ and sender and len(sender) > 1:
                raise TypeError(
                    f"send() accepts only one positional argument, {len(sender)} given"
                )
            # Hand back the real sender (or None) rather than a placeholder
            # list, so a receiver connected between this call and the
            # dispatch loop is never invoked with a bogus sender object.
            return sender[0] if sender else None

        # Using '*sender' rather than 'sender=None' allows 'sender' to be
        # used as a keyword argument- i.e. it's an invisible name in the
        # function signature.
        if len(sender) == 0:
            sender = None
        elif len(sender) > 1:
            raise TypeError(
                f"send() accepts only one positional argument, {len(sender)} given"
            )
        else:
            sender = sender[0]
        return sender

    def has_receivers_for(self, sender: t.Any) -> bool:
        """True if there is probably a receiver for *sender*.

        Performs an optimistic check only.  Does not guarantee that all
        weakly referenced receivers are still alive.  See
        :meth:`receivers_for` for a stronger search.

        """
        if not self.receivers:
            return False
        # Receivers connected to ANY hear every sender.
        if self._by_sender[ANY_ID]:
            return True
        if sender is ANY:
            return False
        # Key presence only: an emptied bucket left behind by disconnect()
        # still counts, which is part of why the answer is "probably".
        return hashable_identity(sender) in self._by_sender

    def receivers_for(
        self, sender: t.Any
    ) -> t.Generator[t.Callable | annotatable_weakref, None, None]:
        """Iterate all live receivers listening for *sender*.

        Yields receivers connected to *sender* together with those connected
        to :obj:`ANY`.  Weakly referenced receivers that have died are
        disconnected and skipped.
        """
        # TODO: test receivers_for(ANY)
        if self.receivers:
            sender_id = hashable_identity(sender)
            # Both branches build a new set, so the bookkeeping may change
            # while the generator is suspended.
            if sender_id in self._by_sender:
                ids = self._by_sender[ANY_ID] | self._by_sender[sender_id]
            else:
                ids = self._by_sender[ANY_ID].copy()
            for receiver_id in ids:
                receiver = self.receivers.get(receiver_id)
                if receiver is None:
                    continue
                if isinstance(receiver, WeakTypes):
                    strong = receiver()
                    if strong is None:
                        # The referent is gone: drop every connection of
                        # this receiver.
                        self._disconnect(receiver_id, ANY_ID)
                        continue
                    receiver = strong
                yield receiver  # type: ignore[misc]

    def disconnect(self, receiver: t.Callable, sender: t.Any = ANY) -> None:
        """Disconnect *receiver* from this signal's events.

        :param receiver: a previously :meth:`connected<connect>` callable

        :param sender: a specific sender to disconnect from, or :obj:`ANY`
          to disconnect from all senders.  Defaults to ``ANY``.

        """
        sender_id: IdentityType
        if sender is ANY:
            sender_id = ANY_ID
        else:
            sender_id = hashable_identity(sender)
        receiver_id = hashable_identity(receiver)
        self._disconnect(receiver_id, sender_id)

        # Only announce when the per-signal ``receiver_disconnected`` signal
        # has already been created and has listeners.
        if (
            "receiver_disconnected" in self.__dict__
            and self.receiver_disconnected.receivers
        ):
            self.receiver_disconnected.send(self, receiver=receiver, sender=sender)

    def _disconnect(self, receiver_id: IdentityType, sender_id: IdentityType) -> None:
        """Remove bookkeeping for *receiver_id*.

        With ``ANY_ID`` the receiver is removed from every sender bucket and
        from :attr:`receivers`; otherwise only the one receiver/sender pair
        is unlinked and the receiver stays registered.
        """
        if sender_id == ANY_ID:
            if self._by_receiver.pop(receiver_id, False):
                for bucket in self._by_sender.values():
                    bucket.discard(receiver_id)
            self.receivers.pop(receiver_id, None)
        else:
            self._by_sender[sender_id].discard(receiver_id)
            self._by_receiver[receiver_id].discard(sender_id)

    def _cleanup_receiver(self, receiver_ref: annotatable_weakref) -> None:
        """Disconnect a receiver from all senders."""
        self._disconnect(t.cast(IdentityType, receiver_ref.receiver_id), ANY_ID)

    def _cleanup_sender(self, sender_ref: annotatable_weakref) -> None:
        """Disconnect all receivers from a sender."""
        sender_id = t.cast(IdentityType, sender_ref.sender_id)
        assert sender_id != ANY_ID
        self._weak_senders.pop(sender_id, None)
        for receiver_id in self._by_sender.pop(sender_id, ()):
            self._by_receiver[receiver_id].discard(sender_id)

    def _cleanup_bookkeeping(self) -> None:
        """Prune unused sender/receiver bookkeeping. Not threadsafe.

        Connecting & disconnecting leave behind a small amount of bookkeeping
        for the receiver and sender values. Typical workloads using Blinker,
        for example in most web apps, Flask, CLI scripts, etc., are not
        adversely affected by this bookkeeping.

        With a long-running Python process performing dynamic signal routing
        with high volume- e.g. connecting to function closures, "senders" are
        all unique object instances, and doing all of this over and over- you
        may see memory usage grow due to extraneous bookkeeping. (An empty
        set() for each stale sender/receiver pair.)

        This method will prune that bookkeeping away, with the caveat that such
        pruning is not threadsafe. The risk is that cleanup of a fully
        disconnected receiver/sender pair occurs while another thread is
        connecting that same pair. If you are in the highly dynamic, unique
        receiver/sender situation that has led you to this method, that
        failure mode is perhaps not a big deal for you.
        """
        for mapping in (self._by_sender, self._by_receiver):
            # Snapshot the items: entries are popped while walking them.
            for _id, bucket in list(mapping.items()):
                if not bucket:
                    mapping.pop(_id, None)

    def _clear_state(self) -> None:
        """Throw away all signal state.  Useful for unit tests."""
        self._weak_senders.clear()
        self.receivers.clear()
        self._by_sender.clear()
        self._by_receiver.clear()
475
476
# Module-level (global) "receiver connected" signal.  ``Signal.connect``
# sends it for every signal other than this one; it is deprecated in favour
# of the per-signal ``Signal.receiver_connected`` attribute, as the runtime
# docstring below explains.
receiver_connected = Signal(
    """\
Sent by a :class:`Signal` after a receiver connects.

:argument: the Signal that was connected to
:keyword receiver_arg: the connected receiver
:keyword sender_arg: the sender to connect to
:keyword weak_arg: true if the connection to receiver_arg is a weak reference

.. deprecated:: 1.2

As of 1.2, individual signals have their own private
:attr:`~Signal.receiver_connected` and
:attr:`~Signal.receiver_disconnected` signals with a slightly simplified
call signature. This global signal is planned to be removed in 1.6.

"""
)
495
496
class NamedSignal(Signal):
    """A :class:`Signal` that also carries a name."""

    def __init__(self, name: str, doc: str | None = None) -> None:
        """
        :param name: the name stored on the signal.
        :param doc: optional, forwarded to :class:`Signal`.
        """
        Signal.__init__(self, doc)

        #: The name of this signal.
        self.name = name

    def __repr__(self) -> str:
        # Strip the last character of the inherited repr (its closing
        # bracket), then re-close it after appending the quoted name.
        stem = Signal.__repr__(self)[:-1]
        return stem + "; " + repr(self.name) + ">"
509
510
class Namespace(dict):
    """A mapping of signal names to signals."""

    def signal(self, name: str, doc: str | None = None) -> NamedSignal:
        """Look up the :class:`NamedSignal` stored under *name*.

        A new signal is created and stored when *name* is not present yet,
        so repeated calls with the same name return the same object.

        :param name: key of the signal in this namespace.
        :param doc: optional docstring, used only when a new signal is created.
        """
        try:
            existing = self[name]
        except KeyError:
            # First request for this name: create and register it.
            created = NamedSignal(name, doc)
            return self.setdefault(name, created)  # type: ignore[no-any-return]
        return existing  # type: ignore[no-any-return]
525
526
class WeakNamespace(WeakValueDictionary):
    """A weak mapping of signal names to signals.

    Automatically cleans up unused Signals when the last reference goes out
    of scope.  This namespace implementation exists for a measure of legacy
    compatibility with Blinker <= 1.2, and may be dropped in the future.

    .. versionadded:: 1.3

    """

    def signal(self, name: str, doc: str | None = None) -> NamedSignal:
        """Look up the :class:`NamedSignal` stored under *name*.

        A new signal is created and stored when *name* is not present, so
        repeated calls return the same object for as long as it is alive.

        :param name: key of the signal in this namespace.
        :param doc: optional docstring, used only when a new signal is created.
        """
        # A single indexed lookup (rather than ``in`` followed by indexing)
        # so a value collected in between cannot cause a spurious KeyError.
        try:
            existing = self[name]
        except KeyError:
            created = NamedSignal(name, doc)
            return self.setdefault(name, created)  # type: ignore[no-any-return]
        return existing  # type: ignore[no-any-return]
549
550
551 signal = Namespace().signal