jfr.im git - yt-dlp.git/blob - yt_dlp/jsinterp.py
[extractor/dropout] Support cookies and login only as needed (#4075)
[yt-dlp.git] / yt_dlp / jsinterp.py
1 import collections
2 import contextlib
3 import json
4 import operator
5 import re
6
7 from .utils import ExtractorError, remove_quotes
8
# Regex fragment matching a JavaScript identifier.
_NAME_RE = r'[a-zA-Z_$][\w$]*'

# Binary operators understood by the interpreter, mapped to their Python
# implementation. The insertion order matters: the interpreter iterates this
# mapping and splits an expression on the first operator that is present.
_OPERATORS = dict((
    ('|', operator.or_),
    ('^', operator.xor),
    ('&', operator.and_),
    ('>>', operator.rshift),
    ('<<', operator.lshift),
    ('-', operator.sub),
    ('+', operator.add),
    ('%', operator.mod),
    ('/', operator.truediv),
    ('*', operator.mul),
))

# Opening bracket -> matching closing bracket.
_MATCHING_PARENS = {'(': ')', '{': '}', '[': ']'}

# Characters that open or close a string literal.
_QUOTES = "'\""
25
26
class JS_Break(ExtractorError):
    """Raised when a JS ``break`` statement is evaluated.

    Loop and switch handling in the interpreter catches this exception; if it
    escapes, it surfaces as an ``ExtractorError`` with the message below.
    """

    def __init__(self):
        super().__init__('Invalid break')
30
31
class JS_Continue(ExtractorError):
    """Raised when a JS ``continue`` statement is evaluated.

    The for-loop handling in the interpreter catches this exception; if it
    escapes, it surfaces as an ``ExtractorError`` with the message below.
    """

    def __init__(self):
        super().__init__('Invalid continue')
35
36
class LocalNameSpace(collections.ChainMap):
    """ChainMap whose assignments update the scope that already owns the key.

    A plain ``ChainMap`` always writes to the first mapping. Here an existing
    name is rebound in whichever scope defines it, and only unknown names are
    created in the first (innermost) scope. Deleting names is not supported.
    """

    def __setitem__(self, key, value):
        # First scope that already holds the key, else the innermost scope
        owner = next((scope for scope in self.maps if key in scope), self.maps[0])
        owner[key] = value

    def __delitem__(self, key):
        raise NotImplementedError('Deleting is not supported')
47
48
49 class JSInterpreter:
50 __named_object_counter = 0
51
52 def __init__(self, code, objects=None):
53 self.code, self._functions = code, {}
54 self._objects = {} if objects is None else objects
55
56 def _named_object(self, namespace, obj):
57 self.__named_object_counter += 1
58 name = f'__yt_dlp_jsinterp_obj{self.__named_object_counter}'
59 namespace[name] = obj
60 return name
61
@staticmethod
def _separate(expr, delim=',', max_split=None):
    """Split *expr* on *delim*, ignoring delimiters nested in brackets or quotes.

    This is a generator. A delimiter only counts when every bracket counter
    is zero and no string literal is open.

    @param expr       text to split; nothing is yielded when it is empty
    @param delim      delimiter string (may be longer than one character)
    @param max_split  stop splitting after this many splits; the rest of
                      *expr* is yielded as the final piece
    """
    if not expr:
        return
    # One counter per closing bracket; non-zero means "inside that bracket"
    depth = dict.fromkeys(_MATCHING_PARENS.values(), 0)
    last = len(delim) - 1  # index of the delimiter's final character
    seg_start = matched = n_splits = 0
    quote, escaped = None, False
    for i, ch in enumerate(expr):
        # NB: brackets are counted before quotes are considered, so bracket
        # characters inside a string literal still affect the counters
        if ch in _MATCHING_PARENS:
            depth[_MATCHING_PARENS[ch]] += 1
        elif ch in depth:
            depth[ch] -= 1
        elif not escaped and ch in _QUOTES and quote in (ch, None):
            quote = None if quote else ch
        # A backslash inside a string escapes the following character
        escaped = not escaped and quote and ch == '\\'

        if ch != delim[matched] or any(depth.values()) or quote:
            matched = 0
        elif matched != last:
            # Partway through a multi-character delimiter
            matched += 1
        else:
            yield expr[seg_start: i - last]
            seg_start, matched = i + 1, 0
            n_splits += 1
            if max_split and n_splits >= max_split:
                break
    yield expr[seg_start:]
90
@classmethod
def _separate_at_paren(cls, expr, delim):
    """Split *expr* at the first top-level closing bracket *delim*.

    The first character of *expr* (the opening bracket at every call site in
    this class) is dropped from the first part.

    @returns (text inside the brackets, text after the closing bracket),
             both stripped of surrounding whitespace
    @raises ExtractorError if no top-level *delim* is found
    """
    pieces = list(cls._separate(expr, delim, 1))
    if len(pieces) < 2:
        raise ExtractorError(f'No terminating paren {delim} in {expr}')
    inside, rest = pieces[0], pieces[1]
    return inside[1:].strip(), rest.strip()
97
def interpret_statement(self, stmt, local_vars, allow_recursion=100):
    """Interpret *stmt*, which may consist of several `;`-separated statements.

    @param stmt             JS statement text
    @param local_vars       namespace used to read and write variables
    @param allow_recursion  remaining recursion budget
    @returns (value, should_abort); should_abort is True when a `return`
             statement was executed
    @raises ExtractorError if the recursion budget is exhausted
    """
    if allow_recursion < 0:
        raise ExtractorError('Recursion limit reached')

    pieces = list(self._separate(stmt, ';')) or ['']
    final = pieces.pop().lstrip()

    # Everything before the last statement is run first; a `return` in any
    # of them ends the whole statement immediately
    for piece in pieces:
        value, returned = self.interpret_statement(piece, local_vars, allow_recursion - 1)
        if returned:
            return value, returned

    keyword = re.match(r'(?P<var>var\s)|return(?:\s+|$)', final)
    if keyword is None:
        # No leading keyword: treat the statement as a plain expression
        expr, returned = final, False
    else:
        expr = final[len(keyword.group(0)):]
        returned = not keyword.group('var')

    return self.interpret_expression(expr, local_vars, allow_recursion), returned
121
def interpret_expression(self, expr, local_vars, allow_recursion):
    """Evaluate one JS expression or supported control-flow construct.

    Handles, in this order: `{...}` blocks, parenthesised expressions, array
    literals, `try`/`catch`/`for`/`switch`, comma-separated expressions,
    `++`/`--`, assignments, integer literals, `break`/`continue`, variable
    reads, JSON literals, indexing, binary operators, member access/method
    calls and plain function calls.

    @param expr             JS expression text
    @param local_vars       namespace used to read and write variables
    @param allow_recursion  remaining recursion budget
    @returns the value of the expression (None for an empty expression)
    @raises ExtractorError  if the expression is not supported
    @raises JS_Break, JS_Continue  for `break` / `continue`
    """
    expr = expr.strip()
    if not expr:
        return None

    if expr.startswith('{'):
        inner, outer = self._separate_at_paren(expr, '}')
        inner, should_abort = self.interpret_statement(inner, local_vars, allow_recursion - 1)
        if not outer or should_abort:
            return inner
        else:
            # Substitute the block's value and keep evaluating the rest
            expr = json.dumps(inner) + outer

    if expr.startswith('('):
        inner, outer = self._separate_at_paren(expr, ')')
        inner = self.interpret_expression(inner, local_vars, allow_recursion)
        if not outer:
            return inner
        else:
            expr = json.dumps(inner) + outer

    if expr.startswith('['):
        inner, outer = self._separate_at_paren(expr, ']')
        # The list is stored under a generated name so that the remainder
        # of the expression can refer to it
        name = self._named_object(local_vars, [
            self.interpret_expression(item, local_vars, allow_recursion)
            for item in self._separate(inner)])
        expr = name + outer

    # `try` must be followed by `{`; otherwise any expression that merely
    # starts with the letters "try" (e.g. an identifier) would be mistaken
    # for a try block
    m = re.match(r'(?P<try>try)\s*\{|(?:(?P<catch>catch)|(?P<for>for)|(?P<switch>switch))\s*\(', expr)
    if m and m.group('try'):
        try_expr, expr = self._separate_at_paren(expr[m.end() - 1:], '}')
        ret, should_abort = self.interpret_statement(try_expr, local_vars, allow_recursion - 1)
        if should_abort:
            return ret
        return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]

    elif m and m.group('catch'):
        # We ignore the catch block
        _, expr = self._separate_at_paren(expr, '}')
        return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]

    elif m and m.group('for'):
        constructor, remaining = self._separate_at_paren(expr[m.end() - 1:], ')')
        if remaining.startswith('{'):
            body, expr = self._separate_at_paren(remaining, '}')
        else:
            switch_m = re.match(r'switch\s*\(', remaining)  # FIXME
            if switch_m:
                # Brace-less loop body that is a switch statement
                switch_val, remaining = self._separate_at_paren(remaining[switch_m.end() - 1:], ')')
                body, expr = self._separate_at_paren(remaining, '}')
                body = 'switch(%s){%s}' % (switch_val, body)
            else:
                body, expr = remaining, ''
        start, cndn, increment = self._separate(constructor, ';')
        if self.interpret_statement(start, local_vars, allow_recursion - 1)[1]:
            raise ExtractorError(
                f'Premature return in the initialization of a for loop in {constructor!r}')
        while True:
            if not self.interpret_expression(cndn, local_vars, allow_recursion):
                break
            try:
                ret, should_abort = self.interpret_statement(body, local_vars, allow_recursion - 1)
                if should_abort:
                    return ret
            except JS_Break:
                break
            except JS_Continue:
                pass
            if self.interpret_statement(increment, local_vars, allow_recursion - 1)[1]:
                raise ExtractorError(
                    f'Premature return in the increment of a for loop in {constructor!r}')
        return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]

    elif m and m.group('switch'):
        switch_val, remaining = self._separate_at_paren(expr[m.end() - 1:], ')')
        switch_val = self.interpret_expression(switch_val, local_vars, allow_recursion)
        body, expr = self._separate_at_paren(remaining, '}')
        items = body.replace('default:', 'case default:').split('case ')[1:]
        # First pass looks for a matching `case`; the second pass, reached
        # only when nothing matched, starts executing at `default`
        for default in (False, True):
            matched = False
            for item in items:
                case, stmt = (i.strip() for i in self._separate(item, ':', 1))
                if default:
                    matched = matched or case == 'default'
                elif not matched:
                    matched = case != 'default' and switch_val == self.interpret_expression(case, local_vars, allow_recursion)
                if not matched:
                    continue
                # Once matched, following cases are executed too
                # (fall-through) until a `break` or `return`
                try:
                    ret, should_abort = self.interpret_statement(stmt, local_vars, allow_recursion - 1)
                    if should_abort:
                        return ret
                except JS_Break:
                    break
            if matched:
                break
        return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]

    # Comma separated statements
    sub_expressions = list(self._separate(expr))
    expr = sub_expressions.pop().strip() if sub_expressions else ''
    for sub_expr in sub_expressions:
        self.interpret_expression(sub_expr, local_vars, allow_recursion)

    def apply_step(step_m):
        """ Perform one `++`/`--` and return the JSON text of its value """
        var = step_m.group('var1') or step_m.group('var2')
        sign = step_m.group('pre_sign') or step_m.group('post_sign')
        ret = local_vars[var]
        local_vars[var] += 1 if sign[0] == '+' else -1
        if step_m.group('pre_sign'):
            # Prefix form evaluates to the updated value
            ret = local_vars[var]
        return json.dumps(ret)

    # re.sub keeps the offsets consistent when there are several matches;
    # splicing by hand with spans of the un-edited string would not
    expr = re.sub(rf'''(?x)
        (?P<pre_sign>\+\+|--)(?P<var1>{_NAME_RE})|
        (?P<var2>{_NAME_RE})(?P<post_sign>\+\+|--)''', apply_step, expr)

    if not expr:
        return None

    m = re.match(fr'''(?x)
        (?P<assign>
            (?P<out>{_NAME_RE})(?:\[(?P<index>[^\]]+?)\])?\s*
            (?P<op>{"|".join(map(re.escape, _OPERATORS))})?
            =(?P<expr>.*)$
        )|(?P<return>
            (?!if|return|true|false|null)(?P<name>{_NAME_RE})$
        )|(?P<indexing>
            (?P<in>{_NAME_RE})\[(?P<idx>.+)\]$
        )|(?P<attribute>
            (?P<var>{_NAME_RE})(?:\.(?P<member>[^(]+)|\[(?P<member2>[^\]]+)\])\s*
        )|(?P<function>
            (?P<fname>{_NAME_RE})\((?P<args>[\w$,]*)\)$
        )''', expr)
    if m and m.group('assign'):
        if not m.group('op'):
            # Plain `=`: the new value simply replaces the current one
            opfunc = lambda curr, right: right
        else:
            opfunc = _OPERATORS[m.group('op')]
        right_val = self.interpret_expression(m.group('expr'), local_vars, allow_recursion)
        left_val = local_vars.get(m.group('out'))

        if not m.group('index'):
            local_vars[m.group('out')] = opfunc(left_val, right_val)
            return local_vars[m.group('out')]
        elif left_val is None:
            raise ExtractorError(f'Cannot index undefined variable: {m.group("out")}')

        idx = self.interpret_expression(m.group('index'), local_vars, allow_recursion)
        if not isinstance(idx, int):
            raise ExtractorError(f'List indices must be integers: {idx}')
        left_val[idx] = opfunc(left_val[idx], right_val)
        return left_val[idx]

    elif expr.isdigit():
        return int(expr)

    elif expr == 'break':
        raise JS_Break()
    elif expr == 'continue':
        raise JS_Continue()

    elif m and m.group('return'):
        return local_vars[m.group('name')]

    # Anything that parses as JSON is taken as a literal
    with contextlib.suppress(ValueError):
        return json.loads(expr)

    if m and m.group('indexing'):
        val = local_vars[m.group('in')]
        idx = self.interpret_expression(m.group('idx'), local_vars, allow_recursion)
        return val[idx]

    # Split on the first operator (in _OPERATORS order) found at top level;
    # the text after its last occurrence is the right operand
    for op, opfunc in _OPERATORS.items():
        separated = list(self._separate(expr, op))
        if len(separated) < 2:
            continue
        right_val = separated.pop()
        left_val = op.join(separated)
        left_val, should_abort = self.interpret_statement(
            left_val, local_vars, allow_recursion - 1)
        if should_abort:
            raise ExtractorError(f'Premature left-side return of {op} in {expr!r}')
        right_val, should_abort = self.interpret_statement(
            right_val, local_vars, allow_recursion - 1)
        if should_abort:
            raise ExtractorError(f'Premature right-side return of {op} in {expr!r}')
        return opfunc(left_val or 0, right_val)

    if m and m.group('attribute'):
        variable = m.group('var')
        member = remove_quotes(m.group('member') or m.group('member2'))
        arg_str = expr[m.end():]
        if arg_str.startswith('('):
            arg_str, remaining = self._separate_at_paren(arg_str, ')')
        else:
            # No call parentheses: this is a plain member access
            arg_str, remaining = None, arg_str

        def assertion(cndn, msg):
            """ assert, but without risk of getting optimized out """
            if not cndn:
                raise ExtractorError(f'{member} {msg}: {expr}')

        def eval_method():
            if variable == 'String':
                obj = str
            elif variable in local_vars:
                obj = local_vars[variable]
            else:
                # Not a local: look the object up in the JS source (cached)
                if variable not in self._objects:
                    self._objects[variable] = self.extract_object(variable)
                obj = self._objects[variable]

            # Member access
            if arg_str is None:
                if member == 'length':
                    return len(obj)
                return obj[member]

            # Function call
            argvals = [
                self.interpret_expression(v, local_vars, allow_recursion)
                for v in self._separate(arg_str)]

            if obj == str:
                if member == 'fromCharCode':
                    assertion(argvals, 'takes one or more arguments')
                    return ''.join(map(chr, argvals))
                raise ExtractorError(f'Unsupported string method {member}')

            if member == 'split':
                assertion(argvals, 'takes one or more arguments')
                assertion(argvals == [''], 'with arguments is not implemented')
                return list(obj)
            elif member == 'join':
                assertion(isinstance(obj, list), 'must be applied on a list')
                assertion(len(argvals) == 1, 'takes exactly one argument')
                return argvals[0].join(obj)
            elif member == 'reverse':
                assertion(not argvals, 'does not take any arguments')
                obj.reverse()
                return obj
            elif member == 'slice':
                assertion(isinstance(obj, list), 'must be applied on a list')
                assertion(len(argvals) == 1, 'takes exactly one argument')
                return obj[argvals[0]:]
            elif member == 'splice':
                assertion(isinstance(obj, list), 'must be applied on a list')
                assertion(argvals, 'takes one or more arguments')
                # howMany defaults to len(obj) when only an index is given
                index, howMany = map(int, (argvals + [len(obj)])[:2])
                if index < 0:
                    index += len(obj)
                add_items = argvals[2:]
                res = []
                for i in range(index, min(index + howMany, len(obj))):
                    res.append(obj.pop(index))
                for i, item in enumerate(add_items):
                    obj.insert(index + i, item)
                return res
            elif member == 'unshift':
                assertion(isinstance(obj, list), 'must be applied on a list')
                assertion(argvals, 'takes one or more arguments')
                for item in reversed(argvals):
                    obj.insert(0, item)
                return obj
            elif member == 'pop':
                assertion(isinstance(obj, list), 'must be applied on a list')
                assertion(not argvals, 'does not take any arguments')
                if not obj:
                    return
                return obj.pop()
            elif member == 'push':
                assertion(argvals, 'takes one or more arguments')
                obj.extend(argvals)
                return obj
            elif member == 'forEach':
                assertion(argvals, 'takes one or more arguments')
                assertion(len(argvals) <= 2, 'takes at-most 2 arguments')
                f, this = (argvals + [''])[:2]
                return [f((item, idx, obj), this=this) for idx, item in enumerate(obj)]
            elif member == 'indexOf':
                assertion(argvals, 'takes one or more arguments')
                assertion(len(argvals) <= 2, 'takes at-most 2 arguments')
                idx, start = (argvals + [0])[:2]
                try:
                    return obj.index(idx, start)
                except ValueError:
                    return -1

            # Otherwise the member itself is expected to be callable
            return obj[int(member) if isinstance(obj, list) else member](argvals)

        if remaining:
            # Something follows the call/access (e.g. chained calls):
            # store the intermediate value under a generated name and
            # evaluate the rest against it
            return self.interpret_expression(
                self._named_object(local_vars, eval_method()) + remaining,
                local_vars, allow_recursion)
        else:
            return eval_method()

    elif m and m.group('function'):
        fname = m.group('fname')
        argvals = tuple(
            int(v) if v.isdigit() else local_vars[v]
            for v in self._separate(m.group('args')))
        if fname in local_vars:
            return local_vars[fname](argvals)
        elif fname not in self._functions:
            self._functions[fname] = self.extract_function(fname)
        return self._functions[fname](argvals)

    raise ExtractorError(f'Unsupported JS expression {expr!r}')
