7 from .utils
import ExtractorError
, remove_quotes
, truncate_string
9 _NAME_RE
= r
'[a-zA-Z_$][\w$]*'
14 '>>': operator
.rshift
,
15 '<<': operator
.lshift
,
19 '/': operator
.truediv
,
23 _MATCHING_PARENS
= dict(zip('({[', ')}]'))
class JS_Break(ExtractorError):
    """Exception type for a JS `break`; always carries the message 'Invalid break'."""

    def __init__(self):
        # No arguments are accepted: the message is fixed
        ExtractorError.__init__(self, 'Invalid break')
class JS_Continue(ExtractorError):
    """Exception type for a JS `continue`; always carries the message 'Invalid continue'."""

    def __init__(self):
        # No arguments are accepted: the message is fixed
        ExtractorError.__init__(self, 'Invalid continue')
class LocalNameSpace(collections.ChainMap):
    """ChainMap of variable scopes where assignment rebinds an existing name in place.

    Lookups behave as in ChainMap. Writes differ: a key already present in any
    map is updated in that map instead of being shadowed in the first one.
    """

    def __setitem__(self, key, value):
        """Assign value to key in the first map that already contains key.

        When no map contains key, it is created in self.maps[0].
        """
        for scope in self.maps:
            if key in scope:
                scope[key] = value
                return
        self.maps[0][key] = value

    def __delitem__(self, key):
        """Always raise NotImplementedError; names cannot be removed."""
        raise NotImplementedError('Deleting is not supported')
50 __named_object_counter
= 0
def __init__(self, code, objects=None):
    """Keep the given source text and prepare the lookup tables.

    @param code     stored unchanged as self.code
    @param objects  mapping stored as self._objects; a fresh dict is used when None
    """
    self.code = code
    # Filled lazily, keyed by function name
    self._functions = {}
    self._objects = objects if objects is not None else {}
class Exception(ExtractorError):
    """Interpreter error whose message can quote the expression being processed."""

    def __init__(self, msg, expr=None, *args, **kwargs):
        """
        @param msg   base error message
        @param expr  optional expression text; when given, a truncated copy
                     is appended to msg after ' in: '
        Any further arguments are passed through to the base initializer.
        """
        # Some callers raise without an expression; annotate only when one is supplied
        if expr is not None:
            msg += f' in: {truncate_string(expr, 50, 50)}'
        super().__init__(msg, *args, **kwargs)
def _named_object(self, namespace, obj):
    """Store obj in namespace under a newly generated name and return that name.

    @param namespace  mapping that receives the new binding
    @param obj        value to bind
    @returns          the generated name, '__yt_dlp_jsinterp_obj<N>', where N
                      is the counter value after incrementing it
    """
    self.__named_object_counter += 1
    name = f'__yt_dlp_jsinterp_obj{self.__named_object_counter}'
    # Callers splice the returned name back into expression text, so the
    # object must be resolvable through the namespace under that name
    namespace[name] = obj
    return name
69 def _separate(expr
, delim
=',', max_split
=None):
72 counters
= {k: 0 for k in _MATCHING_PARENS.values()}
73 start
, splits
, pos
, delim_len
= 0, 0, 0, len(delim
) - 1
74 in_quote
, escaping
= None, False
75 for idx
, char
in enumerate(expr
):
76 if char
in _MATCHING_PARENS
:
77 counters
[_MATCHING_PARENS
[char
]] += 1
78 elif char
in counters
:
80 elif not escaping
and char
in _QUOTES
and in_quote
in (char
, None):
81 in_quote
= None if in_quote
else char
82 escaping
= not escaping
and in_quote
and char
== '\\'
84 if char
!= delim
[pos
] or any(counters
.values()) or in_quote
:
87 elif pos
!= delim_len
:
90 yield expr
[start
: idx
- delim_len
]
91 start
, pos
= idx
+ 1, 0
93 if max_split
and splits
>= max_split
:
def _separate_at_paren(cls, expr, delim=None):
    """Split expr once on delim using cls._separate and drop the leading character.

    @param expr   text expected to begin with an opening paren character
    @param delim  closing character to split on; when None it is looked up
                  in _MATCHING_PARENS from the first character of expr
    @returns      (inner, rest): the first piece without its first character
                  and the second piece, both stripped of surrounding whitespace
    @raises       cls.Exception when delim cannot be inferred or when the
                  split yields fewer than two pieces
    """
    if delim is None:
        # expr[:1] is '' for empty input, which simply misses the table
        delim = _MATCHING_PARENS.get(expr[:1])
        if delim is None:
            raise cls.Exception('Expression does not start with an opening paren', expr)
    separated = list(cls._separate(expr, delim, 1))
    if len(separated) < 2:
        raise cls.Exception(f'No terminating paren {delim}', expr)
    inner, rest = separated[0], separated[1]
    return inner[1:].strip(), rest.strip()
