# Origin: jfr.im git - dlqueue.git/blob - venv/lib/python3.11/site-packages/pip/_vendor/tenacity/_asyncio.py
# Copyright 2016 Étienne Bersac
# Copyright 2016 Julien Danjou
# Copyright 2016 Joshua Harlow
# Copyright 2013-2014 Ray Holder
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import sys
import typing as t
from asyncio import sleep

from pip._vendor.tenacity import AttemptManager
from pip._vendor.tenacity import BaseRetrying
from pip._vendor.tenacity import DoAttempt
from pip._vendor.tenacity import DoSleep
from pip._vendor.tenacity import RetryCallState
# Type of the value produced by awaiting the wrapped coroutine function.
WrappedFnReturnT = t.TypeVar("WrappedFnReturnT")

# Any callable whose result can be awaited; this is what AsyncRetrying wraps.
WrappedFn = t.TypeVar(
    "WrappedFn",
    bound=t.Callable[..., t.Awaitable[t.Any]],
)
class AsyncRetrying(BaseRetrying):
    """Asynchronous counterpart of ``BaseRetrying``.

    The instance can be used in three ways:

    * awaited as a callable: ``await retrying(coro_fn, *args, **kwargs)``;
    * as an asynchronous iterator: ``async for attempt in retrying: ...``;
    * as a decorator factory through :meth:`wraps`.

    Synchronous iteration is rejected explicitly (see :meth:`__iter__`).
    """

    # Awaitable-returning callable used to pause between attempts. It is
    # called with the ``DoSleep`` value handed back by ``self.iter()``.
    sleep: t.Callable[[float], t.Awaitable[t.Any]]

    def __init__(
        self,
        sleep: t.Callable[[float], t.Awaitable[t.Any]] = sleep,
        **kwargs: t.Any,
    ) -> None:
        """Create the retry controller.

        Args:
            sleep: Callable awaited between attempts; defaults to
                ``asyncio.sleep``.
            **kwargs: Forwarded unchanged to ``BaseRetrying.__init__``.
        """
        super().__init__(**kwargs)
        # Without this assignment the ``sleep`` argument would be ignored.
        self.sleep = sleep

    async def __call__(  # type: ignore[override]
        self, fn: WrappedFn, *args: t.Any, **kwargs: t.Any
    ) -> WrappedFnReturnT:
        """Await ``fn(*args, **kwargs)``, repeating as ``self.iter()`` directs.

        Args:
            fn: Coroutine function to call.
            *args: Positional arguments passed to ``fn``.
            **kwargs: Keyword arguments passed to ``fn``.

        Returns:
            Whatever ``self.iter()`` yields once it stops asking for another
            attempt or a sleep.
        """
        # NOTE(review): begin() is assumed to be provided by BaseRetrying
        # (not visible in this file) for per-call initialisation - confirm.
        self.begin()

        retry_state = RetryCallState(
            retry_object=self, fn=fn, args=args, kwargs=kwargs
        )
        while True:
            do = self.iter(retry_state=retry_state)
            if isinstance(do, DoAttempt):
                try:
                    result = await fn(*args, **kwargs)
                except BaseException:  # noqa: B902
                    # Hand the failure to the retry state; the next
                    # self.iter() call decides what happens with it.
                    retry_state.set_exception(sys.exc_info())  # type: ignore[arg-type]
                else:
                    # Only record a result when the attempt did not raise.
                    retry_state.set_result(result)
            elif isinstance(do, DoSleep):
                retry_state.prepare_for_next_attempt()
                await self.sleep(do)
            else:
                # Neither "attempt" nor "sleep": this is the final outcome.
                return do  # type: ignore[no-any-return]

    def __iter__(self) -> t.Generator[AttemptManager, None, None]:
        """Reject synchronous iteration; use ``async for`` instead.

        Raises:
            TypeError: Always.
        """
        raise TypeError("AsyncRetrying object is not iterable")

    def __aiter__(self) -> "AsyncRetrying":
        """Start a fresh retry sequence and return ``self`` as the iterator."""
        # NOTE(review): see __call__ - begin() is assumed to exist on
        # BaseRetrying; confirm against the base class.
        self.begin()
        self._retry_state = RetryCallState(self, fn=None, args=(), kwargs={})
        # The async-iterator protocol requires __aiter__ to return the
        # iterator object; returning None would break ``async for``.
        return self

    async def __anext__(self) -> AttemptManager:
        """Return the next attempt's manager, sleeping first when asked to.

        Returns:
            An ``AttemptManager`` bound to the current retry state.

        Raises:
            StopAsyncIteration: When ``self.iter()`` returns anything other
                than a ``DoAttempt`` or ``DoSleep`` instruction.
        """
        while True:
            do = self.iter(retry_state=self._retry_state)
            if isinstance(do, DoAttempt):
                return AttemptManager(retry_state=self._retry_state)
            if isinstance(do, DoSleep):
                self._retry_state.prepare_for_next_attempt()
                await self.sleep(do)
                continue
            # Any other value (including None) ends the iteration.
            raise StopAsyncIteration

    def wraps(self, fn: WrappedFn) -> WrappedFn:
        """Wrap ``fn`` so that calling it goes through this retry controller.

        Args:
            fn: Coroutine function to decorate.

        Returns:
            An ``async def`` wrapper exposing the same ``retry`` and
            ``retry_with`` attributes as the wrapper built by
            ``BaseRetrying.wraps``.
        """
        base_wrapped = super().wraps(fn)

        # Ensure wrapper is recognized as a coroutine function.
        @functools.wraps(base_wrapped)
        async def async_wrapped(*args: t.Any, **kwargs: t.Any) -> t.Any:
            return await base_wrapped(*args, **kwargs)

        # Preserve the attributes attached by the base-class wrapper.
        async_wrapped.retry = base_wrapped.retry  # type: ignore[attr-defined]
        async_wrapped.retry_with = base_wrapped.retry_with  # type: ignore[attr-defined]

        return async_wrapped  # type: ignore[return-value]