7 from .utils
import ExtractorError
, remove_quotes
9 _NAME_RE
= r
'[a-zA-Z_$][\w$]*'
14 '>>': operator
.rshift
,
15 '<<': operator
.lshift
,
19 '/': operator
.truediv
,
23 _MATCHING_PARENS
= dict(zip('({[', ')}]'))
27 class JS_Break(ExtractorError
):
29 ExtractorError
.__init
__(self
, 'Invalid break')
32 class JS_Continue(ExtractorError
):
34 ExtractorError
.__init
__(self
, 'Invalid continue')
37 class LocalNameSpace(collections
.ChainMap
):
38 def __setitem__(self
, key
, value
):
39 for scope
in self
.maps
:
43 self
.maps
[0][key
] = value
45 def __delitem__(self
, key
):
46 raise NotImplementedError('Deleting is not supported')
50 __named_object_counter
= 0
52 def __init__(self
, code
, objects
=None):
53 self
.code
, self
._functions
= code
, {}
54 self
._objects
= {} if objects
is None else objects
56 def _named_object(self
, namespace
, obj
):
57 self
.__named
_object
_counter
+= 1
58 name
= f
'__yt_dlp_jsinterp_obj{self.__named_object_counter}'
63 def _separate(expr
, delim
=',', max_split
=None):
66 counters
= {k: 0 for k in _MATCHING_PARENS.values()}
67 start
, splits
, pos
, delim_len
= 0, 0, 0, len(delim
) - 1
68 in_quote
, escaping
= None, False
69 for idx
, char
in enumerate(expr
):
70 if char
in _MATCHING_PARENS
:
71 counters
[_MATCHING_PARENS
[char
]] += 1
72 elif char
in counters
:
74 elif not escaping
and char
in _QUOTES
and in_quote
in (char
, None):
75 in_quote
= None if in_quote
else char
76 escaping
= not escaping
and in_quote
and char
== '\\'
78 if char
!= delim
[pos
] or any(counters
.values()) or in_quote
:
81 elif pos
!= delim_len
:
84 yield expr
[start
: idx
- delim_len
]
85 start
, pos
= idx
+ 1, 0
87 if max_split
and splits
>= max_split
:
def _separate_at_paren(cls, expr, delim):
    """Split ``expr`` in two around ``delim`` using ``cls._separate``.

    ``cls._separate(expr, delim, 1)`` is asked for at most one split.

    @param expr   text to split; its first character is discarded from the
                  first piece (the callers in this file pass text that
                  starts with the opening bracket)
    @param delim  closing delimiter to split at
    @returns      (first piece minus its leading character, second piece),
                  both stripped of surrounding whitespace
    @raises ExtractorError  when fewer than two pieces come back
    """
    parts = list(cls._separate(expr, delim, 1))
    if len(parts) < 2:
        raise ExtractorError(f'No terminating paren {delim} in {expr}')
    head, tail = parts[0], parts[1]
    return head[1:].strip(), tail.strip()
