]>
Commit | Line | Data |
---|---|---|
e0df8241 JR |
1 | # util.py |
2 | import inspect | |
3 | import warnings | |
4 | import types | |
5 | import collections | |
6 | import itertools | |
7 | from functools import lru_cache, wraps | |
8 | from typing import Callable, List, Union, Iterable, TypeVar, cast | |
9 | ||
10 | _bslash = chr(92) | |
11 | C = TypeVar("C", bound=Callable) | |
12 | ||
13 | ||
14 | class __config_flags: | |
15 | """Internal class for defining compatibility and debugging flags""" | |
16 | ||
17 | _all_names: List[str] = [] | |
18 | _fixed_names: List[str] = [] | |
19 | _type_desc = "configuration" | |
20 | ||
21 | @classmethod | |
22 | def _set(cls, dname, value): | |
23 | if dname in cls._fixed_names: | |
24 | warnings.warn( | |
25 | f"{cls.__name__}.{dname} {cls._type_desc} is {str(getattr(cls, dname)).upper()}" | |
26 | f" and cannot be overridden", | |
27 | stacklevel=3, | |
28 | ) | |
29 | return | |
30 | if dname in cls._all_names: | |
31 | setattr(cls, dname, value) | |
32 | else: | |
33 | raise ValueError(f"no such {cls._type_desc} {dname!r}") | |
34 | ||
35 | enable = classmethod(lambda cls, name: cls._set(name, True)) | |
36 | disable = classmethod(lambda cls, name: cls._set(name, False)) | |
37 | ||
38 | ||
39 | @lru_cache(maxsize=128) | |
40 | def col(loc: int, strg: str) -> int: | |
41 | """ | |
42 | Returns current column within a string, counting newlines as line separators. | |
43 | The first column is number 1. | |
44 | ||
45 | Note: the default parsing behavior is to expand tabs in the input string | |
46 | before starting the parsing process. See | |
47 | :class:`ParserElement.parse_string` for more | |
48 | information on parsing strings containing ``<TAB>`` s, and suggested | |
49 | methods to maintain a consistent view of the parsed string, the parse | |
50 | location, and line and column positions within the parsed string. | |
51 | """ | |
52 | s = strg | |
53 | return 1 if 0 < loc < len(s) and s[loc - 1] == "\n" else loc - s.rfind("\n", 0, loc) | |
54 | ||
55 | ||
56 | @lru_cache(maxsize=128) | |
57 | def lineno(loc: int, strg: str) -> int: | |
58 | """Returns current line number within a string, counting newlines as line separators. | |
59 | The first line is number 1. | |
60 | ||
61 | Note - the default parsing behavior is to expand tabs in the input string | |
62 | before starting the parsing process. See :class:`ParserElement.parse_string` | |
63 | for more information on parsing strings containing ``<TAB>`` s, and | |
64 | suggested methods to maintain a consistent view of the parsed string, the | |
65 | parse location, and line and column positions within the parsed string. | |
66 | """ | |
67 | return strg.count("\n", 0, loc) + 1 | |
68 | ||
69 | ||
70 | @lru_cache(maxsize=128) | |
71 | def line(loc: int, strg: str) -> str: | |
72 | """ | |
73 | Returns the line of text containing loc within a string, counting newlines as line separators. | |
74 | """ | |
75 | last_cr = strg.rfind("\n", 0, loc) | |
76 | next_cr = strg.find("\n", loc) | |
77 | return strg[last_cr + 1 : next_cr] if next_cr >= 0 else strg[last_cr + 1 :] | |
78 | ||
79 | ||
80 | class _UnboundedCache: | |
81 | def __init__(self): | |
82 | cache = {} | |
83 | cache_get = cache.get | |
84 | self.not_in_cache = not_in_cache = object() | |
85 | ||
86 | def get(_, key): | |
87 | return cache_get(key, not_in_cache) | |
88 | ||
89 | def set_(_, key, value): | |
90 | cache[key] = value | |
91 | ||
92 | def clear(_): | |
93 | cache.clear() | |
94 | ||
95 | self.size = None | |
96 | self.get = types.MethodType(get, self) | |
97 | self.set = types.MethodType(set_, self) | |
98 | self.clear = types.MethodType(clear, self) | |
99 | ||
100 | ||
101 | class _FifoCache: | |
102 | def __init__(self, size): | |
103 | self.not_in_cache = not_in_cache = object() | |
104 | cache = {} | |
105 | keyring = [object()] * size | |
106 | cache_get = cache.get | |
107 | cache_pop = cache.pop | |
108 | keyiter = itertools.cycle(range(size)) | |
109 | ||
110 | def get(_, key): | |
111 | return cache_get(key, not_in_cache) | |
112 | ||
113 | def set_(_, key, value): | |
114 | cache[key] = value | |
115 | i = next(keyiter) | |
116 | cache_pop(keyring[i], None) | |
117 | keyring[i] = key | |
118 | ||
119 | def clear(_): | |
120 | cache.clear() | |
121 | keyring[:] = [object()] * size | |
122 | ||
123 | self.size = size | |
124 | self.get = types.MethodType(get, self) | |
125 | self.set = types.MethodType(set_, self) | |
126 | self.clear = types.MethodType(clear, self) | |
127 | ||
128 | ||
129 | class LRUMemo: | |
130 | """ | |
131 | A memoizing mapping that retains `capacity` deleted items | |
132 | ||
133 | The memo tracks retained items by their access order; once `capacity` items | |
134 | are retained, the least recently used item is discarded. | |
135 | """ | |
136 | ||
137 | def __init__(self, capacity): | |
138 | self._capacity = capacity | |
139 | self._active = {} | |
140 | self._memory = collections.OrderedDict() | |
141 | ||
142 | def __getitem__(self, key): | |
143 | try: | |
144 | return self._active[key] | |
145 | except KeyError: | |
146 | self._memory.move_to_end(key) | |
147 | return self._memory[key] | |
148 | ||
149 | def __setitem__(self, key, value): | |
150 | self._memory.pop(key, None) | |
151 | self._active[key] = value | |
152 | ||
153 | def __delitem__(self, key): | |
154 | try: | |
155 | value = self._active.pop(key) | |
156 | except KeyError: | |
157 | pass | |
158 | else: | |
159 | while len(self._memory) >= self._capacity: | |
160 | self._memory.popitem(last=False) | |
161 | self._memory[key] = value | |
162 | ||
163 | def clear(self): | |
164 | self._active.clear() | |
165 | self._memory.clear() | |
166 | ||
167 | ||
168 | class UnboundedMemo(dict): | |
169 | """ | |
170 | A memoizing mapping that retains all deleted items | |
171 | """ | |
172 | ||
173 | def __delitem__(self, key): | |
174 | pass | |
175 | ||
176 | ||
177 | def _escape_regex_range_chars(s: str) -> str: | |
178 | # escape these chars: ^-[] | |
179 | for c in r"\^-[]": | |
180 | s = s.replace(c, _bslash + c) | |
181 | s = s.replace("\n", r"\n") | |
182 | s = s.replace("\t", r"\t") | |
183 | return str(s) | |
184 | ||
185 | ||
186 | def _collapse_string_to_ranges( | |
187 | s: Union[str, Iterable[str]], re_escape: bool = True | |
188 | ) -> str: | |
189 | def is_consecutive(c): | |
190 | c_int = ord(c) | |
191 | is_consecutive.prev, prev = c_int, is_consecutive.prev | |
192 | if c_int - prev > 1: | |
193 | is_consecutive.value = next(is_consecutive.counter) | |
194 | return is_consecutive.value | |
195 | ||
196 | is_consecutive.prev = 0 # type: ignore [attr-defined] | |
197 | is_consecutive.counter = itertools.count() # type: ignore [attr-defined] | |
198 | is_consecutive.value = -1 # type: ignore [attr-defined] | |
199 | ||
200 | def escape_re_range_char(c): | |
201 | return "\\" + c if c in r"\^-][" else c | |
202 | ||
203 | def no_escape_re_range_char(c): | |
204 | return c | |
205 | ||
206 | if not re_escape: | |
207 | escape_re_range_char = no_escape_re_range_char | |
208 | ||
209 | ret = [] | |
210 | s = "".join(sorted(set(s))) | |
211 | if len(s) > 3: | |
212 | for _, chars in itertools.groupby(s, key=is_consecutive): | |
213 | first = last = next(chars) | |
214 | last = collections.deque( | |
215 | itertools.chain(iter([last]), chars), maxlen=1 | |
216 | ).pop() | |
217 | if first == last: | |
218 | ret.append(escape_re_range_char(first)) | |
219 | else: | |
220 | sep = "" if ord(last) == ord(first) + 1 else "-" | |
221 | ret.append( | |
222 | f"{escape_re_range_char(first)}{sep}{escape_re_range_char(last)}" | |
223 | ) | |
224 | else: | |
225 | ret = [escape_re_range_char(c) for c in s] | |
226 | ||
227 | return "".join(ret) | |
228 | ||
229 | ||
230 | def _flatten(ll: list) -> list: | |
231 | ret = [] | |
232 | for i in ll: | |
233 | if isinstance(i, list): | |
234 | ret.extend(_flatten(i)) | |
235 | else: | |
236 | ret.append(i) | |
237 | return ret | |
238 | ||
239 | ||
240 | def _make_synonym_function(compat_name: str, fn: C) -> C: | |
241 | # In a future version, uncomment the code in the internal _inner() functions | |
242 | # to begin emitting DeprecationWarnings. | |
243 | ||
244 | # Unwrap staticmethod/classmethod | |
245 | fn = getattr(fn, "__func__", fn) | |
246 | ||
247 | # (Presence of 'self' arg in signature is used by explain_exception() methods, so we take | |
248 | # some extra steps to add it if present in decorated function.) | |
249 | if "self" == list(inspect.signature(fn).parameters)[0]: | |
250 | ||
251 | @wraps(fn) | |
252 | def _inner(self, *args, **kwargs): | |
253 | # warnings.warn( | |
254 | # f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=3 | |
255 | # ) | |
256 | return fn(self, *args, **kwargs) | |
257 | ||
258 | else: | |
259 | ||
260 | @wraps(fn) | |
261 | def _inner(*args, **kwargs): | |
262 | # warnings.warn( | |
263 | # f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=3 | |
264 | # ) | |
265 | return fn(*args, **kwargs) | |
266 | ||
267 | _inner.__doc__ = f"""Deprecated - use :class:`{fn.__name__}`""" | |
268 | _inner.__name__ = compat_name | |
269 | _inner.__annotations__ = fn.__annotations__ | |
270 | if isinstance(fn, types.FunctionType): | |
271 | _inner.__kwdefaults__ = fn.__kwdefaults__ | |
272 | elif isinstance(fn, type) and hasattr(fn, "__init__"): | |
273 | _inner.__kwdefaults__ = fn.__init__.__kwdefaults__ | |
274 | else: | |
275 | _inner.__kwdefaults__ = None | |
276 | _inner.__qualname__ = fn.__qualname__ | |
277 | return cast(C, _inner) | |
278 | ||
279 | ||
280 | def replaced_by_pep8(fn: C) -> Callable[[Callable], C]: | |
281 | """ | |
282 | Decorator for pre-PEP8 compatibility synonyms, to link them to the new function. | |
283 | """ | |
284 | return lambda other: _make_synonym_function(other.__name__, fn) |