7 from .utils
import ExtractorError
, remove_quotes
13 ('>>', operator
.rshift
),
14 ('<<', operator
.lshift
),
18 ('/', operator
.truediv
),
# Assignment operator table: one (token, function) pair per entry of
# _OPERATORS with '=' appended to the operator token, followed by plain '='.
_ASSIGN_OPERATORS = [
    *((symbol + '=', func) for symbol, func in _OPERATORS),
    # Plain assignment discards the current value and keeps the right operand
    ('=', (lambda cur, right: right)),
]
# Regex fragment for a name: a letter, '_' or '$' followed by any number of
# letters, digits, '_' or '$'. It is interpolated into larger patterns.
_NAME_RE = r'[a-zA-Z_$][a-zA-Z_$0-9]*'

# Maps each opening bracket character to its closing counterpart.
_MATCHING_PARENS = {
    '(': ')',
    '{': '}',
    '[': ']',
}
30 class JS_Break(ExtractorError
):
32 ExtractorError
.__init
__(self
, 'Invalid break')
35 class JS_Continue(ExtractorError
):
37 ExtractorError
.__init
__(self
, 'Invalid continue')
40 class LocalNameSpace(collections
.ChainMap
):
41 def __setitem__(self
, key
, value
):
42 for scope
in self
.maps
:
46 self
.maps
[0][key
] = value
48 def __delitem__(self
, key
):
49 raise NotImplementedError('Deleting is not supported')
53 def __init__(self
, code
, objects
=None):
58 self
._objects
= objects
59 self
.__named
_object
_counter
= 0
61 def _named_object(self
, namespace
, obj
):
62 self
.__named
_object
_counter
+= 1
63 name
= f
'__yt_dlp_jsinterp_obj{self.__named_object_counter}'
68 def _separate(expr
, delim
=',', max_split
=None):
71 counters
= {k: 0 for k in _MATCHING_PARENS.values()}
72 start
, splits
, pos
, delim_len
= 0, 0, 0, len(delim
) - 1
73 in_quote
, escaping
= None, False
74 for idx
, char
in enumerate(expr
):
75 if char
in _MATCHING_PARENS
:
76 counters
[_MATCHING_PARENS
[char
]] += 1
77 elif char
in counters
:
79 elif not escaping
and char
in _QUOTES
and in_quote
in (char
, None):
80 in_quote
= None if in_quote
else char
81 escaping
= not escaping
and in_quote
and char
== '\\'
83 if char
!= delim
[pos
] or any(counters
.values()) or in_quote
:
86 elif pos
!= delim_len
:
89 yield expr
[start
: idx
- delim_len
]
90 start
, pos
= idx
+ 1, 0
92 if max_split
and splits
>= max_split
:
def _separate_at_paren(expr, delim):
    """ Split expr once at delim and return both trimmed parts.

    The split itself is delegated to JSInterpreter._separate with a maximum
    of one split. The first character of the leading part is dropped (the
    callers visible in this file pass text that begins with the opening
    bracket) and both parts are stripped of surrounding whitespace.

    NOTE(review): takes no instance argument, so it is presumably declared
    static on a line outside this view - confirm.

    @returns (leading part without its first character, remainder)
    @raises ExtractorError if the split yields fewer than two parts
    """
    parts = list(JSInterpreter._separate(expr, delim, 1))
    if len(parts) >= 2:
        head, tail = parts[0], parts[1]
        return head[1:].strip(), tail.strip()
    raise ExtractorError(f'No terminating paren {delim} in {expr}')
