4 from collections
.abc
import MutableMapping
6 from .utils
import ExtractorError
, remove_quotes
12 ('>>', operator
.rshift
),
13 ('<<', operator
.lshift
),
17 ('/', operator
.truediv
),
# Each binary operator doubles as a compound assignment (`<op>=`) backed by the
# same function; plain `=` goes last and simply yields the right-hand value.
_ASSIGN_OPERATORS = [(f'{op}=', opfunc) for op, opfunc in _OPERATORS]
_ASSIGN_OPERATORS.append(('=', lambda cur, right: right))

# Regex fragment for a single identifier; interpolated into larger patterns.
_NAME_RE = r'[a-zA-Z_$][a-zA-Z_$0-9]*'

# Opening bracket -> matching closing bracket.
_MATCHING_PARENS = {'(': ')', '{': '}', '[': ']'}
28 class JS_Break(ExtractorError
):
30 ExtractorError
.__init
__(self
, 'Invalid break')
33 class JS_Continue(ExtractorError
):
35 ExtractorError
.__init
__(self
, 'Invalid continue')
38 class LocalNameSpace(MutableMapping
):
39 def __init__(self
, *stack
):
40 self
.stack
= tuple(stack
)
42 def __getitem__(self
, key
):
43 for scope
in self
.stack
:
48 def __setitem__(self
, key
, value
):
49 for scope
in self
.stack
:
54 self
.stack
[0][key
] = value
57 def __delitem__(self
, key
):
58 raise NotImplementedError('Deleting is not supported')
61 for scope
in self
.stack
:
64 def __len__(self
, key
):
65 return len(iter(self
))
68 return f
'LocalNameSpace{self.stack}'
72 def __init__(self
, code
, objects
=None):
77 self
._objects
= objects
78 self
.__named
_object
_counter
= 0
80 def _named_object(self
, namespace
, obj
):
81 self
.__named
_object
_counter
+= 1
82 name
= f
'__yt_dlp_jsinterp_obj{self.__named_object_counter}'
87 def _separate(expr
, delim
=',', max_split
=None):
90 counters
= {k: 0 for k in _MATCHING_PARENS.values()}
91 start
, splits
, pos
, delim_len
= 0, 0, 0, len(delim
) - 1
92 for idx
, char
in enumerate(expr
):
93 if char
in _MATCHING_PARENS
:
94 counters
[_MATCHING_PARENS
[char
]] += 1
95 elif char
in counters
:
97 if char
!= delim
[pos
] or any(counters
.values()):
100 elif pos
!= delim_len
:
103 yield expr
[start
: idx
- delim_len
]
104 start
, pos
= idx
+ 1, 0
106 if max_split
and splits
>= max_split
:
def _separate_at_paren(expr, delim):
    """Split a bracketed expression into its inner part and what follows it.

    ``expr`` is cut once at the first ``delim`` that ``JSInterpreter._separate``
    reports. The first character of the leading piece is dropped (callers pass
    an ``expr`` that starts with the opening bracket) and both pieces are
    stripped of surrounding whitespace.

    Returns:
        ``(inner, remainder)`` as stripped strings.

    Raises:
        ExtractorError: if ``delim`` is never found, i.e. fewer than two
            pieces come back.
    """
    pieces = list(JSInterpreter._separate(expr, delim, 1))
    if len(pieces) < 2:
        raise ExtractorError(f'No terminating paren {delim} in {expr}')
    bracketed, remainder = pieces[0], pieces[1]
    return bracketed[1:].strip(), remainder.strip()
