]> jfr.im git - dlqueue.git/blob - venv/lib/python3.11/site-packages/pip/_internal/self_outdated_check.py
init: venv aand flask
[dlqueue.git] / venv / lib / python3.11 / site-packages / pip / _internal / self_outdated_check.py
1 import datetime
2 import functools
3 import hashlib
4 import json
5 import logging
6 import optparse
7 import os.path
8 import sys
9 from dataclasses import dataclass
10 from typing import Any, Callable, Dict, Optional
11
12 from pip._vendor.packaging.version import parse as parse_version
13 from pip._vendor.rich.console import Group
14 from pip._vendor.rich.markup import escape
15 from pip._vendor.rich.text import Text
16
17 from pip._internal.index.collector import LinkCollector
18 from pip._internal.index.package_finder import PackageFinder
19 from pip._internal.metadata import get_default_environment
20 from pip._internal.metadata.base import DistributionVersion
21 from pip._internal.models.selection_prefs import SelectionPreferences
22 from pip._internal.network.session import PipSession
23 from pip._internal.utils.compat import WINDOWS
24 from pip._internal.utils.entrypoints import (
25 get_best_invocation_for_this_pip,
26 get_best_invocation_for_this_python,
27 )
28 from pip._internal.utils.filesystem import adjacent_tmp_file, check_path_owner, replace
29 from pip._internal.utils.misc import ensure_dir
30
# strftime/strptime pattern used for the "last_check" timestamp that is
# written to, and parsed back from, the self-check state file.
_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ"


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
35
36
37 def _get_statefile_name(key: str) -> str:
38 key_bytes = key.encode()
39 name = hashlib.sha224(key_bytes).hexdigest()
40 return name
41
42
class SelfCheckState:
    """On-disk record of the most recent pip self-version check.

    The record is a small JSON object stored under
    ``<cache_dir>/selfcheck/`` in a file named by
    ``_get_statefile_name(self.key)``.  The file's content is treated as
    untrusted: a missing, unreadable or malformed file behaves exactly like
    an empty cache, so the caller refreshes and rewrites it instead of
    failing on every run.
    """

    def __init__(self, cache_dir: str) -> None:
        """Load any previously saved state found under *cache_dir*.

        A falsy *cache_dir* disables persistence: nothing is read and
        ``set`` becomes a no-op.
        """
        self._state: Dict[str, Any] = {}
        self._statefile_path = None

        # Try to load the existing state
        if cache_dir:
            self._statefile_path = os.path.join(
                cache_dir, "selfcheck", _get_statefile_name(self.key)
            )
            try:
                with open(self._statefile_path, encoding="utf-8") as statefile:
                    loaded = json.load(statefile)
            except (OSError, ValueError, KeyError):
                # Explicitly suppressing exceptions, since we don't want to
                # error out if the cache file is invalid.
                pass
            else:
                # Syntactically valid JSON that is not an object (a list, a
                # bare string, a number, ...) is as useless to us as
                # invalid JSON, so it is ignored as well.
                if isinstance(loaded, dict):
                    self._state = loaded

    @property
    def key(self) -> str:
        """Identifier of the environment this state belongs to (``sys.prefix``)."""
        return sys.prefix

    def get(self, current_time: datetime.datetime) -> Optional[str]:
        """Check if we have a not-outdated version loaded already.

        Returns the cached ``pypi_version`` string when the state holds a
        well-formed ``last_check`` timestamp that is at most seven days
        older than *current_time*.  Returns ``None`` when there is no
        usable state (missing keys, wrong types, unparsable timestamp) or
        when the cached value is older than seven days.
        """
        state = self._state
        if not state or not isinstance(state, dict):
            return None

        last_check_text = state.get("last_check")
        pypi_version = state.get("pypi_version")
        if not isinstance(last_check_text, str):
            return None
        if not isinstance(pypi_version, str):
            return None

        try:
            last_check = datetime.datetime.strptime(last_check_text, _DATE_FMT)
        except ValueError:
            # A corrupted timestamp must not abort the whole check; treat
            # it as a cache miss so the state gets rewritten.
            return None

        seven_days_in_seconds = 7 * 24 * 60 * 60

        # Determine if we need to refresh the state
        seconds_since_last_check = (current_time - last_check).total_seconds()
        if seconds_since_last_check > seven_days_in_seconds:
            return None

        return pypi_version

    def set(self, pypi_version: str, current_time: datetime.datetime) -> None:
        """Persist *pypi_version* together with *current_time*.

        Does nothing when no state-file path is configured or when
        ``check_path_owner`` rejects the target directory.  Replacing the
        state file is best effort: an ``OSError`` from ``replace`` is
        swallowed.
        """
        # If we do not have a path to cache in, don't bother saving.
        if not self._statefile_path:
            return

        state_dir = os.path.dirname(self._statefile_path)

        # Check to make sure that we own the directory
        if not check_path_owner(state_dir):
            return

        # Now that we've ensured the directory is owned by this user, we'll go
        # ahead and make sure that all our directories are created.
        ensure_dir(state_dir)

        state = {
            # Include the key so it's easy to tell which pip wrote the
            # file.
            "key": self.key,
            "last_check": current_time.strftime(_DATE_FMT),
            "pypi_version": pypi_version,
        }

        text = json.dumps(state, sort_keys=True, separators=(",", ":"))

        with adjacent_tmp_file(self._statefile_path) as f:
            f.write(text.encode())

        try:
            # Since we have a prefix-specific state file, we can just
            # overwrite whatever is there, no need to check.
            replace(f.name, self._statefile_path)
        except OSError:
            # Best effort.  The temporary file was not moved into place, so
            # it is presumably still on disk; try not to leave it behind.
            try:
                os.remove(f.name)
            except OSError:
                pass