434
def extract_object(self, objname):
    """Extract the JS object literal assigned to *objname* from the code.

    Only fields that are function definitions are supported.

    @returns dict mapping each field name (quotes removed) to the callable
             built from that field's function
    @raises ExtractorError if no matching object definition is found
    """
    _FUNC_NAME_RE = r'''(?:[a-zA-Z$0-9]+|"[a-zA-Z$0-9]+"|'[a-zA-Z$0-9]+')'''
    obj = {}
    obj_m = re.search(
        r'''(?x)
            (?<!this\.)%s\s*=\s*{\s*
                (?P<fields>(%s\s*:\s*function\s*\(.*?\)\s*{.*?}(?:,\s*)?)*)
            }\s*;
        ''' % (re.escape(objname), _FUNC_NAME_RE),
        self.code)
    if obj_m is None:
        # Without this check a missing object surfaces as an AttributeError
        raise ExtractorError(f'Could not find JS object "{objname}"')
    fields = obj_m.group('fields')
    # Currently, it only supports function definitions
    fields_m = re.finditer(
        r'''(?x)
            (?P<key>%s)\s*:\s*function\s*\((?P<args>[a-z,]+)\){(?P<code>[^}]+)}
        ''' % _FUNC_NAME_RE,
        fields)
    for f in fields_m:
        argnames = f.group('args').split(',')
        obj[remove_quotes(f.group('key'))] = self.build_function(argnames, f.group('code'))

    return obj
