raise ExtractorError('Cannot identify player %r' % player_url)
return id_m.group('id')
- def _load_player(self, video_id, player_url, fatal=True) -> bool:
+ def _load_player(self, video_id, player_url, fatal=True):
player_id = self._extract_player_info(player_url)
if player_id not in self._code_cache:
code = self._download_webpage(
errnote='Download of %s failed' % player_url)
if code:
self._code_cache[player_id] = code
- return player_id in self._code_cache
+ return self._code_cache.get(player_id)
def _extract_signature_function(self, video_id, player_url, example_sig):
player_id = self._extract_player_info(player_url)
if cache_spec is not None:
return lambda s: ''.join(s[i] for i in cache_spec)
- if self._load_player(video_id, player_url):
- code = self._code_cache[player_id]
+ code = self._load_player(video_id, player_url)
+ if code:
res = self._parse_sig_js(code)
test_string = ''.join(map(compat_chr, range(len(example_sig))))
return res
def _print_sig_code(self, func, example_sig):
+ if not self.get_param('youtube_print_sig_code'):
+ return
+
def gen_sig_code(idxs):
def _genslice(start, end, step):
starts = '' if start == 0 else str(start)
)
self._player_cache[player_id] = func
func = self._player_cache[player_id]
- if self.get_param('youtube_print_sig_code'):
- self._print_sig_code(func, s)
+ self._print_sig_code(func, s)
return func(s)
except Exception as e:
- tb = traceback.format_exc()
- raise ExtractorError(
- 'Signature extraction failed: ' + tb, cause=e)
+ raise ExtractorError('Signature extraction failed: ' + traceback.format_exc(), cause=e)
+
+ def _decrypt_nsig(self, s, video_id, player_url):
+ """Turn the encrypted n field into a working signature"""
+ if player_url is None:
+ raise ExtractorError('Cannot decrypt nsig without player_url')
+ if player_url.startswith('//'):
+ player_url = 'https:' + player_url
+ elif not re.match(r'https?://', player_url):
+ player_url = compat_urlparse.urljoin(
+ 'https://www.youtube.com', player_url)
+
+ sig_id = ('nsig_value', s)
+ if sig_id in self._player_cache:
+ return self._player_cache[sig_id]
+
+ try:
+ player_id = ('nsig', player_url)
+ if player_id not in self._player_cache:
+ self._player_cache[player_id] = self._extract_n_function(video_id, player_url)
+ func = self._player_cache[player_id]
+ self._player_cache[sig_id] = func(s)
+ self.write_debug(f'Decrypted nsig {s} => {self._player_cache[sig_id]}')
+ return self._player_cache[sig_id]
+ except Exception as e:
+ raise ExtractorError(traceback.format_exc(), cause=e)
+
def _extract_n_function_name(self, jscode):
    """Return the name of the n-parameter transform function found in ``jscode``.

    The name is taken from the ``nfunc`` group of the pattern below, as
    matched by ``self._search_regex``.
    """
    nfunc_patterns = (
        r'\.get\("n"\)\)&&\(b=(?P<nfunc>[a-zA-Z0-9$]{3})\([a-zA-Z0-9]\)',
    )
    return self._search_regex(
        nfunc_patterns, jscode, 'Initial JS player n function name', group='nfunc')
+
def _extract_n_function(self, video_id, player_url):
    """Build a callable that decrypts an n value for the given player.

    The extracted function code is looked up in the downloader cache
    (section 'youtube-nsig', keyed by player id). On a miss, the player
    code is loaded, the function is located and extracted, and the
    result is stored back into the cache.
    """
    player_id = self._extract_player_info(player_url)
    cache = self._downloader.cache
    func_code = cache.load('youtube-nsig', player_id)

    if not func_code:
        jscode = self._load_player(video_id, player_url)
        funcname = self._extract_n_function_name(jscode)
        jsi = JSInterpreter(jscode)
        func_code = jsi.extract_function_code(funcname)
        cache.store('youtube-nsig', player_id, func_code)
    else:
        # Only extract_function_from_code is used on this interpreter below
        jsi = JSInterpreter(func_code)

    if self.get_param('youtube_print_sig_code'):
        self.to_screen(f'Extracted nsig function from {player_id}:\n{func_code[1]}\n')

    def decrypt(s):
        # The function is rebuilt from its code on every call
        return jsi.extract_function_from_code(*func_code)([s])

    return decrypt