98 def interpret_statement(self
, stmt
, local_vars
, allow_recursion
=100):
99 if allow_recursion
< 0:
100 raise ExtractorError('Recursion limit reached')
103 sub_statements
= list(self
._separate
(stmt
, ';')) or ['']
104 stmt
= sub_statements
.pop().lstrip()
106 for sub_stmt
in sub_statements
:
107 ret
, should_abort
= self
.interpret_statement(sub_stmt
, local_vars
, allow_recursion
- 1)
109 return ret
, should_abort
111 m
= re
.match(r
'(?P<var>var\s)|return(?:\s+|$)', stmt
)
112 if not m
: # Try interpreting it as an expression
115 expr
= stmt
[len(m
.group(0)):]
117 expr
= stmt
[len(m
.group(0)):]
120 return self
.interpret_expression(expr
, local_vars
, allow_recursion
), should_abort
122 def interpret_expression(self
, expr
, local_vars
, allow_recursion
):
127 if expr
.startswith('{'):
128 inner
, outer
= self
._separate
_at
_paren
(expr
, '}')
129 inner
, should_abort
= self
.interpret_statement(inner
, local_vars
, allow_recursion
- 1)
130 if not outer
or should_abort
:
133 expr
= json
.dumps(inner
) + outer
135 if expr
.startswith('('):
136 inner
, outer
= self
._separate
_at
_paren
(expr
, ')')
137 inner
= self
.interpret_expression(inner
, local_vars
, allow_recursion
)
141 expr
= json
.dumps(inner
) + outer
143 if expr
.startswith('['):
144 inner
, outer
= self
._separate
_at
_paren
(expr
, ']')
145 name
= self
._named
_object
(local_vars
, [
146 self
.interpret_expression(item
, local_vars
, allow_recursion
)
147 for item
in self
._separate
(inner
)])
150 m
= re
.match(r
'(?P<try>try)\s*|(?:(?P<catch>catch)|(?P<for>for)|(?P<switch>switch))\s*\(', expr
)
151 if m
and m
.group('try'):
152 if expr
[m
.end()] == '{':
153 try_expr
, expr
= self
._separate
_at
_paren
(expr
[m
.end():], '}')
155 try_expr
, expr
= expr
[m
.end() - 1:], ''
156 ret
, should_abort
= self
.interpret_statement(try_expr
, local_vars
, allow_recursion
- 1)
159 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
161 elif m
and m
.group('catch'):
162 # We ignore the catch block
163 _
, expr
= self
._separate
_at
_paren
(expr
, '}')
164 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
166 elif m
and m
.group('for'):
167 constructor
, remaining
= self
._separate
_at
_paren
(expr
[m
.end() - 1:], ')')
168 if remaining
.startswith('{'):
169 body
, expr
= self
._separate
_at
_paren
(remaining
, '}')
171 switch_m
= re
.match(r
'switch\s*\(', remaining
) # FIXME
173 switch_val
, remaining
= self
._separate
_at
_paren
(remaining
[switch_m
.end() - 1:], ')')
174 body
, expr
= self
._separate
_at
_paren
(remaining
, '}')
175 body
= 'switch(%s){%s}' % (switch_val
, body
)
177 body
, expr
= remaining
, ''
178 start
, cndn
, increment
= self
._separate
(constructor
, ';')
179 if self
.interpret_statement(start
, local_vars
, allow_recursion
- 1)[1]:
180 raise ExtractorError(
181 f
'Premature return in the initialization of a for loop in {constructor!r}')
183 if not self
.interpret_expression(cndn
, local_vars
, allow_recursion
):
186 ret
, should_abort
= self
.interpret_statement(body
, local_vars
, allow_recursion
- 1)
193 if self
.interpret_statement(increment
, local_vars
, allow_recursion
- 1)[1]:
194 raise ExtractorError(
195 f
'Premature return in the initialization of a for loop in {constructor!r}')
196 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
198 elif m
and m
.group('switch'):
199 switch_val
, remaining
= self
._separate
_at
_paren
(expr
[m
.end() - 1:], ')')
200 switch_val
= self
.interpret_expression(switch_val
, local_vars
, allow_recursion
)
201 body
, expr
= self
._separate
_at
_paren
(remaining
, '}')
202 items
= body
.replace('default:', 'case default:').split('case ')[1:]
203 for default
in (False, True):
206 case
, stmt
= (i
.strip() for i
in self
._separate
(item
, ':', 1))
208 matched
= matched
or case
== 'default'
210 matched
= case
!= 'default' and switch_val
== self
.interpret_expression(case
, local_vars
, allow_recursion
)
214 ret
, should_abort
= self
.interpret_statement(stmt
, local_vars
, allow_recursion
- 1)
221 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
223 # Comma separated statements
224 sub_expressions
= list(self
._separate
(expr
))
225 expr
= sub_expressions
.pop().strip() if sub_expressions
else ''
226 for sub_expr
in sub_expressions
:
227 self
.interpret_expression(sub_expr
, local_vars
, allow_recursion
)
229 for m
in re
.finditer(rf
'''(?x)
230 (?P<pre_sign>\+\+|--)(?P<var1>{_NAME_RE})|
231 (?P<var2>{_NAME_RE})(?P<post_sign>\+\+|--)''', expr
):
232 var
= m
.group('var1') or m
.group('var2')
233 start
, end
= m
.span()
234 sign
= m
.group('pre_sign') or m
.group('post_sign')
235 ret
= local_vars
[var
]
236 local_vars
[var
] += 1 if sign
[0] == '+' else -1
237 if m
.group('pre_sign'):
238 ret
= local_vars
[var
]
239 expr
= expr
[:start
] + json
.dumps(ret
) + expr
[end
:]
244 m
= re
.match(fr
'''(?x)
246 (?P<out>{_NAME_RE})(?:\[(?P<index>[^\]]+?)\])?\s*
247 (?P<op>{"|".join(map(re.escape, _OPERATORS))})?
250 (?!if|return|true|false|null)(?P<name>{_NAME_RE})$
252 (?P<in>{_NAME_RE})\[(?P<idx>.+)\]$
254 (?P<var>{_NAME_RE})(?:\.(?P<member>[^(]+)|\[(?P<member2>[^\]]+)\])\s*
256 (?P<fname>{_NAME_RE})\((?P<args>[\w$,]*)\)$
258 if m
and m
.group('assign'):
259 if not m
.group('op'):
260 opfunc
= lambda curr
, right
: right
262 opfunc
= _OPERATORS
[m
.group('op')]
263 right_val
= self
.interpret_expression(m
.group('expr'), local_vars
, allow_recursion
)
264 left_val
= local_vars
.get(m
.group('out'))
266 if not m
.group('index'):
267 local_vars
[m
.group('out')] = opfunc(left_val
, right_val
)
268 return local_vars
[m
.group('out')]
269 elif left_val
is None:
270 raise ExtractorError(f
'Cannot index undefined variable: {m.group("out")}')
272 idx
= self
.interpret_expression(m
.group('index'), local_vars
, allow_recursion
)
273 if not isinstance(idx
, int):
274 raise ExtractorError(f
'List indices must be integers: {idx}')
275 left_val
[idx
] = opfunc(left_val
[idx
], right_val
)
281 elif expr
== 'break':
283 elif expr
== 'continue':
286 elif m
and m
.group('return'):
287 return local_vars
[m
.group('name')]
289 with contextlib
.suppress(ValueError):
290 return json
.loads(expr
)
292 if m
and m
.group('indexing'):
293 val
= local_vars
[m
.group('in')]
294 idx
= self
.interpret_expression(m
.group('idx'), local_vars
, allow_recursion
)
297 for op
, opfunc
in _OPERATORS
.items():
298 separated
= list(self
._separate
(expr
, op
))
299 if len(separated
) < 2:
301 right_val
= separated
.pop()
302 left_val
= op
.join(separated
)
303 left_val
, should_abort
= self
.interpret_statement(
304 left_val
, local_vars
, allow_recursion
- 1)
306 raise ExtractorError(f
'Premature left-side return of {op} in {expr!r}')
307 right_val
, should_abort
= self
.interpret_statement(
308 right_val
, local_vars
, allow_recursion
- 1)
310 raise ExtractorError(f
'Premature right-side return of {op} in {expr!r}')
311 return opfunc(left_val
or 0, right_val
)
313 if m
and m
.group('attribute'):
314 variable
= m
.group('var')
315 member
= remove_quotes(m
.group('member') or m
.group('member2'))
316 arg_str
= expr
[m
.end():]
317 if arg_str
.startswith('('):
318 arg_str
, remaining
= self
._separate
_at
_paren
(arg_str
, ')')
320 arg_str
, remaining
= None, arg_str
322 def assertion(cndn
, msg
):
323 """ assert, but without risk of getting optimized out """
325 raise ExtractorError(f
'{member} {msg}: {expr}')
328 if variable
== 'String':
330 elif variable
in local_vars
:
331 obj
= local_vars
[variable
]
333 if variable
not in self
._objects
:
334 self
._objects
[variable
] = self
.extract_object(variable
)
335 obj
= self
._objects
[variable
]
339 if member
== 'length':
345 self
.interpret_expression(v
, local_vars
, allow_recursion
)
346 for v
in self
._separate
(arg_str
)]
349 if member
== 'fromCharCode':
350 assertion(argvals
, 'takes one or more arguments')
351 return ''.join(map(chr, argvals
))
352 raise ExtractorError(f
'Unsupported string method {member}')
354 if member
== 'split':
355 assertion(argvals
, 'takes one or more arguments')
356 assertion(argvals
== [''], 'with arguments is not implemented')
358 elif member
== 'join':
359 assertion(isinstance(obj
, list), 'must be applied on a list')
360 assertion(len(argvals
) == 1, 'takes exactly one argument')
361 return argvals
[0].join(obj
)
362 elif member
== 'reverse':
363 assertion(not argvals
, 'does not take any arguments')
366 elif member
== 'slice':
367 assertion(isinstance(obj
, list), 'must be applied on a list')
368 assertion(len(argvals
) == 1, 'takes exactly one argument')
369 return obj
[argvals
[0]:]
370 elif member
== 'splice':
371 assertion(isinstance(obj
, list), 'must be applied on a list')
372 assertion(argvals
, 'takes one or more arguments')
373 index
, howMany
= map(int, (argvals
+ [len(obj
)])[:2])
376 add_items
= argvals
[2:]
378 for i
in range(index
, min(index
+ howMany
, len(obj
))):
379 res
.append(obj
.pop(index
))
380 for i
, item
in enumerate(add_items
):
381 obj
.insert(index
+ i
, item
)
383 elif member
== 'unshift':
384 assertion(isinstance(obj
, list), 'must be applied on a list')
385 assertion(argvals
, 'takes one or more arguments')
386 for item
in reversed(argvals
):
389 elif member
== 'pop':
390 assertion(isinstance(obj
, list), 'must be applied on a list')
391 assertion(not argvals
, 'does not take any arguments')
395 elif member
== 'push':
396 assertion(argvals
, 'takes one or more arguments')
399 elif member
== 'forEach':
400 assertion(argvals
, 'takes one or more arguments')
401 assertion(len(argvals
) <= 2, 'takes at-most 2 arguments')
402 f
, this
= (argvals
+ [''])[:2]
403 return [f((item
, idx
, obj
), this
=this
) for idx
, item
in enumerate(obj
)]
404 elif member
== 'indexOf':
405 assertion(argvals
, 'takes one or more arguments')
406 assertion(len(argvals
) <= 2, 'takes at-most 2 arguments')
407 idx
, start
= (argvals
+ [0])[:2]
409 return obj
.index(idx
, start
)
413 return obj
[int(member
) if isinstance(obj
, list) else member
](argvals
)
416 return self
.interpret_expression(
417 self
._named
_object
(local_vars
, eval_method()) + remaining
,
418 local_vars
, allow_recursion
)
422 elif m
and m
.group('function'):
423 fname
= m
.group('fname')
425 int(v
) if v
.isdigit() else local_vars
[v
]
426 for v
in self
._separate
(m
.group('args')))
427 if fname
in local_vars
:
428 return local_vars
[fname
](argvals
)
429 elif fname
not in self
._functions
:
430 self
._functions
[fname
] = self
.extract_function(fname
)
431 return self
._functions
[fname
](argvals
)
433 raise ExtractorError(f
'Unsupported JS expression {expr!r}')
435 def extract_object(self
, objname
):
436 _FUNC_NAME_RE
= r
'''(?:[a-zA-Z$0-9]+|"[a-zA-Z$0-9]+"|'[a-zA-Z$0-9]+')'''
440 (?<!this\.)%s\s*=\s*{\s*
441 (?P<fields>(%s\s*:\s*function\s*\(.*?\)\s*{.*?}(?:,\s*)?)*)
443 ''' % (re
.escape(objname
), _FUNC_NAME_RE
),
445 fields
= obj_m
.group('fields')
446 # Currently, it only supports function definitions
447 fields_m
= re
.finditer(
449 (?P<key>%s)\s*:\s*function\s*\((?P<args>[a-z,]+)\){(?P<code>[^}]+)}
453 argnames
= f
.group('args').split(',')
454 obj
[remove_quotes(f
.group('key'))] = self
.build_function(argnames
, f
.group('code'))
458 def extract_function_code(self
, funcname
):
459 """ @returns argnames, code """
464 [{;,]\s*%(name)s\s*=\s*function|
465 var\s+%(name)s\s*=\s*function
467 \((?P<args>[^)]*)\)\s*
468 (?P<code>{(?:(?!};)[^"]|"([^"]|\\")*")+})''' % {'name': re.escape(funcname)}
,
470 code
, _
= self
._separate
_at
_paren
(func_m
.group('code'), '}') # refine the match
472 raise ExtractorError(f
'Could not find JS function "{funcname}"')
473 return func_m
.group('args').split(','), code
def extract_function(self, funcname):
    """Look up ``funcname`` and turn it into a callable.

    Whatever ``self.extract_function_code(funcname)`` returns is unpacked
    as the positional arguments of ``self.extract_function_from_code``,
    whose result is returned unchanged.
    """
    pieces = self.extract_function_code(funcname)
    return self.extract_function_from_code(*pieces)
478 def extract_function_from_code(self
, argnames
, code
, *global_stack
):
481 mobj
= re
.search(r
'function\((?P<args>[^)]*)\)\s*{', code
)
484 start
, body_start
= mobj
.span()
485 body
, remaining
= self
._separate
_at
_paren
(code
[body_start
- 1:], '}')
486 name
= self
._named
_object
(local_vars
, self
.extract_function_from_code(
487 [x
.strip() for x
in mobj
.group('args').split(',')],
488 body
, local_vars
, *global_stack
))
489 code
= code
[:start
] + name
+ remaining
490 return self
.build_function(argnames
, code
, local_vars
, *global_stack
)
def call_function(self, funcname, *args):
    """Extract the function named ``funcname`` and invoke it.

    The positional ``args`` are handed to the extracted callable as one
    tuple argument, not spread out; its return value is passed through.
    """
    func = self.extract_function(funcname)
    return func(args)
495 def build_function(self
, argnames
, code
, *global_stack
):
496 global_stack
= list(global_stack
) or [{}]
498 def resf(args
, **kwargs
):
499 global_stack
[0].update({
500 **dict(zip(argnames
, args
)),
503 var_stack
= LocalNameSpace(*global_stack
)
504 for stmt
in self
._separate
(code
.replace('\n', ''), ';'):
505 ret
, should_abort
= self
.interpret_statement(stmt
, var_stack
)