457
def extract_function_code(self, funcname):
    """Locate the definition of the JS function *funcname* in the code.

    @returns argnames, code
    @raises ExtractorError if the function definition cannot be found
    """
    func_m = re.search(
        r'''(?x)
            (?:
                function\s+%(name)s|
                [{;,]\s*%(name)s\s*=\s*function|
                var\s+%(name)s\s*=\s*function
            )\s*
            \((?P<args>[^)]*)\)\s*
            (?P<code>{(?:(?!};)[^"]|"([^"]|\\")*")+})''' % {'name': re.escape(funcname)},
        self.code)
    # Check before touching the match; otherwise a missing function fails
    # with an AttributeError and this error is never raised
    if func_m is None:
        raise ExtractorError(f'Could not find JS function "{funcname}"')
    code, _ = self._separate_at_paren(func_m.group('code'), '}')  # refine the match
    return func_m.group('args').split(','), code
474
475 def extract_function(self, funcname):
476 return self.extract_function_from_code(*self.extract_function_code(funcname))
477
478 def extract_function_from_code(self, argnames, code, *global_stack):
479 local_vars = {}
480 while True:
481 mobj = re.search(r'function\((?P<args>[^)]*)\)\s*{', code)
482 if mobj is None:
483 break
484 start, body_start = mobj.span()
485 body, remaining = self._separate_at_paren(code[body_start - 1:], '}')
486 name = self._named_object(local_vars, self.extract_function_from_code(
487 [x.strip() for x in mobj.group('args').split(',')],
488 body, local_vars, *global_stack))
489 code = code[:start] + name + remaining
490 return self.build_function(argnames, code, local_vars, *global_stack)
491
492 def call_function(self, funcname, *args):
493 return self.extract_function(funcname)(args)
494
def build_function(self, argnames, code, *global_stack):
    """Wrap a JS function body in a Python callable.

    @param argnames      parameter names of the function
    @param code          body of the function
    @param global_stack  scopes (dicts) visible to the function, innermost
                         first; a single empty scope is used if none given
    @returns a callable `resf(args, **kwargs)` that takes the arguments as
             one sequence and returns the value of the last statement
             executed (None when the body is empty)
    """
    global_stack = list(global_stack) or [{}]

    def resf(args, **kwargs):
        # Arguments and keyword values are written into the innermost scope,
        # which is shared between calls of this function
        global_stack[0].update({
            **dict(zip(argnames, args)),
            **kwargs
        })
        var_stack = LocalNameSpace(*global_stack)
        # An empty body executes no statement at all; without this default
        # the return below would fail with UnboundLocalError
        ret = None
        for stmt in self._separate(code.replace('\n', ''), ';'):
            ret, should_abort = self.interpret_statement(stmt, var_stack)
            if should_abort:
                break
        return ret
    return resf