117 def interpret_statement(self
, stmt
, local_vars
, allow_recursion
=100):
118 if allow_recursion
< 0:
119 raise ExtractorError('Recursion limit reached')
121 sub_statements
= list(self
._separate
(stmt
, ';'))
122 stmt
= (sub_statements
or ['']).pop()
123 for sub_stmt
in sub_statements
:
124 ret
, should_abort
= self
.interpret_statement(sub_stmt
, local_vars
, allow_recursion
- 1)
130 stmt_m
= re
.match(r
'var\s', stmt
)
132 expr
= stmt
[len(stmt_m
.group(0)):]
134 return_m
= re
.match(r
'return(?:\s+|$)', stmt
)
136 expr
= stmt
[len(return_m
.group(0)):]
139 # Try interpreting it as an expression
142 v
= self
.interpret_expression(expr
, local_vars
, allow_recursion
)
143 return v
, should_abort
145 def interpret_expression(self
, expr
, local_vars
, allow_recursion
):
147 if expr
== '': # Empty expression
150 if expr
.startswith('{'):
151 inner
, outer
= self
._separate
_at
_paren
(expr
, '}')
152 inner
, should_abort
= self
.interpret_statement(inner
, local_vars
, allow_recursion
- 1)
153 if not outer
or should_abort
:
156 expr
= json
.dumps(inner
) + outer
158 if expr
.startswith('('):
159 inner
, outer
= self
._separate
_at
_paren
(expr
, ')')
160 inner
= self
.interpret_expression(inner
, local_vars
, allow_recursion
)
164 expr
= json
.dumps(inner
) + outer
166 if expr
.startswith('['):
167 inner
, outer
= self
._separate
_at
_paren
(expr
, ']')
168 name
= self
._named
_object
(local_vars
, [
169 self
.interpret_expression(item
, local_vars
, allow_recursion
)
170 for item
in self
._separate
(inner
)])
173 m
= re
.match(r
'try\s*', expr
)
175 if expr
[m
.end()] == '{':
176 try_expr
, expr
= self
._separate
_at
_paren
(expr
[m
.end():], '}')
178 try_expr
, expr
= expr
[m
.end() - 1:], ''
179 ret
, should_abort
= self
.interpret_statement(try_expr
, local_vars
, allow_recursion
- 1)
182 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
184 m
= re
.match(r
'catch\s*\(', expr
)
186 # We ignore the catch block
187 _
, expr
= self
._separate
_at
_paren
(expr
, '}')
188 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
190 m
= re
.match(r
'for\s*\(', expr
)
192 constructor
, remaining
= self
._separate
_at
_paren
(expr
[m
.end() - 1:], ')')
193 if remaining
.startswith('{'):
194 body
, expr
= self
._separate
_at
_paren
(remaining
, '}')
196 m
= re
.match(r
'switch\s*\(', remaining
) # FIXME
198 switch_val
, remaining
= self
._separate
_at
_paren
(remaining
[m
.end() - 1:], ')')
199 body
, expr
= self
._separate
_at
_paren
(remaining
, '}')
200 body
= 'switch(%s){%s}' % (switch_val
, body
)
202 body
, expr
= remaining
, ''
203 start
, cndn
, increment
= self
._separate
(constructor
, ';')
204 if self
.interpret_statement(start
, local_vars
, allow_recursion
- 1)[1]:
205 raise ExtractorError(
206 f
'Premature return in the initialization of a for loop in {constructor!r}')
208 if not self
.interpret_expression(cndn
, local_vars
, allow_recursion
):
211 ret
, should_abort
= self
.interpret_statement(body
, local_vars
, allow_recursion
- 1)
218 if self
.interpret_statement(increment
, local_vars
, allow_recursion
- 1)[1]:
219 raise ExtractorError(
220 f
'Premature return in the initialization of a for loop in {constructor!r}')
221 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
223 m
= re
.match(r
'switch\s*\(', expr
)
225 switch_val
, remaining
= self
._separate
_at
_paren
(expr
[m
.end() - 1:], ')')
226 switch_val
= self
.interpret_expression(switch_val
, local_vars
, allow_recursion
)
227 body
, expr
= self
._separate
_at
_paren
(remaining
, '}')
228 items
= body
.replace('default:', 'case default:').split('case ')[1:]
229 for default
in (False, True):
232 case
, stmt
= (i
.strip() for i
in self
._separate
(item
, ':', 1))
234 matched
= matched
or case
== 'default'
236 matched
= case
!= 'default' and switch_val
== self
.interpret_expression(case
, local_vars
, allow_recursion
)
240 ret
, should_abort
= self
.interpret_statement(stmt
, local_vars
, allow_recursion
- 1)
247 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
249 # Comma separated statements
250 sub_expressions
= list(self
._separate
(expr
))
251 expr
= sub_expressions
.pop().strip() if sub_expressions
else ''
252 for sub_expr
in sub_expressions
:
253 self
.interpret_expression(sub_expr
, local_vars
, allow_recursion
)
255 for m
in re
.finditer(rf
'''(?x)
256 (?P<pre_sign>\+\+|--)(?P<var1>{_NAME_RE})|
257 (?P<var2>{_NAME_RE})(?P<post_sign>\+\+|--)''', expr
):
258 var
= m
.group('var1') or m
.group('var2')
259 start
, end
= m
.span()
260 sign
= m
.group('pre_sign') or m
.group('post_sign')
261 ret
= local_vars
[var
]
262 local_vars
[var
] += 1 if sign
[0] == '+' else -1
263 if m
.group('pre_sign'):
264 ret
= local_vars
[var
]
265 expr
= expr
[:start
] + json
.dumps(ret
) + expr
[end
:]
267 for op
, opfunc
in _ASSIGN_OPERATORS
:
268 m
= re
.match(rf
'''(?x)
269 (?P<out>{_NAME_RE})(?:\[(?P<index>[^\]]+?)\])?
271 (?P<expr>.*)$''', expr
)
274 right_val
= self
.interpret_expression(m
.group('expr'), local_vars
, allow_recursion
)
276 if m
.groupdict().get('index'):
277 lvar
= local_vars
[m
.group('out')]
278 idx
= self
.interpret_expression(m
.group('index'), local_vars
, allow_recursion
)
279 if not isinstance(idx
, int):
280 raise ExtractorError(f
'List indices must be integers: {idx}')
282 val
= opfunc(cur
, right_val
)
286 cur
= local_vars
.get(m
.group('out'))
287 val
= opfunc(cur
, right_val
)
288 local_vars
[m
.group('out')] = val
296 elif expr
== 'continue':
300 r
'(?!if|return|true|false|null)(?P<name>%s)$' % _NAME_RE
,
303 return local_vars
[var_m
.group('name')]
306 return json
.loads(expr
)
311 r
'(?P<in>%s)\[(?P<idx>.+)\]$' % _NAME_RE
, expr
)
313 val
= local_vars
[m
.group('in')]
314 idx
= self
.interpret_expression(m
.group('idx'), local_vars
, allow_recursion
)
317 for op
, opfunc
in _OPERATORS
:
318 separated
= list(self
._separate
(expr
, op
))
319 if len(separated
) < 2:
321 right_val
= separated
.pop()
322 left_val
= op
.join(separated
)
323 left_val
, should_abort
= self
.interpret_statement(
324 left_val
, local_vars
, allow_recursion
- 1)
326 raise ExtractorError(f
'Premature left-side return of {op} in {expr!r}')
327 right_val
, should_abort
= self
.interpret_statement(
328 right_val
, local_vars
, allow_recursion
- 1)
330 raise ExtractorError(f
'Premature right-side return of {op} in {expr!r}')
331 return opfunc(left_val
or 0, right_val
)
334 r
'(?P<var>%s)(?:\.(?P<member>[^(]+)|\[(?P<member2>[^]]+)\])\s*' % _NAME_RE
,
337 variable
= m
.group('var')
338 member
= remove_quotes(m
.group('member') or m
.group('member2'))
339 arg_str
= expr
[m
.end():]
340 if arg_str
.startswith('('):
341 arg_str
, remaining
= self
._separate
_at
_paren
(arg_str
, ')')
343 arg_str
, remaining
= None, arg_str
345 def assertion(cndn
, msg
):
346 """ assert, but without risk of getting optimized out """
348 raise ExtractorError(f
'{member} {msg}: {expr}')
352 if variable
== 'String':
354 elif variable
in local_vars
:
355 obj
= local_vars
[variable
]
357 if variable
not in self
._objects
:
358 self
._objects
[variable
] = self
.extract_object(variable
)
359 obj
= self
._objects
[variable
]
363 if member
== 'length':
369 self
.interpret_expression(v
, local_vars
, allow_recursion
)
370 for v
in self
._separate
(arg_str
)]
373 if member
== 'fromCharCode':
374 assertion(argvals
, 'takes one or more arguments')
375 return ''.join(map(chr, argvals
))
376 raise ExtractorError(f
'Unsupported string method {member}')
378 if member
== 'split':
379 assertion(argvals
, 'takes one or more arguments')
380 assertion(argvals
== [''], 'with arguments is not implemented')
382 elif member
== 'join':
383 assertion(isinstance(obj
, list), 'must be applied on a list')
384 assertion(len(argvals
) == 1, 'takes exactly one argument')
385 return argvals
[0].join(obj
)
386 elif member
== 'reverse':
387 assertion(not argvals
, 'does not take any arguments')
390 elif member
== 'slice':
391 assertion(isinstance(obj
, list), 'must be applied on a list')
392 assertion(len(argvals
) == 1, 'takes exactly one argument')
393 return obj
[argvals
[0]:]
394 elif member
== 'splice':
395 assertion(isinstance(obj
, list), 'must be applied on a list')
396 assertion(argvals
, 'takes one or more arguments')
397 index
, howMany
= map(int, (argvals
+ [len(obj
)])[:2])
400 add_items
= argvals
[2:]
402 for i
in range(index
, min(index
+ howMany
, len(obj
))):
403 res
.append(obj
.pop(index
))
404 for i
, item
in enumerate(add_items
):
405 obj
.insert(index
+ i
, item
)
407 elif member
== 'unshift':
408 assertion(isinstance(obj
, list), 'must be applied on a list')
409 assertion(argvals
, 'takes one or more arguments')
410 for item
in reversed(argvals
):
413 elif member
== 'pop':
414 assertion(isinstance(obj
, list), 'must be applied on a list')
415 assertion(not argvals
, 'does not take any arguments')
419 elif member
== 'push':
420 assertion(argvals
, 'takes one or more arguments')
423 elif member
== 'forEach':
424 assertion(argvals
, 'takes one or more arguments')
425 assertion(len(argvals
) <= 2, 'takes at-most 2 arguments')
426 f
, this
= (argvals
+ [''])[:2]
427 return [f((item
, idx
, obj
), this
=this
) for idx
, item
in enumerate(obj
)]
428 elif member
== 'indexOf':
429 assertion(argvals
, 'takes one or more arguments')
430 assertion(len(argvals
) <= 2, 'takes at-most 2 arguments')
431 idx
, start
= (argvals
+ [0])[:2]
433 return obj
.index(idx
, start
)
437 if isinstance(obj
, list):
439 return obj
[member
](argvals
)
442 return self
.interpret_expression(
443 self
._named
_object
(local_vars
, eval_method()) + remaining
,
444 local_vars
, allow_recursion
)
448 m
= re
.match(r
'^(?P<func>%s)\((?P<args>[a-zA-Z0-9_$,]*)\)$' % _NAME_RE
, expr
)
450 fname
= m
.group('func')
452 int(v
) if v
.isdigit() else local_vars
[v
]
453 for v
in self
._separate
(m
.group('args')))
454 if fname
in local_vars
:
455 return local_vars
[fname
](argvals
)
456 elif fname
not in self
._functions
:
457 self
._functions
[fname
] = self
.extract_function(fname
)
458 return self
._functions
[fname
](argvals
)
461 raise ExtractorError('Unsupported JS expression %r' % expr
)
463 def extract_object(self
, objname
):
464 _FUNC_NAME_RE
= r
'''(?:[a-zA-Z$0-9]+|"[a-zA-Z$0-9]+"|'[a-zA-Z$0-9]+')'''
468 (?<!this\.)%s\s*=\s*{\s*
469 (?P<fields>(%s\s*:\s*function\s*\(.*?\)\s*{.*?}(?:,\s*)?)*)
471 ''' % (re
.escape(objname
), _FUNC_NAME_RE
),
473 fields
= obj_m
.group('fields')
474 # Currently, it only supports function definitions
475 fields_m
= re
.finditer(
477 (?P<key>%s)\s*:\s*function\s*\((?P<args>[a-z,]+)\){(?P<code>[^}]+)}
481 argnames
= f
.group('args').split(',')
482 obj
[remove_quotes(f
.group('key'))] = self
.build_function(argnames
, f
.group('code'))
486 def extract_function_code(self
, funcname
):
487 """ @returns argnames, code """
490 (?:function\s+%s|[{;,]\s*%s\s*=\s*function|var\s+%s\s*=\s*function)\s*
491 \((?P<args>[^)]*)\)\s*
492 (?P<code>\{(?:(?!};)[^"]|"([^"]|\\")*")+\})''' % (
493 re
.escape(funcname
), re
.escape(funcname
), re
.escape(funcname
)),
495 code
, _
= self
._separate
_at
_paren
(func_m
.group('code'), '}') # refine the match
497 raise ExtractorError('Could not find JS function %r' % funcname
)
498 return func_m
.group('args').split(','), code
500 def extract_function(self
, funcname
):
501 return self
.extract_function_from_code(*self
.extract_function_code(funcname
))
503 def extract_function_from_code(self
, argnames
, code
, *global_stack
):
506 mobj
= re
.search(r
'function\((?P<args>[^)]*)\)\s*{', code
)
509 start
, body_start
= mobj
.span()
510 body
, remaining
= self
._separate
_at
_paren
(code
[body_start
- 1:], '}')
511 name
= self
._named
_object
(
513 self
.extract_function_from_code(
514 [str.strip(x
) for x
in mobj
.group('args').split(',')],
515 body
, local_vars
, *global_stack
))
516 code
= code
[:start
] + name
+ remaining
517 return self
.build_function(argnames
, code
, local_vars
, *global_stack
)
519 def call_function(self
, funcname
, *args
):
520 return self
.extract_function(funcname
)(args
)
522 def build_function(self
, argnames
, code
, *global_stack
):
523 global_stack
= list(global_stack
) or [{}]
524 local_vars
= global_stack
.pop(0)
526 def resf(args
, **kwargs
):
528 **dict(zip(argnames
, args
)),
531 var_stack
= LocalNameSpace(local_vars
, *global_stack
)
532 for stmt
in self
._separate
(code
.replace('\n', ''), ';'):
533 ret
, should_abort
= self
.interpret_statement(stmt
, var_stack
)