104 def interpret_statement(self
, stmt
, local_vars
, allow_recursion
=100):
105 if allow_recursion
< 0:
106 raise self
.Exception('Recursion limit reached')
109 sub_statements
= list(self
._separate
(stmt
, ';')) or ['']
110 stmt
= sub_statements
.pop().lstrip()
112 for sub_stmt
in sub_statements
:
113 ret
, should_abort
= self
.interpret_statement(sub_stmt
, local_vars
, allow_recursion
- 1)
115 return ret
, should_abort
117 m
= re
.match(r
'(?P<var>var\s)|return(?:\s+|$)', stmt
)
118 if not m
: # Try interpreting it as an expression
121 expr
= stmt
[len(m
.group(0)):]
123 expr
= stmt
[len(m
.group(0)):]
126 return self
.interpret_expression(expr
, local_vars
, allow_recursion
), should_abort
128 def interpret_expression(self
, expr
, local_vars
, allow_recursion
):
133 if expr
.startswith('{'):
134 inner
, outer
= self
._separate
_at
_paren
(expr
, '}')
135 inner
, should_abort
= self
.interpret_statement(inner
, local_vars
, allow_recursion
- 1)
136 if not outer
or should_abort
:
139 expr
= json
.dumps(inner
) + outer
141 if expr
.startswith('('):
142 inner
, outer
= self
._separate
_at
_paren
(expr
, ')')
143 inner
= self
.interpret_expression(inner
, local_vars
, allow_recursion
)
147 expr
= json
.dumps(inner
) + outer
149 if expr
.startswith('['):
150 inner
, outer
= self
._separate
_at
_paren
(expr
, ']')
151 name
= self
._named
_object
(local_vars
, [
152 self
.interpret_expression(item
, local_vars
, allow_recursion
)
153 for item
in self
._separate
(inner
)])
156 m
= re
.match(r
'(?P<try>try)\s*|(?:(?P<catch>catch)|(?P<for>for)|(?P<switch>switch))\s*\(', expr
)
157 if m
and m
.group('try'):
158 if expr
[m
.end()] == '{':
159 try_expr
, expr
= self
._separate
_at
_paren
(expr
[m
.end():], '}')
161 try_expr
, expr
= expr
[m
.end() - 1:], ''
162 ret
, should_abort
= self
.interpret_statement(try_expr
, local_vars
, allow_recursion
- 1)
165 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
167 elif m
and m
.group('catch'):
168 # We ignore the catch block
169 _
, expr
= self
._separate
_at
_paren
(expr
, '}')
170 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
172 elif m
and m
.group('for'):
173 constructor
, remaining
= self
._separate
_at
_paren
(expr
[m
.end() - 1:], ')')
174 if remaining
.startswith('{'):
175 body
, expr
= self
._separate
_at
_paren
(remaining
, '}')
177 switch_m
= re
.match(r
'switch\s*\(', remaining
) # FIXME
179 switch_val
, remaining
= self
._separate
_at
_paren
(remaining
[switch_m
.end() - 1:], ')')
180 body
, expr
= self
._separate
_at
_paren
(remaining
, '}')
181 body
= 'switch(%s){%s}' % (switch_val
, body
)
183 body
, expr
= remaining
, ''
184 start
, cndn
, increment
= self
._separate
(constructor
, ';')
185 if self
.interpret_statement(start
, local_vars
, allow_recursion
- 1)[1]:
186 raise self
.Exception('Premature return in the initialization of a for loop', constructor
)
188 if not self
.interpret_expression(cndn
, local_vars
, allow_recursion
):
191 ret
, should_abort
= self
.interpret_statement(body
, local_vars
, allow_recursion
- 1)
198 if self
.interpret_statement(increment
, local_vars
, allow_recursion
- 1)[1]:
199 raise self
.Exception('Premature return in the initialization of a for loop', constructor
)
200 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
202 elif m
and m
.group('switch'):
203 switch_val
, remaining
= self
._separate
_at
_paren
(expr
[m
.end() - 1:], ')')
204 switch_val
= self
.interpret_expression(switch_val
, local_vars
, allow_recursion
)
205 body
, expr
= self
._separate
_at
_paren
(remaining
, '}')
206 items
= body
.replace('default:', 'case default:').split('case ')[1:]
207 for default
in (False, True):
210 case
, stmt
= (i
.strip() for i
in self
._separate
(item
, ':', 1))
212 matched
= matched
or case
== 'default'
214 matched
= case
!= 'default' and switch_val
== self
.interpret_expression(case
, local_vars
, allow_recursion
)
218 ret
, should_abort
= self
.interpret_statement(stmt
, local_vars
, allow_recursion
- 1)
225 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
227 # Comma separated statements
228 sub_expressions
= list(self
._separate
(expr
))
229 expr
= sub_expressions
.pop().strip() if sub_expressions
else ''
230 for sub_expr
in sub_expressions
:
231 self
.interpret_expression(sub_expr
, local_vars
, allow_recursion
)
233 for m
in re
.finditer(rf
'''(?x)
234 (?P<pre_sign>\+\+|--)(?P<var1>{_NAME_RE})|
235 (?P<var2>{_NAME_RE})(?P<post_sign>\+\+|--)''', expr
):
236 var
= m
.group('var1') or m
.group('var2')
237 start
, end
= m
.span()
238 sign
= m
.group('pre_sign') or m
.group('post_sign')
239 ret
= local_vars
[var
]
240 local_vars
[var
] += 1 if sign
[0] == '+' else -1
241 if m
.group('pre_sign'):
242 ret
= local_vars
[var
]
243 expr
= expr
[:start
] + json
.dumps(ret
) + expr
[end
:]
248 m
= re
.match(fr
'''(?x)
250 (?P<out>{_NAME_RE})(?:\[(?P<index>[^\]]+?)\])?\s*
251 (?P<op>{"|".join(map(re.escape, _OPERATORS))})?
254 (?!if|return|true|false|null)(?P<name>{_NAME_RE})$
256 (?P<in>{_NAME_RE})\[(?P<idx>.+)\]$
258 (?P<var>{_NAME_RE})(?:\.(?P<member>[^(]+)|\[(?P<member2>[^\]]+)\])\s*
260 (?P<fname>{_NAME_RE})\((?P<args>[\w$,]*)\)$
262 if m
and m
.group('assign'):
263 if not m
.group('op'):
264 opfunc
= lambda curr
, right
: right
266 opfunc
= _OPERATORS
[m
.group('op')]
267 right_val
= self
.interpret_expression(m
.group('expr'), local_vars
, allow_recursion
)
268 left_val
= local_vars
.get(m
.group('out'))
270 if not m
.group('index'):
271 local_vars
[m
.group('out')] = opfunc(left_val
, right_val
)
272 return local_vars
[m
.group('out')]
273 elif left_val
is None:
274 raise self
.Exception(f
'Cannot index undefined variable {m.group("out")}', expr
)
276 idx
= self
.interpret_expression(m
.group('index'), local_vars
, allow_recursion
)
277 if not isinstance(idx
, int):
278 raise self
.Exception(f
'List index {idx} must be integer', expr
)
279 left_val
[idx
] = opfunc(left_val
[idx
], right_val
)
285 elif expr
== 'break':
287 elif expr
== 'continue':
290 elif m
and m
.group('return'):
291 return local_vars
[m
.group('name')]
293 with contextlib
.suppress(ValueError):
294 return json
.loads(expr
)
296 if m
and m
.group('indexing'):
297 val
= local_vars
[m
.group('in')]
298 idx
= self
.interpret_expression(m
.group('idx'), local_vars
, allow_recursion
)
301 for op
, opfunc
in _OPERATORS
.items():
302 separated
= list(self
._separate
(expr
, op
))
303 if len(separated
) < 2:
305 right_val
= separated
.pop()
306 left_val
= op
.join(separated
)
307 left_val
, should_abort
= self
.interpret_statement(
308 left_val
, local_vars
, allow_recursion
- 1)
310 raise self
.Exception(f
'Premature left-side return of {op}', expr
)
311 right_val
, should_abort
= self
.interpret_statement(
312 right_val
, local_vars
, allow_recursion
- 1)
314 raise self
.Exception(f
'Premature right-side return of {op}', expr
)
315 return opfunc(left_val
or 0, right_val
)
317 if m
and m
.group('attribute'):
318 variable
= m
.group('var')
319 member
= remove_quotes(m
.group('member') or m
.group('member2'))
320 arg_str
= expr
[m
.end():]
321 if arg_str
.startswith('('):
322 arg_str
, remaining
= self
._separate
_at
_paren
(arg_str
, ')')
324 arg_str
, remaining
= None, arg_str
326 def assertion(cndn
, msg
):
327 """ assert, but without risk of getting optimized out """
329 raise self
.Exception(f
'{member} {msg}', expr
)
332 if variable
== 'String':
334 elif variable
in local_vars
:
335 obj
= local_vars
[variable
]
337 if variable
not in self
._objects
:
338 self
._objects
[variable
] = self
.extract_object(variable
)
339 obj
= self
._objects
[variable
]
343 if member
== 'length':
349 self
.interpret_expression(v
, local_vars
, allow_recursion
)
350 for v
in self
._separate
(arg_str
)]
353 if member
== 'fromCharCode':
354 assertion(argvals
, 'takes one or more arguments')
355 return ''.join(map(chr, argvals
))
356 raise self
.Exception(f
'Unsupported string method {member}', expr
)
358 if member
== 'split':
359 assertion(argvals
, 'takes one or more arguments')
360 assertion(argvals
== [''], 'with arguments is not implemented')
362 elif member
== 'join':
363 assertion(isinstance(obj
, list), 'must be applied on a list')
364 assertion(len(argvals
) == 1, 'takes exactly one argument')
365 return argvals
[0].join(obj
)
366 elif member
== 'reverse':
367 assertion(not argvals
, 'does not take any arguments')
370 elif member
== 'slice':
371 assertion(isinstance(obj
, list), 'must be applied on a list')
372 assertion(len(argvals
) == 1, 'takes exactly one argument')
373 return obj
[argvals
[0]:]
374 elif member
== 'splice':
375 assertion(isinstance(obj
, list), 'must be applied on a list')
376 assertion(argvals
, 'takes one or more arguments')
377 index
, howMany
= map(int, (argvals
+ [len(obj
)])[:2])
380 add_items
= argvals
[2:]
382 for i
in range(index
, min(index
+ howMany
, len(obj
))):
383 res
.append(obj
.pop(index
))
384 for i
, item
in enumerate(add_items
):
385 obj
.insert(index
+ i
, item
)
387 elif member
== 'unshift':
388 assertion(isinstance(obj
, list), 'must be applied on a list')
389 assertion(argvals
, 'takes one or more arguments')
390 for item
in reversed(argvals
):
393 elif member
== 'pop':
394 assertion(isinstance(obj
, list), 'must be applied on a list')
395 assertion(not argvals
, 'does not take any arguments')
399 elif member
== 'push':
400 assertion(argvals
, 'takes one or more arguments')
403 elif member
== 'forEach':
404 assertion(argvals
, 'takes one or more arguments')
405 assertion(len(argvals
) <= 2, 'takes at-most 2 arguments')
406 f
, this
= (argvals
+ [''])[:2]
407 return [f((item
, idx
, obj
), this
=this
) for idx
, item
in enumerate(obj
)]
408 elif member
== 'indexOf':
409 assertion(argvals
, 'takes one or more arguments')
410 assertion(len(argvals
) <= 2, 'takes at-most 2 arguments')
411 idx
, start
= (argvals
+ [0])[:2]
413 return obj
.index(idx
, start
)
417 return obj
[int(member
) if isinstance(obj
, list) else member
](argvals
)
420 return self
.interpret_expression(
421 self
._named
_object
(local_vars
, eval_method()) + remaining
,
422 local_vars
, allow_recursion
)
426 elif m
and m
.group('function'):
427 fname
= m
.group('fname')
429 int(v
) if v
.isdigit() else local_vars
[v
]
430 for v
in self
._separate
(m
.group('args')))
431 if fname
in local_vars
:
432 return local_vars
[fname
](argvals
)
433 elif fname
not in self
._functions
:
434 self
._functions
[fname
] = self
.extract_function(fname
)
435 return self
._functions
[fname
](argvals
)
437 raise self
.Exception('Unsupported JS expression', expr
)
439 def extract_object(self
, objname
):
440 _FUNC_NAME_RE
= r
'''(?:[a-zA-Z$0-9]+|"[a-zA-Z$0-9]+"|'[a-zA-Z$0-9]+')'''
444 (?<!this\.)%s\s*=\s*{\s*
445 (?P<fields>(%s\s*:\s*function\s*\(.*?\)\s*{.*?}(?:,\s*)?)*)
447 ''' % (re
.escape(objname
), _FUNC_NAME_RE
),
449 fields
= obj_m
.group('fields')
450 # Currently, it only supports function definitions
451 fields_m
= re
.finditer(
453 (?P<key>%s)\s*:\s*function\s*\((?P<args>[a-z,]+)\){(?P<code>[^}]+)}
457 argnames
= f
.group('args').split(',')
458 obj
[remove_quotes(f
.group('key'))] = self
.build_function(argnames
, f
.group('code'))
462 def extract_function_code(self
, funcname
):
463 """ @returns argnames, code """
468 [{;,]\s*%(name)s\s*=\s*function|
469 var\s+%(name)s\s*=\s*function
471 \((?P<args>[^)]*)\)\s*
472 (?P<code>{(?:(?!};)[^"]|"([^"]|\\")*")+})''' % {'name': re.escape(funcname)}
,
474 code
, _
= self
._separate
_at
_paren
(func_m
.group('code'), '}') # refine the match
476 raise self
.Exception(f
'Could not find JS function "{funcname}"')
477 return func_m
.group('args').split(','), code
def extract_function(self, funcname):
    """Look up funcname's code and turn it into a function object.

    The result of extract_function_code(funcname) is unpacked as the
    positional arguments of extract_function_from_code.
    """
    extracted = self.extract_function_code(funcname)
    return self.extract_function_from_code(*extracted)
482 def extract_function_from_code(self
, argnames
, code
, *global_stack
):
485 mobj
= re
.search(r
'function\((?P<args>[^)]*)\)\s*{', code
)
488 start
, body_start
= mobj
.span()
489 body
, remaining
= self
._separate
_at
_paren
(code
[body_start
- 1:], '}')
490 name
= self
._named
_object
(local_vars
, self
.extract_function_from_code(
491 [x
.strip() for x
in mobj
.group('args').split(',')],
492 body
, local_vars
, *global_stack
))
493 code
= code
[:start
] + name
+ remaining
494 return self
.build_function(argnames
, code
, local_vars
, *global_stack
)
def call_function(self, funcname, *args):
    """Extract the function named funcname and call it with args.

    The positional arguments are handed over as one tuple, not unpacked.
    """
    func = self.extract_function(funcname)
    return func(args)
499 def build_function(self
, argnames
, code
, *global_stack
):
500 global_stack
= list(global_stack
) or [{}]
502 def resf(args
, **kwargs
):
503 global_stack
[0].update({
504 **dict(zip(argnames
, args
)),
507 var_stack
= LocalNameSpace(*global_stack
)
508 for stmt
in self
._separate
(code
.replace('\n', ''), ';'):
509 ret
, should_abort
= self
.interpret_statement(stmt
, var_stack
)