119
120
@dataclass
class UpgradePrompt:
    """Rich-renderable notice announcing that a newer pip release exists."""

    # Version currently installed, as text.
    old: str
    # Newer version being suggested, as text.
    new: str

    def __rich__(self) -> Group:
        """Build the notice: an empty line, the version change, then the
        command to run."""
        # Windows gets the "<python> -m pip" form; everywhere else the pip
        # invocation itself is shown.
        pip_cmd = (
            f"{get_best_invocation_for_this_python()} -m pip"
            if WINDOWS
            else get_best_invocation_for_this_pip()
        )

        notice = "[bold][[reset][blue]notice[reset][bold]][reset]"

        spacer = Text()
        availability = Text.from_markup(
            f"{notice} A new release of pip is available: "
            f"[red]{self.old}[reset] -> [green]{self.new}[reset]"
        )
        # The command is escaped so that it is not interpreted as markup.
        how_to_update = Text.from_markup(
            f"{notice} To update, run: "
            f"[green]{escape(pip_cmd)} install --upgrade pip"
        )
        return Group(spacer, availability, how_to_update)
144
145
def was_installed_by_pip(pkg: str) -> bool:
    """Tell whether the distribution *pkg* records pip as its installer.

    Returns False when *pkg* is not found in the default environment.  The
    result lets callers stay silent about upgrades when pip itself came
    from a system package manager (dnf on Fedora, for example) and not
    from pip.
    """
    distribution = get_default_environment().get_distribution(pkg)
    if distribution is None:
        return False
    return "pip" == distribution.installer
154
155
def _get_current_remote_pip_version(
    session: PipSession, options: optparse.Values
) -> Optional[str]:
    """Ask PackageFinder for the best available "pip" candidate.

    Returns the candidate's version as a string, or None when the finder
    reports no best candidate.
    """
    collector = LinkCollector.create(
        session,
        options=options,
        suppress_no_index=True,
    )

    # Yanked releases are excluded so we never suggest upgrading to one;
    # pre-releases are explicitly excluded too.
    prefs = SelectionPreferences(
        allow_yanked=False,
        allow_all_prereleases=False,
    )

    finder = PackageFinder.create(
        link_collector=collector,
        selection_prefs=prefs,
    )
    result = finder.find_best_candidate("pip")
    candidate = result.best_candidate
    return None if candidate is None else str(candidate.version)
182
183
def _self_version_check_logic(
    *,
    state: SelfCheckState,
    current_time: datetime.datetime,
    local_version: DistributionVersion,
    get_remote_version: Callable[[], Optional[str]],
) -> Optional[UpgradePrompt]:
    """Decide whether an upgrade prompt should be shown.

    The remote version comes from *state* when it has a usable cached
    value; otherwise *get_remote_version* is called and a non-None result
    is stored back into *state*.  A prompt is returned only when pip was
    installed by pip and the local version is older than the remote one
    with a different base version; in every other case None is returned.
    """
    latest_str = state.get(current_time)
    if latest_str is None:
        # Nothing usable cached: look it up and remember the answer.
        latest_str = get_remote_version()
        if latest_str is None:
            logger.debug("No remote pip version found")
            return None
        state.set(latest_str, current_time)

    latest = parse_version(latest_str)
    logger.debug("Remote version of pip: %s", latest)
    logger.debug("Local version of pip: %s", local_version)

    installed_by_pip = was_installed_by_pip("pip")
    logger.debug("Was pip installed by pip? %s", installed_by_pip)
    if not installed_by_pip:
        # Only suggest upgrade if pip is installed by pip.
        return None

    if not (local_version < latest):
        return None
    # Versions sharing a base version do not trigger a prompt.
    if local_version.base_version == latest.base_version:
        return None

    return UpgradePrompt(old=str(local_version), new=latest_str)
216
217
def pip_self_version_check(session: PipSession, options: optparse.Values) -> None:
    """Check for an update for pip and log an upgrade notice if one exists.

    Does nothing when no "pip" distribution is found in the default
    environment.  Check state is kept by a SelfCheckState built from
    ``options.cache_dir``.  Any exception raised during the check is
    caught here: a short warning is logged, and the traceback goes to the
    debug log.
    """
    pip_dist = get_default_environment().get_distribution("pip")
    if not pip_dist:
        return

    try:
        check_state = SelfCheckState(cache_dir=options.cache_dir)
        now = datetime.datetime.utcnow()
        installed_version = pip_dist.version
        fetch_remote_version = functools.partial(
            _get_current_remote_pip_version, session, options
        )

        prompt = _self_version_check_logic(
            state=check_state,
            current_time=now,
            local_version=installed_version,
            get_remote_version=fetch_remote_version,
        )
        if prompt is not None:
            logger.warning("[present-rich] %s", prompt)
    except Exception:
        logger.warning("There was an error checking the latest version of pip.")
        logger.debug("See below for error", exc_info=True)