]>
Commit | Line | Data |
---|---|---|
e0df8241 JR |
1 | import hashlib |
2 | from typing import TYPE_CHECKING, BinaryIO, Dict, Iterable, List, Optional | |
3 | ||
4 | from pip._internal.exceptions import HashMismatch, HashMissing, InstallationError | |
5 | from pip._internal.utils.misc import read_chunks | |
6 | ||
7 | if TYPE_CHECKING: | |
8 | from hashlib import _Hash | |
9 | ||
10 | # NoReturn introduced in 3.6.2; imported only for type checking to maintain | |
11 | # pip compatibility with older patch versions of Python 3.6 | |
12 | from typing import NoReturn | |
13 | ||
14 | ||
# The recommended hash algo of the moment. Change this whenever the state of
# the art changes; it won't hurt backward compatibility.
# MissingHashes uses this algorithm to compute the digest it reports.
FAVORITE_HASH = "sha256"


# Names of hashlib algorithms allowed by the --hash option and ``pip hash``.
# Currently, those are the ones at least as collision-resistant as sha256.
STRONG_HASHES = ["sha256", "sha384", "sha512"]
23 | ||
24 | ||
class Hashes:
    """A wrapper that builds multiple hashes at once and checks them against
    known-good values.

    Allowed digests are kept per algorithm as sorted lists of lowercase hex
    strings, so instances built from the same digests compare (and hash)
    equal regardless of the order or letter case they were supplied in.
    """

    def __init__(self, hashes: Optional[Dict[str, List[str]]] = None) -> None:
        """
        :param hashes: A dict of algorithm names pointing to lists of allowed
            hex digests
        """
        allowed = {}
        if hashes is not None:
            for alg, keys in hashes.items():
                # hashlib's hexdigest() always returns lowercase hex, so a
                # digest supplied in upper or mixed case could never match
                # unless it is normalized here. Values are also kept sorted
                # (to ease equality checks).
                allowed[alg] = sorted(key.lower() for key in keys)
        self._allowed = allowed

    def __and__(self, other: "Hashes") -> "Hashes":
        """Return the intersection of two sets of allowed hashes.

        An entirely empty operand places no restriction, so the other
        operand is returned unchanged in that case.
        """
        if not isinstance(other, Hashes):
            return NotImplemented

        # If either of the Hashes objects is entirely empty (i.e. no hash
        # specified at all), all hashes from the other object are allowed.
        if not other:
            return self
        if not self:
            return other

        # Otherwise only hashes that are present in both objects are allowed.
        new = {}
        for alg, values in other._allowed.items():
            if alg not in self._allowed:
                continue
            mine = set(self._allowed[alg])
            new[alg] = [v for v in values if v in mine]
        return Hashes(new)

    @property
    def digest_count(self) -> int:
        """Total number of allowed digests across all algorithms."""
        return sum(len(digests) for digests in self._allowed.values())

    def is_hash_allowed(self, hash_name: str, hex_digest: str) -> bool:
        """Return whether the given hex digest is allowed.

        The comparison ignores the letter case of ``hex_digest``.
        """
        return hex_digest.lower() in self._allowed.get(hash_name, [])

    def check_against_chunks(self, chunks: Iterable[bytes]) -> None:
        """Check good hashes against ones built from iterable of chunks of
        data.

        Raise HashMismatch if none match.
        Raise InstallationError if ``hashlib.new`` rejects an algorithm name.

        """
        gots = {}
        for hash_name in self._allowed:
            try:
                gots[hash_name] = hashlib.new(hash_name)
            except (ValueError, TypeError) as exc:
                raise InstallationError(f"Unknown hash name: {hash_name}") from exc

        # Feed every chunk to all hashers in a single pass over the data.
        for chunk in chunks:
            for hasher in gots.values():
                hasher.update(chunk)

        # A single matching digest, under any algorithm, is enough.
        for hash_name, got in gots.items():
            if got.hexdigest() in self._allowed[hash_name]:
                return
        self._raise(gots)

    def _raise(self, gots: Dict[str, "_Hash"]) -> "NoReturn":
        """Raise the error for a failed check; subclasses may override."""
        raise HashMismatch(self._allowed, gots)

    def check_against_file(self, file: BinaryIO) -> None:
        """Check good hashes against a file-like object

        Raise HashMismatch if none match.

        """
        return self.check_against_chunks(read_chunks(file))

    def check_against_path(self, path: str) -> None:
        """Check good hashes against the file at ``path``, opened in binary
        mode.

        Raise HashMismatch if none match.

        """
        with open(path, "rb") as file:
            return self.check_against_file(file)

    def has_one_of(self, hashes: Dict[str, str]) -> bool:
        """Return whether any of the given hashes are allowed."""
        return any(
            self.is_hash_allowed(hash_name, hex_digest)
            for hash_name, hex_digest in hashes.items()
        )

    def __bool__(self) -> bool:
        """Return whether I know any known-good hashes."""
        return bool(self._allowed)

    def __eq__(self, other: object) -> bool:
        """Two instances are equal when their allowed-digest mappings are."""
        if not isinstance(other, Hashes):
            return NotImplemented
        return self._allowed == other._allowed

    def __hash__(self) -> int:
        """Hash the sorted ``alg:digest`` pairs so equal instances hash equal."""
        return hash(
            ",".join(
                sorted(
                    ":".join((alg, digest))
                    for alg, digest_list in self._allowed.items()
                    for digest in digest_list
                )
            )
        )
134 | ||
135 | ||
class MissingHashes(Hashes):
    """Stand-in for Hashes for a requirement that has no hash specified.

    Checking data against it computes the requirement's actual hash and
    raises a HashMissing exception that shows that hash to the user.

    """

    def __init__(self) -> None:
        """Take no arguments; the ``hashes`` kwarg is deliberately not offered."""
        # Register our favorite algorithm with no acceptable digests: a
        # "gotten hash" is still computed, but because the list is empty it
        # can never match, so the check always ends in an error.
        no_known_digests = {FAVORITE_HASH: []}
        super().__init__(hashes=no_known_digests)

    def _raise(self, gots: Dict[str, "_Hash"]) -> "NoReturn":
        """Raise HashMissing carrying the digest computed with FAVORITE_HASH."""
        computed = gots[FAVORITE_HASH]
        raise HashMissing(computed.hexdigest())