19 _NAME_RE
= r
'[a-zA-Z_$][\w$]*'
20 _OPERATORS
= { # None => Defined in JSInterpreter._operator
29 # FIXME: This should actually be below comparison
30 '>>': operator
.rshift
,
31 '<<': operator
.lshift
,
42 '/': operator
.truediv
,
46 _MATCHING_PARENS
= dict(zip('({[', ')}]'))
50 def _ternary(cndn
, if_true
=True, if_false
=False):
51 """Simulate JS's ternary operator (cndn?if_true:if_false)"""
52 if cndn
in (False, None, 0, ''):
54 with contextlib
.suppress(TypeError):
55 if math
.isnan(cndn
): # NB: NaN cannot be checked by membership
60 class JS_Break(ExtractorError
):
62 ExtractorError
.__init
__(self
, 'Invalid break')
65 class JS_Continue(ExtractorError
):
67 ExtractorError
.__init
__(self
, 'Invalid continue')
70 class LocalNameSpace(collections
.ChainMap
):
71 def __setitem__(self
, key
, value
):
72 for scope
in self
.maps
:
76 self
.maps
[0][key
] = value
78 def __delitem__(self
, key
):
79 raise NotImplementedError('Deleting is not supported')
84 ENABLED
= 'pytest' in sys
.modules
def write(*args, level=100):
    """Emit one debug line for the JS interpreter through ``write_string``.

    The line is ``'[debug] JS: '``, followed by ``100 - level`` spaces of
    indentation, followed by the space-joined arguments and a newline.
    Each argument is converted with ``str()`` and passed through
    ``truncate_string(..., 50, 50)`` first (presumably to cap its length;
    see that helper for the exact semantics).
    """
    indent = ' ' * (100 - level)
    rendered = ' '.join(truncate_string(str(arg), 50, 50) for arg in args)
    write_string(f'[debug] JS: {indent}{rendered}\n')
def wrap_interpreter(cls, f):
    """Wrap an ``interpret_statement``-style callable with debug tracing.

    When ``cls.ENABLED`` is truthy and the statement is not blank, the
    statement is written via ``cls.write`` before ``f`` runs, and the result
    is written afterwards (``'=>'`` marks a returning statement, ``'->'`` a
    non-returning one). ``allow_recursion`` is used as the ``level`` of both
    writes.

    @param f    callable taking ``(self, stmt, local_vars, allow_recursion,
                *args, **kwargs)`` and returning ``(ret, should_ret)``
    @returns    the wrapping function, which returns ``f``'s result unchanged
    """
    import functools  # function-scope import keeps this block self-contained

    @functools.wraps(f)
    # ``allow_recursion`` defaults to 100 so that the default declared on the
    # wrapped function is not lost once the decorator is applied.
    def interpret_statement(self, stmt, local_vars, allow_recursion=100, *args, **kwargs):
        if cls.ENABLED and stmt.strip():
            cls.write(stmt, level=allow_recursion)
        ret, should_ret = f(self, stmt, local_vars, allow_recursion, *args, **kwargs)
        if cls.ENABLED and stmt.strip():
            marker = '=>' if should_ret else '->'
            cls.write(marker, repr(ret), '<-|', stmt, level=allow_recursion)
        return ret, should_ret

    return interpret_statement
104 __named_object_counter
= 0
106 def __init__(self
, code
, objects
=None):
107 self
.code
, self
._functions
= code
, {}
108 self
._objects
= {} if objects
is None else objects
110 class Exception(ExtractorError
):
111 def __init__(self
, msg
, expr
=None, *args
, **kwargs
):
113 msg
= f
'{msg.rstrip()} in: {truncate_string(expr, 50, 50)}'
114 super().__init
__(msg
, *args
, **kwargs
)
116 def _named_object(self
, namespace
, obj
):
117 self
.__named
_object
_counter
+= 1
118 name
= f
'__yt_dlp_jsinterp_obj{self.__named_object_counter}'
119 namespace
[name
] = obj
123 def _separate(expr
, delim
=',', max_split
=None):
126 counters
= {k: 0 for k in _MATCHING_PARENS.values()}
127 start
, splits
, pos
, delim_len
= 0, 0, 0, len(delim
) - 1
128 in_quote
, escaping
= None, False
129 for idx
, char
in enumerate(expr
):
130 if not in_quote
and char
in _MATCHING_PARENS
:
131 counters
[_MATCHING_PARENS
[char
]] += 1
132 elif not in_quote
and char
in counters
:
134 elif not escaping
and char
in _QUOTES
and in_quote
in (char
, None):
135 in_quote
= None if in_quote
else char
136 escaping
= not escaping
and in_quote
and char
== '\\'
138 if char
!= delim
[pos
] or any(counters
.values()) or in_quote
:
141 elif pos
!= delim_len
:
144 yield expr
[start
: idx
- delim_len
]
145 start
, pos
= idx
+ 1, 0
147 if max_split
and splits
>= max_split
:
152 def _separate_at_paren(cls
, expr
, delim
):
153 separated
= list(cls
._separate
(expr
, delim
, 1))
154 if len(separated
) < 2:
155 raise cls
.Exception(f
'No terminating paren {delim}', expr
)
156 return separated
[0][1:].strip(), separated
[1].strip()
158 def _operator(self
, op
, left_val
, right_expr
, expr
, local_vars
, allow_recursion
):
159 if op
in ('||', '&&'):
160 if (op
== '&&') ^
_ternary(left_val
):
161 return left_val
# short circuiting
163 right_expr
= _ternary(left_val
, *self
._separate
(right_expr
, ':', 1))
165 right_val
= self
.interpret_expression(right_expr
, local_vars
, allow_recursion
)
166 if not _OPERATORS
.get(op
):
170 return _OPERATORS
[op
](left_val
, right_val
)
171 except Exception as e
:
172 raise self
.Exception(f
'Failed to evaluate {left_val!r} {op} {right_val!r}', expr
, cause
=e
)
174 def _index(self
, obj
, idx
):
178 return obj
[int(idx
)] if isinstance(obj
, list) else obj
[idx
]
179 except Exception as e
:
180 raise self
.Exception(f
'Cannot get index {idx}', repr(obj
), cause
=e
)
182 def _dump(self
, obj
, namespace
):
184 return json
.dumps(obj
)
186 return self
._named
_object
(namespace
, obj
)
188 @Debugger.wrap_interpreter
189 def interpret_statement(self
, stmt
, local_vars
, allow_recursion
=100):
190 if allow_recursion
< 0:
191 raise self
.Exception('Recursion limit reached')
194 should_return
= False
195 sub_statements
= list(self
._separate
(stmt
, ';')) or ['']
196 expr
= stmt
= sub_statements
.pop().strip()
198 for sub_stmt
in sub_statements
:
199 ret
, should_return
= self
.interpret_statement(sub_stmt
, local_vars
, allow_recursion
)
201 return ret
, should_return
203 m
= re
.match(r
'(?P<var>var\s)|return(?:\s+|$)', stmt
)
205 expr
= stmt
[len(m
.group(0)):].strip()
206 should_return
= not m
.group('var')
208 return None, should_return
210 if expr
[0] in _QUOTES
:
211 inner
, outer
= self
._separate
(expr
, expr
[0], 1)
212 inner
= json
.loads(js_to_json(f
'{inner}{expr[0]}', strict
=True))
214 return inner
, should_return
215 expr
= self
._named
_object
(local_vars
, inner
) + outer
217 if expr
.startswith('new '):
219 if obj
.startswith('Date('):
220 left
, right
= self
._separate
_at
_paren
(obj
[4:], ')')
221 expr
= unified_timestamp(left
[1:-1], False)
223 raise self
.Exception(f
'Failed to parse date {left!r}', expr
)
224 expr
= self
._dump
(int(expr
* 1000), local_vars
) + right
226 raise self
.Exception(f
'Unsupported object {obj}', expr
)
228 if expr
.startswith('{'):
229 inner
, outer
= self
._separate
_at
_paren
(expr
, '}')
230 inner
, should_abort
= self
.interpret_statement(inner
, local_vars
, allow_recursion
)
231 if not outer
or should_abort
:
232 return inner
, should_abort
or should_return
234 expr
= self
._dump
(inner
, local_vars
) + outer
236 if expr
.startswith('('):
237 inner
, outer
= self
._separate
_at
_paren
(expr
, ')')
238 inner
, should_abort
= self
.interpret_statement(inner
, local_vars
, allow_recursion
)
239 if not outer
or should_abort
:
240 return inner
, should_abort
or should_return
242 expr
= self
._dump
(inner
, local_vars
) + outer
244 if expr
.startswith('['):
245 inner
, outer
= self
._separate
_at
_paren
(expr
, ']')
246 name
= self
._named
_object
(local_vars
, [
247 self
.interpret_expression(item
, local_vars
, allow_recursion
)
248 for item
in self
._separate
(inner
)])
251 m
= re
.match(r
'(?P<try>try|finally)\s*|(?:(?P<catch>catch)|(?P<for>for)|(?P<switch>switch))\s*\(', expr
)
252 if m
and m
.group('try'):
253 if expr
[m
.end()] == '{':
254 try_expr
, expr
= self
._separate
_at
_paren
(expr
[m
.end():], '}')
256 try_expr
, expr
= expr
[m
.end() - 1:], ''
257 ret
, should_abort
= self
.interpret_statement(try_expr
, local_vars
, allow_recursion
)
260 ret
, should_abort
= self
.interpret_statement(expr
, local_vars
, allow_recursion
)
261 return ret
, should_abort
or should_return
263 elif m
and m
.group('catch'):
264 # We ignore the catch block
265 _
, expr
= self
._separate
_at
_paren
(expr
, '}')
266 ret
, should_abort
= self
.interpret_statement(expr
, local_vars
, allow_recursion
)
267 return ret
, should_abort
or should_return
269 elif m
and m
.group('for'):
270 constructor
, remaining
= self
._separate
_at
_paren
(expr
[m
.end() - 1:], ')')
271 if remaining
.startswith('{'):
272 body
, expr
= self
._separate
_at
_paren
(remaining
, '}')
274 switch_m
= re
.match(r
'switch\s*\(', remaining
) # FIXME
276 switch_val
, remaining
= self
._separate
_at
_paren
(remaining
[switch_m
.end() - 1:], ')')
277 body
, expr
= self
._separate
_at
_paren
(remaining
, '}')
278 body
= 'switch(%s){%s}' % (switch_val
, body
)
280 body
, expr
= remaining
, ''
281 start
, cndn
, increment
= self
._separate
(constructor
, ';')
282 self
.interpret_expression(start
, local_vars
, allow_recursion
)
284 if not _ternary(self
.interpret_expression(cndn
, local_vars
, allow_recursion
)):
287 ret
, should_abort
= self
.interpret_statement(body
, local_vars
, allow_recursion
)
294 self
.interpret_expression(increment
, local_vars
, allow_recursion
)
295 ret
, should_abort
= self
.interpret_statement(expr
, local_vars
, allow_recursion
)
296 return ret
, should_abort
or should_return
298 elif m
and m
.group('switch'):
299 switch_val
, remaining
= self
._separate
_at
_paren
(expr
[m
.end() - 1:], ')')
300 switch_val
= self
.interpret_expression(switch_val
, local_vars
, allow_recursion
)
301 body
, expr
= self
._separate
_at
_paren
(remaining
, '}')
302 items
= body
.replace('default:', 'case default:').split('case ')[1:]
303 for default
in (False, True):
306 case
, stmt
= (i
.strip() for i
in self
._separate
(item
, ':', 1))
308 matched
= matched
or case
== 'default'
310 matched
= case
!= 'default' and switch_val
== self
.interpret_expression(case
, local_vars
, allow_recursion
)
314 ret
, should_abort
= self
.interpret_statement(stmt
, local_vars
, allow_recursion
)
321 ret
, should_abort
= self
.interpret_statement(expr
, local_vars
, allow_recursion
)
322 return ret
, should_abort
or should_return
324 # Comma separated statements
325 sub_expressions
= list(self
._separate
(expr
))
326 expr
= sub_expressions
.pop().strip() if sub_expressions
else ''
327 for sub_expr
in sub_expressions
:
328 ret
, should_abort
= self
.interpret_statement(sub_expr
, local_vars
, allow_recursion
)
332 for m
in re
.finditer(rf
'''(?x)
333 (?P<pre_sign>\+\+|--)(?P<var1>{_NAME_RE})|
334 (?P<var2>{_NAME_RE})(?P<post_sign>\+\+|--)''', expr
):
335 var
= m
.group('var1') or m
.group('var2')
336 start
, end
= m
.span()
337 sign
= m
.group('pre_sign') or m
.group('post_sign')
338 ret
= local_vars
[var
]
339 local_vars
[var
] += 1 if sign
[0] == '+' else -1
340 if m
.group('pre_sign'):
341 ret
= local_vars
[var
]
342 expr
= expr
[:start
] + self
._dump
(ret
, local_vars
) + expr
[end
:]
345 return None, should_return
347 m
= re
.match(fr
'''(?x)
349 (?P<out>{_NAME_RE})(?:\[(?P<index>[^\]]+?)\])?\s*
350 (?P<op>{"|".join(map(re.escape, _OPERATORS))})?
353 (?!if|return|true|false|null|undefined)(?P<name>{_NAME_RE})$
355 (?P<in>{_NAME_RE})\[(?P<idx>.+)\]$
357 (?P<var>{_NAME_RE})(?:\.(?P<member>[^(]+)|\[(?P<member2>[^\]]+)\])\s*
359 (?P<fname>{_NAME_RE})\((?P<args>.*)\)$
361 if m
and m
.group('assign'):
362 left_val
= local_vars
.get(m
.group('out'))
364 if not m
.group('index'):
365 local_vars
[m
.group('out')] = self
._operator
(
366 m
.group('op'), left_val
, m
.group('expr'), expr
, local_vars
, allow_recursion
)
367 return local_vars
[m
.group('out')], should_return
368 elif left_val
is None:
369 raise self
.Exception(f
'Cannot index undefined variable {m.group("out")}', expr
)
371 idx
= self
.interpret_expression(m
.group('index'), local_vars
, allow_recursion
)
372 if not isinstance(idx
, (int, float)):
373 raise self
.Exception(f
'List index {idx} must be integer', expr
)
375 left_val
[idx
] = self
._operator
(
376 m
.group('op'), left_val
[idx
], m
.group('expr'), expr
, local_vars
, allow_recursion
)
377 return left_val
[idx
], should_return
380 return int(expr
), should_return
382 elif expr
== 'break':
384 elif expr
== 'continue':
387 elif m
and m
.group('return'):
388 return local_vars
[m
.group('name')], should_return
390 with contextlib
.suppress(ValueError):
391 return json
.loads(js_to_json(expr
, strict
=True)), should_return
393 if m
and m
.group('indexing'):
394 val
= local_vars
[m
.group('in')]
395 idx
= self
.interpret_expression(m
.group('idx'), local_vars
, allow_recursion
)
396 return self
._index
(val
, idx
), should_return
398 for op
in _OPERATORS
:
399 separated
= list(self
._separate
(expr
, op
))
400 if len(separated
) < 2:
402 right_expr
= separated
.pop()
403 while op
== '-' and len(separated
) > 1 and not separated
[-1].strip():
404 right_expr
= f
'-{right_expr}'
406 left_val
= self
.interpret_expression(op
.join(separated
), local_vars
, allow_recursion
)
407 return self
._operator
(op
, 0 if left_val
is None else left_val
,
408 right_expr
, expr
, local_vars
, allow_recursion
), should_return
410 if m
and m
.group('attribute'):
411 variable
= m
.group('var')
412 member
= m
.group('member')
414 member
= self
.interpret_expression(m
.group('member2'), local_vars
, allow_recursion
)
415 arg_str
= expr
[m
.end():]
416 if arg_str
.startswith('('):
417 arg_str
, remaining
= self
._separate
_at
_paren
(arg_str
, ')')
419 arg_str
, remaining
= None, arg_str
421 def assertion(cndn
, msg
):
422 """ assert, but without risk of getting optimized out """
424 raise self
.Exception(f
'{member} {msg}', expr
)
427 if (variable
, member
) == ('console', 'debug'):
429 Debugger
.write(self
.interpret_expression(f
'[{arg_str}]', local_vars
, allow_recursion
))
436 obj
= local_vars
.get(variable
, types
.get(variable
, NO_DEFAULT
))
437 if obj
is NO_DEFAULT
:
438 if variable
not in self
._objects
:
439 self
._objects
[variable
] = self
.extract_object(variable
)
440 obj
= self
._objects
[variable
]
444 return self
._index
(obj
, member
)
448 self
.interpret_expression(v
, local_vars
, allow_recursion
)
449 for v
in self
._separate
(arg_str
)]
452 if member
== 'fromCharCode':
453 assertion(argvals
, 'takes one or more arguments')
454 return ''.join(map(chr, argvals
))
455 raise self
.Exception(f
'Unsupported String method {member}', expr
)
458 assertion(len(argvals
) == 2, 'takes two arguments')
459 return argvals
[0] ** argvals
[1]
460 raise self
.Exception(f
'Unsupported Math method {member}', expr
)
462 if member
== 'split':
463 assertion(argvals
, 'takes one or more arguments')
464 assertion(len(argvals
) == 1, 'with limit argument is not implemented')
465 return obj
.split(argvals
[0]) if argvals
[0] else list(obj
)
466 elif member
== 'join':
467 assertion(isinstance(obj
, list), 'must be applied on a list')
468 assertion(len(argvals
) == 1, 'takes exactly one argument')
469 return argvals
[0].join(obj
)
470 elif member
== 'reverse':
471 assertion(not argvals
, 'does not take any arguments')
474 elif member
== 'slice':
475 assertion(isinstance(obj
, list), 'must be applied on a list')
476 assertion(len(argvals
) == 1, 'takes exactly one argument')
477 return obj
[argvals
[0]:]
478 elif member
== 'splice':
479 assertion(isinstance(obj
, list), 'must be applied on a list')
480 assertion(argvals
, 'takes one or more arguments')
481 index
, howMany
= map(int, (argvals
+ [len(obj
)])[:2])
484 add_items
= argvals
[2:]
486 for i
in range(index
, min(index
+ howMany
, len(obj
))):
487 res
.append(obj
.pop(index
))
488 for i
, item
in enumerate(add_items
):
489 obj
.insert(index
+ i
, item
)
491 elif member
== 'unshift':
492 assertion(isinstance(obj
, list), 'must be applied on a list')
493 assertion(argvals
, 'takes one or more arguments')
494 for item
in reversed(argvals
):
497 elif member
== 'pop':
498 assertion(isinstance(obj
, list), 'must be applied on a list')
499 assertion(not argvals
, 'does not take any arguments')
503 elif member
== 'push':
504 assertion(argvals
, 'takes one or more arguments')
507 elif member
== 'forEach':
508 assertion(argvals
, 'takes one or more arguments')
509 assertion(len(argvals
) <= 2, 'takes at-most 2 arguments')
510 f
, this
= (argvals
+ [''])[:2]
511 return [f((item
, idx
, obj
), {'this': this}
, allow_recursion
) for idx
, item
in enumerate(obj
)]
512 elif member
== 'indexOf':
513 assertion(argvals
, 'takes one or more arguments')
514 assertion(len(argvals
) <= 2, 'takes at-most 2 arguments')
515 idx
, start
= (argvals
+ [0])[:2]
517 return obj
.index(idx
, start
)
521 idx
= int(member
) if isinstance(obj
, list) else member
522 return obj
[idx
](argvals
, allow_recursion
=allow_recursion
)
525 ret
, should_abort
= self
.interpret_statement(
526 self
._named
_object
(local_vars
, eval_method()) + remaining
,
527 local_vars
, allow_recursion
)
528 return ret
, should_return
or should_abort
530 return eval_method(), should_return
532 elif m
and m
.group('function'):
533 fname
= m
.group('fname')
534 argvals
= [self
.interpret_expression(v
, local_vars
, allow_recursion
)
535 for v
in self
._separate
(m
.group('args'))]
536 if fname
in local_vars
:
537 return local_vars
[fname
](argvals
, allow_recursion
=allow_recursion
), should_return
538 elif fname
not in self
._functions
:
539 self
._functions
[fname
] = self
.extract_function(fname
)
540 return self
._functions
[fname
](argvals
, allow_recursion
=allow_recursion
), should_return
542 raise self
.Exception(
543 f
'Unsupported JS expression {truncate_string(expr, 20, 20) if expr != stmt else ""}', stmt
)
545 def interpret_expression(self
, expr
, local_vars
, allow_recursion
):
546 ret
, should_return
= self
.interpret_statement(expr
, local_vars
, allow_recursion
)
548 raise self
.Exception('Cannot return from an expression', expr
)
551 def extract_object(self
, objname
):
552 _FUNC_NAME_RE
= r
'''(?:[a-zA-Z$0-9]+|"[a-zA-Z$0-9]+"|'[a-zA-Z$0-9]+')'''
556 (?<!this\.)%s\s*=\s*{\s*
557 (?P<fields>(%s\s*:\s*function\s*\(.*?\)\s*{.*?}(?:,\s*)?)*)
559 ''' % (re
.escape(objname
), _FUNC_NAME_RE
),
562 raise self
.Exception(f
'Could not find object {objname}')
563 fields
= obj_m
.group('fields')
564 # Currently, it only supports function definitions
565 fields_m
= re
.finditer(
567 (?P<key>%s)\s*:\s*function\s*\((?P<args>[a-z,]+)\){(?P<code>[^}]+)}
571 argnames
= f
.group('args').split(',')
572 obj
[remove_quotes(f
.group('key'))] = self
.build_function(argnames
, f
.group('code'))
576 def extract_function_code(self
, funcname
):
577 """ @returns argnames, code """
582 [{;,]\s*%(name)s\s*=\s*function|
583 var\s+%(name)s\s*=\s*function
585 \((?P<args>[^)]*)\)\s*
586 (?P<code>{.+})''' % {'name': re.escape(funcname)}
,
588 code
, _
= self
._separate
_at
_paren
(func_m
.group('code'), '}')
590 raise self
.Exception(f
'Could not find JS function "{funcname}"')
591 return [x
.strip() for x
in func_m
.group('args').split(',')], code
def extract_function(self, funcname):
    """Return the result of ``extract_function_from_code`` for *funcname*.

    The value returned by ``self.extract_function_code(funcname)`` is
    unpacked into positional arguments of ``extract_function_from_code``.
    """
    extracted = self.extract_function_code(funcname)
    return self.extract_function_from_code(*extracted)
596 def extract_function_from_code(self
, argnames
, code
, *global_stack
):
599 mobj
= re
.search(r
'function\((?P<args>[^)]*)\)\s*{', code
)
602 start
, body_start
= mobj
.span()
603 body
, remaining
= self
._separate
_at
_paren
(code
[body_start
- 1:], '}')
604 name
= self
._named
_object
(local_vars
, self
.extract_function_from_code(
605 [x
.strip() for x
in mobj
.group('args').split(',')],
606 body
, local_vars
, *global_stack
))
607 code
= code
[:start
] + name
+ remaining
608 return self
.build_function(argnames
, code
, local_vars
, *global_stack
)
def call_function(self, funcname, *args):
    """Extract *funcname* and invoke it.

    The positional *args* are handed to the extracted callable as one tuple
    (its single argument), not unpacked.
    """
    func = self.extract_function(funcname)
    return func(args)
613 def build_function(self
, argnames
, code
, *global_stack
):
614 global_stack
= list(global_stack
) or [{}]
615 argnames
= tuple(argnames
)
617 def resf(args
, kwargs
={}, allow_recursion
=100):
618 global_stack
[0].update({
619 **dict(itertools
.zip_longest(argnames
, args
, fillvalue
=None)),
622 var_stack
= LocalNameSpace(*global_stack
)
623 ret
, should_abort
= self
.interpret_statement(code
.replace('\n', ''), var_stack
, allow_recursion
- 1)