def _extract_signature_timestamp(self, video_id, player_url, ytcfg=None, fatal=False):
"""
raise ExtractorError(error_msg)
self.report_warning(error_msg)
return
- if self._load_player(video_id, player_url, fatal=fatal):
- player_id = self._extract_player_info(player_url)
- code = self._code_cache[player_id]
+ code = self._load_player(video_id, player_url, fatal=fatal)
+ if code:
sts = int_or_none(self._search_regex(
r'(?:signatureTimestamp|sts)\s*:\s*(?P<sts>[0-9]{5})', code,
'JS player signature timestamp', group='sts', fatal=fatal))
sp = try_get(sc, lambda x: x['sp'][0]) or 'signature'
fmt_url += '&' + sp + '=' + signature
+ query = parse_qs(fmt_url)
+ throttled = False
+ if query.get('ratebypass') != ['yes'] and query.get('n'):
+ try:
+ fmt_url = update_url_query(fmt_url, {
+ 'n': self._decrypt_nsig(query['n'][0], video_id, player_url)})
+ except ExtractorError as e:
+ self.report_warning(f'nsig extraction failed: You may experience throttling for some formats\n{e}', only_once=True)
+ throttled = True
+
if itag:
itags.append(itag)
stream_ids.append(stream_id)
'format_note': ', '.join(filter(None, (
'%s%s' % (audio_track.get('displayName') or '',
' (default)' if audio_track.get('audioIsDefault') else ''),
- fmt.get('qualityLabel') or quality.replace('audio_quality_', '')))),
+ fmt.get('qualityLabel') or quality.replace('audio_quality_', ''),
+ throttled and 'THROTTLED'))),
+ 'source_preference': -10 if not throttled else -1,
'fps': int_or_none(fmt.get('fps')),
'height': height,
'quality': q(quality),
if reason:
self.raise_no_formats(reason, expected=True)
- for f in formats:
- if '&c=WEB&' in f['url'] and '&ratebypass=yes&' not in f['url']: # throttled
- f['source_preference'] = -10
- # TODO: this method is not reliable
- f['format_note'] = format_field(f, 'format_note', '%s ') + '(maybe throttled)'
-
# Source is given priority since formats that throttle are given lower source_preference
# When throttling issue is fully fixed, remove this
self._sort_formats(formats, ('quality', 'res', 'fps', 'hdr:12', 'source', 'codec:vp9.2', 'lang'))
-from __future__ import unicode_literals
-
+from collections.abc import MutableMapping
import json
import operator
import re
('*', operator.mul),
]
_ASSIGN_OPERATORS = [(op + '=', opfunc) for op, opfunc in _OPERATORS]
-_ASSIGN_OPERATORS.append(('=', lambda cur, right: right))
+_ASSIGN_OPERATORS.append(('=', (lambda cur, right: right)))
_NAME_RE = r'[a-zA-Z_$][a-zA-Z_$0-9]*'
class JS_Break(ExtractorError):
    """Raised when a bare ``break`` expression is interpreted."""

    def __init__(self):
        super().__init__('Invalid break')
+
+
class JS_Continue(ExtractorError):
    """Raised when a bare ``continue`` expression is interpreted."""

    def __init__(self):
        super().__init__('Invalid continue')
+
+
class LocalNameSpace(MutableMapping):
    """Mapping view over a stack of variable scopes.

    Scopes are searched in the order given to the constructor, so the
    first scope shadows the later ones. Each scope is itself a mutable
    mapping and is modified in place by assignments.
    """

    def __init__(self, *stack):
        self.stack = tuple(stack)

    def __getitem__(self, key):
        """Return the value from the first scope that defines ``key``."""
        for scope in self.stack:
            if key in scope:
                return scope[key]
        raise KeyError(key)

    def __setitem__(self, key, value):
        """Assign to the first scope that already defines ``key``.

        A key that no scope defines is created in the first scope.
        """
        for scope in self.stack:
            if key in scope:
                scope[key] = value
                break
        else:
            self.stack[0][key] = value
        return value

    def __delitem__(self, key):
        raise NotImplementedError('Deleting is not supported')

    def __iter__(self):
        # A key shadowed by an earlier scope must only be reported once,
        # otherwise keys()/items() would contain duplicates
        seen = set()
        for scope in self.stack:
            for key in scope:
                if key not in seen:
                    seen.add(key)
                    yield key

    def __len__(self):
        # len() passes no extra argument and cannot measure a generator,
        # so count the distinct keys produced by __iter__
        return sum(1 for _ in self)

    def __repr__(self):
        return f'LocalNameSpace{self.stack}'
+
+
class JSInterpreter(object):
def __init__(self, code, objects=None):
if objects is None:
self.code = code
self._functions = {}
self._objects = objects
+ self.__named_object_counter = 0
+
+ def _named_object(self, namespace, obj):
+ self.__named_object_counter += 1
+ name = f'__yt_dlp_jsinterp_obj{self.__named_object_counter}'
+ namespace[name] = obj
+ return name
+
+ @staticmethod
+ def _seperate(expr, delim=',', max_split=None):
+ if not expr:
+ return
+ parens = {'(': 0, '{': 0, '[': 0, ']': 0, '}': 0, ')': 0}
+ start, splits, pos, max_pos = 0, 0, 0, len(delim) - 1
+ for idx, char in enumerate(expr):
+ if char in parens:
+ parens[char] += 1
+ is_in_parens = (parens['['] - parens[']']
+ or parens['('] - parens[')']
+ or parens['{'] - parens['}'])
+ if char == delim[pos] and not is_in_parens:
+ if pos == max_pos:
+ pos = 0
+ yield expr[start: idx - max_pos]
+ start = idx + 1
+ splits += 1
+ if max_split and splits >= max_split:
+ break
+ else:
+ pos += 1
+ else:
+ pos = 0
+ yield expr[start:]
+
+ @staticmethod
+ def _seperate_at_paren(expr, delim):
+ seperated = list(JSInterpreter._seperate(expr, delim, 1))
+ if len(seperated) < 2:
+ raise ExtractorError(f'No terminating paren {delim} in {expr}')
+ return seperated[0][1:].strip(), seperated[1].strip()
def interpret_statement(self, stmt, local_vars, allow_recursion=100):
if allow_recursion < 0:
raise ExtractorError('Recursion limit reached')
+ sub_statements = list(self._seperate(stmt, ';'))
+ stmt = (sub_statements or ['']).pop()
+ for sub_stmt in sub_statements:
+ ret, should_abort = self.interpret_statement(sub_stmt, local_vars, allow_recursion - 1)
+ if should_abort:
+ return ret
+
should_abort = False
stmt = stmt.lstrip()
stmt_m = re.match(r'var\s', stmt)
if expr == '': # Empty expression
return None
+ if expr.startswith('{'):
+ inner, outer = self._seperate_at_paren(expr, '}')
+ inner, should_abort = self.interpret_statement(inner, local_vars, allow_recursion - 1)
+ if not outer or should_abort:
+ return inner
+ else:
+ expr = json.dumps(inner) + outer
+
if expr.startswith('('):
- parens_count = 0
- for m in re.finditer(r'[()]', expr):
- if m.group(0) == '(':
- parens_count += 1
+ inner, outer = self._seperate_at_paren(expr, ')')
+ inner = self.interpret_expression(inner, local_vars, allow_recursion)
+ if not outer:
+ return inner
+ else:
+ expr = json.dumps(inner) + outer
+
+ if expr.startswith('['):
+ inner, outer = self._seperate_at_paren(expr, ']')
+ name = self._named_object(local_vars, [
+ self.interpret_expression(item, local_vars, allow_recursion)
+ for item in self._seperate(inner)])
+ expr = name + outer
+
+ m = re.match(r'try\s*', expr)
+ if m:
+ if expr[m.end()] == '{':
+ try_expr, expr = self._seperate_at_paren(expr[m.end():], '}')
+ else:
+ try_expr, expr = expr[m.end() - 1:], ''
+ ret, should_abort = self.interpret_statement(try_expr, local_vars, allow_recursion - 1)
+ if should_abort:
+ return ret
+ return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]
+
+ m = re.match(r'catch\s*\(', expr)
+ if m:
+ # We ignore the catch block
+ _, expr = self._seperate_at_paren(expr, '}')
+ return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]
+
+ m = re.match(r'for\s*\(', expr)
+ if m:
+ constructor, remaining = self._seperate_at_paren(expr[m.end() - 1:], ')')
+ if remaining.startswith('{'):
+ body, expr = self._seperate_at_paren(remaining, '}')
+ else:
+ m = re.match(r'switch\s*\(', remaining) # FIXME
+ if m:
+ switch_val, remaining = self._seperate_at_paren(remaining[m.end() - 1:], ')')
+ body, expr = self._seperate_at_paren(remaining, '}')
+ body = 'switch(%s){%s}' % (switch_val, body)
else:
- parens_count -= 1
- if parens_count == 0:
- sub_expr = expr[1:m.start()]
- sub_result = self.interpret_expression(
- sub_expr, local_vars, allow_recursion)
- remaining_expr = expr[m.end():].strip()
- if not remaining_expr:
- return sub_result
- else:
- expr = json.dumps(sub_result) + remaining_expr
+ body, expr = remaining, ''
+ start, cndn, increment = self._seperate(constructor, ';')
+ if self.interpret_statement(start, local_vars, allow_recursion - 1)[1]:
+ raise ExtractorError(
+ f'Premature return in the initialization of a for loop in {constructor!r}')
+ while True:
+ if not self.interpret_expression(cndn, local_vars, allow_recursion):
+ break
+ try:
+ ret, should_abort = self.interpret_statement(body, local_vars, allow_recursion - 1)
+ if should_abort:
+ return ret
+ except JS_Break:
+ break
+ except JS_Continue:
+ pass
+ if self.interpret_statement(increment, local_vars, allow_recursion - 1)[1]:
+ raise ExtractorError(
+ f'Premature return in the initialization of a for loop in {constructor!r}')
+ return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]
+
+ m = re.match(r'switch\s*\(', expr)
+ if m:
+ switch_val, remaining = self._seperate_at_paren(expr[m.end() - 1:], ')')
+ switch_val = self.interpret_expression(switch_val, local_vars, allow_recursion)
+ body, expr = self._seperate_at_paren(remaining, '}')
+ body, default = body.split('default:') if 'default:' in body else (body, None)
+ items = body.split('case ')[1:]
+ if default:
+ items.append(f'default:{default}')
+ matched = False
+ for item in items:
+ case, stmt = [i.strip() for i in self._seperate(item, ':', 1)]
+ matched = matched or case == 'default' or switch_val == self.interpret_expression(case, local_vars, allow_recursion)
+ if matched:
+ try:
+ ret, should_abort = self.interpret_statement(stmt, local_vars, allow_recursion - 1)
+ if should_abort:
+ return ret
+ except JS_Break:
break
- else:
- raise ExtractorError('Premature end of parens in %r' % expr)
+ return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]
+
+ # Comma seperated statements
+ sub_expressions = list(self._seperate(expr))
+ expr = sub_expressions.pop().strip() if sub_expressions else ''
+ for sub_expr in sub_expressions:
+ self.interpret_expression(sub_expr, local_vars, allow_recursion)
+
+ for m in re.finditer(rf'''(?x)
+ (?P<pre_sign>\+\+|--)(?P<var1>{_NAME_RE})|
+ (?P<var2>{_NAME_RE})(?P<post_sign>\+\+|--)''', expr):
+ var = m.group('var1') or m.group('var2')
+ start, end = m.span()
+ sign = m.group('pre_sign') or m.group('post_sign')
+ ret = local_vars[var]
+ local_vars[var] += 1 if sign[0] == '+' else -1
+ if m.group('pre_sign'):
+ ret = local_vars[var]
+ expr = expr[:start] + json.dumps(ret) + expr[end:]
for op, opfunc in _ASSIGN_OPERATORS:
m = re.match(r'''(?x)
(?P<expr>.*)$''' % (_NAME_RE, re.escape(op)), expr)
if not m:
continue
- right_val = self.interpret_expression(
- m.group('expr'), local_vars, allow_recursion - 1)
+ right_val = self.interpret_expression(m.group('expr'), local_vars, allow_recursion)
if m.groupdict().get('index'):
lvar = local_vars[m.group('out')]
- idx = self.interpret_expression(
- m.group('index'), local_vars, allow_recursion)
- assert isinstance(idx, int)
+ idx = self.interpret_expression(m.group('index'), local_vars, allow_recursion)
+ if not isinstance(idx, int):
+ raise ExtractorError(f'List indices must be integers: {idx}')
cur = lvar[idx]
val = opfunc(cur, right_val)
lvar[idx] = val
if expr.isdigit():
return int(expr)
+ if expr == 'break':
+ raise JS_Break()
+ elif expr == 'continue':
+ raise JS_Continue()
+
var_m = re.match(
- r'(?!if|return|true|false)(?P<name>%s)$' % _NAME_RE,
+ r'(?!if|return|true|false|null)(?P<name>%s)$' % _NAME_RE,
expr)
if var_m:
return local_vars[var_m.group('name')]
r'(?P<in>%s)\[(?P<idx>.+)\]$' % _NAME_RE, expr)
if m:
val = local_vars[m.group('in')]
- idx = self.interpret_expression(
- m.group('idx'), local_vars, allow_recursion - 1)
+ idx = self.interpret_expression(m.group('idx'), local_vars, allow_recursion)
return val[idx]
+ for op, opfunc in _OPERATORS:
+ seperated = list(self._seperate(expr, op))
+ if len(seperated) < 2:
+ continue
+ right_val = seperated.pop()
+ left_val = op.join(seperated)
+ left_val, should_abort = self.interpret_statement(
+ left_val, local_vars, allow_recursion - 1)
+ if should_abort:
+ raise ExtractorError(f'Premature left-side return of {op} in {expr!r}')
+ right_val, should_abort = self.interpret_statement(
+ right_val, local_vars, allow_recursion - 1)
+ if should_abort:
+ raise ExtractorError(f'Premature right-side return of {op} in {expr!r}')
+ return opfunc(left_val or 0, right_val)
+
m = re.match(
- r'(?P<var>%s)(?:\.(?P<member>[^(]+)|\[(?P<member2>[^]]+)\])\s*(?:\(+(?P<args>[^()]*)\))?$' % _NAME_RE,
+ r'(?P<var>%s)(?:\.(?P<member>[^(]+)|\[(?P<member2>[^]]+)\])\s*' % _NAME_RE,
expr)
if m:
variable = m.group('var')
member = remove_quotes(m.group('member') or m.group('member2'))
- arg_str = m.group('args')
-
- if variable in local_vars:
- obj = local_vars[variable]
- else:
- if variable not in self._objects:
- self._objects[variable] = self.extract_object(variable)
- obj = self._objects[variable]
-
- if arg_str is None:
- # Member access
- if member == 'length':
- return len(obj)
- return obj[member]
-
- assert expr.endswith(')')
- # Function call
- if arg_str == '':
- argvals = tuple()
+ arg_str = expr[m.end():]
+ if arg_str.startswith('('):
+ arg_str, remaining = self._seperate_at_paren(arg_str, ')')
else:
- argvals = tuple([
+ arg_str, remaining = None, arg_str
+
+ def assertion(cndn, msg):
+ """ assert, but without risk of getting optimized out """
+ if not cndn:
+ raise ExtractorError(f'{member} {msg}: {expr}')
+
+ def eval_method():
+ nonlocal member
+ if variable == 'String':
+ obj = str
+ elif variable in local_vars:
+ obj = local_vars[variable]
+ else:
+ if variable not in self._objects:
+ self._objects[variable] = self.extract_object(variable)
+ obj = self._objects[variable]
+
+ if arg_str is None:
+ # Member access
+ if member == 'length':
+ return len(obj)
+ return obj[member]
+
+ # Function call
+ argvals = [
self.interpret_expression(v, local_vars, allow_recursion)
- for v in arg_str.split(',')])
-
- if member == 'split':
- assert argvals == ('',)
- return list(obj)
- if member == 'join':
- assert len(argvals) == 1
- return argvals[0].join(obj)
- if member == 'reverse':
- assert len(argvals) == 0
- obj.reverse()
- return obj
- if member == 'slice':
- assert len(argvals) == 1
- return obj[argvals[0]:]
- if member == 'splice':
- assert isinstance(obj, list)
- index, howMany = argvals
- res = []
- for i in range(index, min(index + howMany, len(obj))):
- res.append(obj.pop(index))
- return res
-
- return obj[member](argvals)
-
- for op, opfunc in _OPERATORS:
- m = re.match(r'(?P<x>.+?)%s(?P<y>.+)' % re.escape(op), expr)
- if not m:
- continue
- x, abort = self.interpret_statement(
- m.group('x'), local_vars, allow_recursion - 1)
- if abort:
- raise ExtractorError(
- 'Premature left-side return of %s in %r' % (op, expr))
- y, abort = self.interpret_statement(
- m.group('y'), local_vars, allow_recursion - 1)
- if abort:
- raise ExtractorError(
- 'Premature right-side return of %s in %r' % (op, expr))
- return opfunc(x, y)
+ for v in self._seperate(arg_str)]
+
+ if obj == str:
+ if member == 'fromCharCode':
+ assertion(argvals, 'takes one or more arguments')
+ return ''.join(map(chr, argvals))
+ raise ExtractorError(f'Unsupported string method {member}')
+
+ if member == 'split':
+ assertion(argvals, 'takes one or more arguments')
+ assertion(argvals == [''], 'with arguments is not implemented')
+ return list(obj)
+ elif member == 'join':
+ assertion(isinstance(obj, list), 'must be applied on a list')
+ assertion(len(argvals) == 1, 'takes exactly one argument')
+ return argvals[0].join(obj)
+ elif member == 'reverse':
+ assertion(not argvals, 'does not take any arguments')
+ obj.reverse()
+ return obj
+ elif member == 'slice':
+ assertion(isinstance(obj, list), 'must be applied on a list')
+ assertion(len(argvals) == 1, 'takes exactly one argument')
+ return obj[argvals[0]:]
+ elif member == 'splice':
+ assertion(isinstance(obj, list), 'must be applied on a list')
+ assertion(argvals, 'takes one or more arguments')
+ index, howMany = (argvals + [len(obj)])[:2]
+ if index < 0:
+ index += len(obj)
+ add_items = argvals[2:]
+ res = []
+ for i in range(index, min(index + howMany, len(obj))):
+ res.append(obj.pop(index))
+ for i, item in enumerate(add_items):
+ obj.insert(index + i, item)
+ return res
+ elif member == 'unshift':
+ assertion(isinstance(obj, list), 'must be applied on a list')
+ assertion(argvals, 'takes one or more arguments')
+ for item in reversed(argvals):
+ obj.insert(0, item)
+ return obj
+ elif member == 'pop':
+ assertion(isinstance(obj, list), 'must be applied on a list')
+ assertion(not argvals, 'does not take any arguments')
+ if not obj:
+ return
+ return obj.pop()
+ elif member == 'push':
+ assertion(argvals, 'takes one or more arguments')
+ obj.extend(argvals)
+ return obj
+ elif member == 'forEach':
+ assertion(argvals, 'takes one or more arguments')
+ assertion(len(argvals) <= 2, 'takes at-most 2 arguments')
+ f, this = (argvals + [''])[:2]
+ return [f((item, idx, obj), this=this) for idx, item in enumerate(obj)]
+ elif member == 'indexOf':
+ assertion(argvals, 'takes one or more arguments')
+ assertion(len(argvals) <= 2, 'takes at-most 2 arguments')
+ idx, start = (argvals + [0])[:2]
+ try:
+ return obj.index(idx, start)
+ except ValueError:
+ return -1
+
+ if isinstance(obj, list):
+ member = int(member)
+ return obj[member](argvals)
+
+ if remaining:
+ return self.interpret_expression(
+ self._named_object(local_vars, eval_method()) + remaining,
+ local_vars, allow_recursion)
+ else:
+ return eval_method()
- m = re.match(
- r'^(?P<func>%s)\((?P<args>[a-zA-Z0-9_$,]*)\)$' % _NAME_RE, expr)
+ m = re.match(r'^(?P<func>%s)\((?P<args>[a-zA-Z0-9_$,]*)\)$' % _NAME_RE, expr)
if m:
fname = m.group('func')
argvals = tuple([
int(v) if v.isdigit() else local_vars[v]
- for v in m.group('args').split(',')]) if len(m.group('args')) > 0 else tuple()
- if fname not in self._functions:
+ for v in self._seperate(m.group('args'))])
+ if fname in local_vars:
+ return local_vars[fname](argvals)
+ elif fname not in self._functions:
self._functions[fname] = self.extract_function(fname)
return self._functions[fname](argvals)
- raise ExtractorError('Unsupported JS expression %r' % expr)
+ if expr:
+ raise ExtractorError('Unsupported JS expression %r' % expr)
def extract_object(self, objname):
_FUNC_NAME_RE = r'''(?:[a-zA-Z$0-9]+|"[a-zA-Z$0-9]+"|'[a-zA-Z$0-9]+')'''
return obj
- def extract_function(self, funcname):
+ def extract_function_code(self, funcname):
+ """ @returns argnames, code """
func_m = re.search(
r'''(?x)
(?:function\s+%s|[{;,]\s*%s\s*=\s*function|var\s+%s\s*=\s*function)\s*
\((?P<args>[^)]*)\)\s*
- \{(?P<code>[^}]+)\}''' % (
+ (?P<code>\{(?:(?!};)[^"]|"([^"]|\\")*")+\})''' % (
re.escape(funcname), re.escape(funcname), re.escape(funcname)),
self.code)
+ code, _ = self._seperate_at_paren(func_m.group('code'), '}') # refine the match
if func_m is None:
raise ExtractorError('Could not find JS function %r' % funcname)
- argnames = func_m.group('args').split(',')
+ return func_m.group('args').split(','), code
- return self.build_function(argnames, func_m.group('code'))
def extract_function(self, funcname):
    """Return a callable built from the definition of ``funcname`` in the code."""
    argnames, code = self.extract_function_code(funcname)
    return self.extract_function_from_code(argnames, code)
+
def extract_function_from_code(self, argnames, code, *global_stack):
    """Build a callable from a function body given as source text.

    Every anonymous ``function(...){...}`` found in ``code`` is built
    recursively, stored in this function's local scope under a generated
    name, and replaced in the source text by that name. The rewritten
    body is then handed to ``build_function``.
    """
    scope = {}
    inline_func_re = r'function\((?P<args>[^)]*)\)\s*{'
    mobj = re.search(inline_func_re, code)
    while mobj is not None:
        start, body_start = mobj.span()
        # The match ends just past the opening brace; step back to include it
        body, remaining = self._seperate_at_paren(code[body_start - 1:], '}')
        params = [arg.strip() for arg in mobj.group('args').split(',')]
        inner = self.extract_function_from_code(params, body, scope, *global_stack)
        placeholder = self._named_object(scope, inner)
        code = code[:start] + placeholder + remaining
        mobj = re.search(inline_func_re, code)
    return self.build_function(argnames, code, scope, *global_stack)
def call_function(self, funcname, *args):
    """Extract the function named ``funcname`` and call it with ``args``."""
    func = self.extract_function(funcname)
    return func(args)
+
def build_function(self, argnames, code, *global_stack):
    """Return a callable that interprets ``code`` with ``argnames`` bound to its arguments.

    The first entry of ``global_stack`` (a new dict when none is given)
    is the function's local scope; the remaining entries are enclosing
    scopes. The local scope is created once here and updated on each
    call, so values stored in it persist between calls.

    The returned callable takes the positional arguments as one sequence;
    keyword arguments are added to the local scope as extra variables.
    It returns the value of the statement that aborted execution, else
    the value of the last statement, or None for an empty body.
    """
    global_stack = list(global_stack) or [{}]
    local_vars = global_stack.pop(0)

    def resf(args, **kwargs):
        local_vars.update({
            **dict(zip(argnames, args)),
            **kwargs
        })
        var_stack = LocalNameSpace(local_vars, *global_stack)
        # An empty body yields no statements, so the result must be
        # initialised here to avoid an unbound local
        ret = None
        for stmt in self._seperate(code.replace('\n', ''), ';'):
            ret, should_abort = self.interpret_statement(stmt, var_stack)
            if should_abort:
                break
        return ret
    return resf