103 def interpret_statement(self
, stmt
, local_vars
, allow_recursion
=100):
104 if allow_recursion
< 0:
105 raise ExtractorError('Recursion limit reached')
107 sub_statements
= list(self
._separate
(stmt
, ';'))
108 stmt
= (sub_statements
or ['']).pop()
109 for sub_stmt
in sub_statements
:
110 ret
, should_abort
= self
.interpret_statement(sub_stmt
, local_vars
, allow_recursion
- 1)
116 stmt_m
= re
.match(r
'var\s', stmt
)
118 expr
= stmt
[len(stmt_m
.group(0)):]
120 return_m
= re
.match(r
'return(?:\s+|$)', stmt
)
122 expr
= stmt
[len(return_m
.group(0)):]
125 # Try interpreting it as an expression
128 v
= self
.interpret_expression(expr
, local_vars
, allow_recursion
)
129 return v
, should_abort
131 def interpret_expression(self
, expr
, local_vars
, allow_recursion
):
133 if expr
== '': # Empty expression
136 if expr
.startswith('{'):
137 inner
, outer
= self
._separate
_at
_paren
(expr
, '}')
138 inner
, should_abort
= self
.interpret_statement(inner
, local_vars
, allow_recursion
- 1)
139 if not outer
or should_abort
:
142 expr
= json
.dumps(inner
) + outer
144 if expr
.startswith('('):
145 inner
, outer
= self
._separate
_at
_paren
(expr
, ')')
146 inner
= self
.interpret_expression(inner
, local_vars
, allow_recursion
)
150 expr
= json
.dumps(inner
) + outer
152 if expr
.startswith('['):
153 inner
, outer
= self
._separate
_at
_paren
(expr
, ']')
154 name
= self
._named
_object
(local_vars
, [
155 self
.interpret_expression(item
, local_vars
, allow_recursion
)
156 for item
in self
._separate
(inner
)])
159 m
= re
.match(r
'try\s*', expr
)
161 if expr
[m
.end()] == '{':
162 try_expr
, expr
= self
._separate
_at
_paren
(expr
[m
.end():], '}')
164 try_expr
, expr
= expr
[m
.end() - 1:], ''
165 ret
, should_abort
= self
.interpret_statement(try_expr
, local_vars
, allow_recursion
- 1)
168 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
170 m
= re
.match(r
'catch\s*\(', expr
)
172 # We ignore the catch block
173 _
, expr
= self
._separate
_at
_paren
(expr
, '}')
174 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
176 m
= re
.match(r
'for\s*\(', expr
)
178 constructor
, remaining
= self
._separate
_at
_paren
(expr
[m
.end() - 1:], ')')
179 if remaining
.startswith('{'):
180 body
, expr
= self
._separate
_at
_paren
(remaining
, '}')
182 m
= re
.match(r
'switch\s*\(', remaining
) # FIXME
184 switch_val
, remaining
= self
._separate
_at
_paren
(remaining
[m
.end() - 1:], ')')
185 body
, expr
= self
._separate
_at
_paren
(remaining
, '}')
186 body
= 'switch(%s){%s}' % (switch_val
, body
)
188 body
, expr
= remaining
, ''
189 start
, cndn
, increment
= self
._separate
(constructor
, ';')
190 if self
.interpret_statement(start
, local_vars
, allow_recursion
- 1)[1]:
191 raise ExtractorError(
192 f
'Premature return in the initialization of a for loop in {constructor!r}')
194 if not self
.interpret_expression(cndn
, local_vars
, allow_recursion
):
197 ret
, should_abort
= self
.interpret_statement(body
, local_vars
, allow_recursion
- 1)
204 if self
.interpret_statement(increment
, local_vars
, allow_recursion
- 1)[1]:
205 raise ExtractorError(
206 f
'Premature return in the initialization of a for loop in {constructor!r}')
207 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
209 m
= re
.match(r
'switch\s*\(', expr
)
211 switch_val
, remaining
= self
._separate
_at
_paren
(expr
[m
.end() - 1:], ')')
212 switch_val
= self
.interpret_expression(switch_val
, local_vars
, allow_recursion
)
213 body
, expr
= self
._separate
_at
_paren
(remaining
, '}')
214 items
= body
.replace('default:', 'case default:').split('case ')[1:]
215 for default
in (False, True):
218 case
, stmt
= (i
.strip() for i
in self
._separate
(item
, ':', 1))
220 matched
= matched
or case
== 'default'
222 matched
= case
!= 'default' and switch_val
== self
.interpret_expression(case
, local_vars
, allow_recursion
)
226 ret
, should_abort
= self
.interpret_statement(stmt
, local_vars
, allow_recursion
- 1)
233 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
235 # Comma separated statements
236 sub_expressions
= list(self
._separate
(expr
))
237 expr
= sub_expressions
.pop().strip() if sub_expressions
else ''
238 for sub_expr
in sub_expressions
:
239 self
.interpret_expression(sub_expr
, local_vars
, allow_recursion
)
241 for m
in re
.finditer(rf
'''(?x)
242 (?P<pre_sign>\+\+|--)(?P<var1>{_NAME_RE})|
243 (?P<var2>{_NAME_RE})(?P<post_sign>\+\+|--)''', expr
):
244 var
= m
.group('var1') or m
.group('var2')
245 start
, end
= m
.span()
246 sign
= m
.group('pre_sign') or m
.group('post_sign')
247 ret
= local_vars
[var
]
248 local_vars
[var
] += 1 if sign
[0] == '+' else -1
249 if m
.group('pre_sign'):
250 ret
= local_vars
[var
]
251 expr
= expr
[:start
] + json
.dumps(ret
) + expr
[end
:]
253 for op
, opfunc
in _ASSIGN_OPERATORS
:
254 m
= re
.match(rf
'''(?x)
255 (?P<out>{_NAME_RE})(?:\[(?P<index>[^\]]+?)\])?
257 (?P<expr>.*)$''', expr
)
260 right_val
= self
.interpret_expression(m
.group('expr'), local_vars
, allow_recursion
)
262 if m
.groupdict().get('index'):
263 lvar
= local_vars
[m
.group('out')]
264 idx
= self
.interpret_expression(m
.group('index'), local_vars
, allow_recursion
)
265 if not isinstance(idx
, int):
266 raise ExtractorError(f
'List indices must be integers: {idx}')
268 val
= opfunc(cur
, right_val
)
272 cur
= local_vars
.get(m
.group('out'))
273 val
= opfunc(cur
, right_val
)
274 local_vars
[m
.group('out')] = val
282 elif expr
== 'continue':
286 r
'(?!if|return|true|false|null)(?P<name>%s)$' % _NAME_RE
,
289 return local_vars
[var_m
.group('name')]
291 with contextlib
.suppress(ValueError):
292 return json
.loads(expr
)
295 r
'(?P<in>%s)\[(?P<idx>.+)\]$' % _NAME_RE
, expr
)
297 val
= local_vars
[m
.group('in')]
298 idx
= self
.interpret_expression(m
.group('idx'), local_vars
, allow_recursion
)
301 for op
, opfunc
in _OPERATORS
:
302 separated
= list(self
._separate
(expr
, op
))
303 if len(separated
) < 2:
305 right_val
= separated
.pop()
306 left_val
= op
.join(separated
)
307 left_val
, should_abort
= self
.interpret_statement(
308 left_val
, local_vars
, allow_recursion
- 1)
310 raise ExtractorError(f
'Premature left-side return of {op} in {expr!r}')
311 right_val
, should_abort
= self
.interpret_statement(
312 right_val
, local_vars
, allow_recursion
- 1)
314 raise ExtractorError(f
'Premature right-side return of {op} in {expr!r}')
315 return opfunc(left_val
or 0, right_val
)
318 r
'(?P<var>%s)(?:\.(?P<member>[^(]+)|\[(?P<member2>[^]]+)\])\s*' % _NAME_RE
,
321 variable
= m
.group('var')
322 member
= remove_quotes(m
.group('member') or m
.group('member2'))
323 arg_str
= expr
[m
.end():]
324 if arg_str
.startswith('('):
325 arg_str
, remaining
= self
._separate
_at
_paren
(arg_str
, ')')
327 arg_str
, remaining
= None, arg_str
329 def assertion(cndn
, msg
):
330 """ assert, but without risk of getting optimized out """
332 raise ExtractorError(f
'{member} {msg}: {expr}')
336 if variable
== 'String':
338 elif variable
in local_vars
:
339 obj
= local_vars
[variable
]
341 if variable
not in self
._objects
:
342 self
._objects
[variable
] = self
.extract_object(variable
)
343 obj
= self
._objects
[variable
]
347 if member
== 'length':
353 self
.interpret_expression(v
, local_vars
, allow_recursion
)
354 for v
in self
._separate
(arg_str
)]
357 if member
== 'fromCharCode':
358 assertion(argvals
, 'takes one or more arguments')
359 return ''.join(map(chr, argvals
))
360 raise ExtractorError(f
'Unsupported string method {member}')
362 if member
== 'split':
363 assertion(argvals
, 'takes one or more arguments')
364 assertion(argvals
== [''], 'with arguments is not implemented')
366 elif member
== 'join':
367 assertion(isinstance(obj
, list), 'must be applied on a list')
368 assertion(len(argvals
) == 1, 'takes exactly one argument')
369 return argvals
[0].join(obj
)
370 elif member
== 'reverse':
371 assertion(not argvals
, 'does not take any arguments')
374 elif member
== 'slice':
375 assertion(isinstance(obj
, list), 'must be applied on a list')
376 assertion(len(argvals
) == 1, 'takes exactly one argument')
377 return obj
[argvals
[0]:]
378 elif member
== 'splice':
379 assertion(isinstance(obj
, list), 'must be applied on a list')
380 assertion(argvals
, 'takes one or more arguments')
381 index
, howMany
= map(int, (argvals
+ [len(obj
)])[:2])
384 add_items
= argvals
[2:]
386 for i
in range(index
, min(index
+ howMany
, len(obj
))):
387 res
.append(obj
.pop(index
))
388 for i
, item
in enumerate(add_items
):
389 obj
.insert(index
+ i
, item
)
391 elif member
== 'unshift':
392 assertion(isinstance(obj
, list), 'must be applied on a list')
393 assertion(argvals
, 'takes one or more arguments')
394 for item
in reversed(argvals
):
397 elif member
== 'pop':
398 assertion(isinstance(obj
, list), 'must be applied on a list')
399 assertion(not argvals
, 'does not take any arguments')
403 elif member
== 'push':
404 assertion(argvals
, 'takes one or more arguments')
407 elif member
== 'forEach':
408 assertion(argvals
, 'takes one or more arguments')
409 assertion(len(argvals
) <= 2, 'takes at-most 2 arguments')
410 f
, this
= (argvals
+ [''])[:2]
411 return [f((item
, idx
, obj
), this
=this
) for idx
, item
in enumerate(obj
)]
412 elif member
== 'indexOf':
413 assertion(argvals
, 'takes one or more arguments')
414 assertion(len(argvals
) <= 2, 'takes at-most 2 arguments')
415 idx
, start
= (argvals
+ [0])[:2]
417 return obj
.index(idx
, start
)
421 if isinstance(obj
, list):
423 return obj
[member
](argvals
)
426 return self
.interpret_expression(
427 self
._named
_object
(local_vars
, eval_method()) + remaining
,
428 local_vars
, allow_recursion
)
432 m
= re
.match(r
'^(?P<func>%s)\((?P<args>[a-zA-Z0-9_$,]*)\)$' % _NAME_RE
, expr
)
434 fname
= m
.group('func')
436 int(v
) if v
.isdigit() else local_vars
[v
]
437 for v
in self
._separate
(m
.group('args')))
438 if fname
in local_vars
:
439 return local_vars
[fname
](argvals
)
440 elif fname
not in self
._functions
:
441 self
._functions
[fname
] = self
.extract_function(fname
)
442 return self
._functions
[fname
](argvals
)
445 raise ExtractorError('Unsupported JS expression %r' % expr
)
447 def extract_object(self
, objname
):
448 _FUNC_NAME_RE
= r
'''(?:[a-zA-Z$0-9]+|"[a-zA-Z$0-9]+"|'[a-zA-Z$0-9]+')'''
452 (?<!this\.)%s\s*=\s*{\s*
453 (?P<fields>(%s\s*:\s*function\s*\(.*?\)\s*{.*?}(?:,\s*)?)*)
455 ''' % (re
.escape(objname
), _FUNC_NAME_RE
),
457 fields
= obj_m
.group('fields')
458 # Currently, it only supports function definitions
459 fields_m
= re
.finditer(
461 (?P<key>%s)\s*:\s*function\s*\((?P<args>[a-z,]+)\){(?P<code>[^}]+)}
465 argnames
= f
.group('args').split(',')
466 obj
[remove_quotes(f
.group('key'))] = self
.build_function(argnames
, f
.group('code'))
470 def extract_function_code(self
, funcname
):
471 """ @returns argnames, code """
474 (?:function\s+%s|[{;,]\s*%s\s*=\s*function|var\s+%s\s*=\s*function)\s*
475 \((?P<args>[^)]*)\)\s*
476 (?P<code>\{(?:(?!};)[^"]|"([^"]|\\")*")+\})''' % (
477 re
.escape(funcname
), re
.escape(funcname
), re
.escape(funcname
)),
479 code
, _
= self
._separate
_at
_paren
(func_m
.group('code'), '}') # refine the match
481 raise ExtractorError('Could not find JS function %r' % funcname
)
482 return func_m
.group('args').split(','), code
def extract_function(self, funcname):
    """ Look up the code of funcname and turn it into a callable.

    extract_function_code supplies the argument names and the code, which
    are handed unchanged to extract_function_from_code.

    @returns whatever extract_function_from_code returns for that pair
    """
    argnames, code = self.extract_function_code(funcname)
    return self.extract_function_from_code(argnames, code)
487 def extract_function_from_code(self
, argnames
, code
, *global_stack
):
490 mobj
= re
.search(r
'function\((?P<args>[^)]*)\)\s*{', code
)
493 start
, body_start
= mobj
.span()
494 body
, remaining
= self
._separate
_at
_paren
(code
[body_start
- 1:], '}')
495 name
= self
._named
_object
(
497 self
.extract_function_from_code(
498 [str.strip(x
) for x
in mobj
.group('args').split(',')],
499 body
, local_vars
, *global_stack
))
500 code
= code
[:start
] + name
+ remaining
501 return self
.build_function(argnames
, code
, local_vars
, *global_stack
)
def call_function(self, funcname, *args):
    """ Extract the function called funcname and invoke it.

    The positional arguments are forwarded as one tuple, which is the single
    parameter passed to the extracted callable.

    @returns the result of the extracted callable
    """
    func = self.extract_function(funcname)
    return func(args)
506 def build_function(self
, argnames
, code
, *global_stack
):
507 global_stack
= list(global_stack
) or [{}]
509 def resf(args
, **kwargs
):
510 global_stack
[0].update({
511 **dict(zip(argnames
, args
)),
514 var_stack
= LocalNameSpace(*global_stack
)
515 for stmt
in self
._separate
(code
.replace('\n', ''), ';'):
516 ret
, should_abort
= self
.interpret_statement(stmt
, var_stack
)