X-Git-Url: https://jfr.im/git/erebus.git/blobdiff_plain/99366200d5afdc1fa53fde0ee8533429af937f9e..3cec5bdc665d310ac628e80b007dd2d71ae7d7bd:/modules/urls.py?ds=sidebyside diff --git a/modules/urls.py b/modules/urls.py index 2908dc0..65f9ca1 100644 --- a/modules/urls.py +++ b/modules/urls.py @@ -1,4 +1,5 @@ -# Erebus IRC bot - Author: Erebus Team +# Erebus IRC bot - Author: Conny Sjoblom +# vim: fileencoding=utf-8 # URL Checker # This file is released into the public domain; see http://unlicense.org/ @@ -6,8 +7,9 @@ modinfo = { 'author': 'Erebus Team', 'license': 'public domain', - 'compatible': [1], # compatible module API versions - 'depends': [], # other modules required to work properly? + 'compatible': [0], + 'depends': [], + 'softdeps': [], } # http://embed.ly/tools/generator @@ -19,21 +21,39 @@ modstart = lib.modstart modstop = lib.modstop # module code -import re, json, urllib2, urlparse, HTMLParser -from BeautifulSoup import BeautifulSoup +import sys +if sys.version_info.major < 3: + stringbase = basestring + import urllib2 + import urlparse + import HTMLParser + html = HTMLParser.HTMLParser() + from BeautifulSoup import BeautifulSoup +else: + stringbase = str + import urllib.request as urllib2 + import urllib.parse as urlparse + import html + from bs4 import BeautifulSoup +import http.client -hostmask_regex = re.compile(r'^(.*)!(.*)@(.*)$') +import re, json, datetime -spotify_regex = ( - re.compile(r'spotify:(?P\w+):(?P\w{22})'), - re.compile(r'http://open.spotify.com/(?P\w+)/(?P\w{22})') -) -youtube_regex = ( - re.compile(r'https?://(?:www\.)?youtube\.com/watch\?[a-zA-Z0-9=&_\-]+'), -) -twitch_regex = ( - re.compile('(http|ftp|https):\/\/([\w\-_]+(?:(?:\.[\w\-_]+)+))([\w\-\.,@?^=%&:/~\+#]*[\w\-\@?^=%&/~\+#])?'), #TODO -) +try: + import aia + aia_session = aia.AIASession() + # aia is broken on capath systems, needs cafile to work + aia_session._context.load_verify_locations(cafile='/etc/ssl/certs/ca-certificates.crt') + aia_session._trusted = { + 
aia.openssl_get_cert_info(ca_der)["subject"]: ca_der + for ca_der in aia_session._context.get_ca_certs(True) + } + print("aia loaded") +except ImportError as e: + print(repr(e)) + aia = None + +hostmask_regex = re.compile(r'^(.*)!(.*)@(.*)$') def parser_hostmask(hostmask): if isinstance(hostmask, dict): @@ -59,39 +79,73 @@ def parser_hostmask(hostmask): 'host': host } -@lib.hooknum("PRIVMSG") -def privmsg_hook(bot, line): - sender = parser_hostmask(line[1:line.find(' ')]) +class SmartRedirectHandler(urllib2.HTTPRedirectHandler): + def http_error_301(self, req, fp, code, msg, headers): + result = urllib2.HTTPRedirectHandler.http_error_301( + self, req, fp, code, msg, headers) + result.status = code + return result - try: - linetx = line.split(None, 3)[3][1:] - except IndexError: - linetx = '' + def http_error_302(self, req, fp, code, msg, headers): + result = urllib2.HTTPRedirectHandler.http_error_302( + self, req, fp, code, msg, headers) + result.status = code + return result + +def _get_blocked_chans(): + return lib.parent.cfg.get('urls', 'blocked', '').split(',') - chan = line.split()[2] +def process_line(line): + responses = [] + num_found = 0 + limit = lib.parent.cfg.getint('urls', 'limit', 2) + for action, group in regexes: + for regex in group: + for match in regex.findall(line): + if match: + num_found += 1 + if num_found > limit: + return responses + if isinstance(match, stringbase): + resp = action(match) + else: + resp = action(*match) + if resp is not None and resp != "": + responses.append(resp) + return responses - if 'open.spotify.com' in line or 'spotify:' in line: - for r in spotify_regex: - for sptype, track in r.findall(linetx): - bot.msg(chan, gotspotify(sptype, track)) +@lib.hooknum("PRIVMSG") +def privmsg_hook(bot, textline): + user = parser_hostmask(textline[1:textline.find(' ')]) + chan = textline.split()[2] + + if chan in _get_blocked_chans(): return - elif 'youtube.com' in line or 'youtu.be' in line: - print "got youtube!" 
- for r in youtube_regex: - for url in r.findall(linetx): - bot.msg(chan, gotyoutube(url)) + try: + line = textline.split(None, 3)[3][1:] + except IndexError: + line = '' - elif 'twitch.tv' in line: pass #TODO fix twitch + responses = process_line(line) + send_response(bot, chan, responses) - else: pass #TODO generic checker +def send_response(bot, chan, responses): + if len(responses) > 0: + if lib.parent.cfg.getboolean('urls', 'multiline'): + for r in responses: + bot.msg(chan, r, True) + else: + bot.msg(chan, ' | '.join(responses), True) +def unescape(line): + return re.sub('\s+', ' ', html.unescape(line)) def gotspotify(type, track): url = 'http://ws.spotify.com/lookup/1/?uri=spotify:%s:%s' % (type, track) xml = urllib2.urlopen(url).read() - soup = BeautifulSoup(xml) + soup = BeautifulSoup(xml, convertEntities=BeautifulSoup.HTML_ENTITIES) lookup_type = soup.contents[2].name - + if lookup_type == 'track': name = soup.find('name').string album_name = soup.find('album').find('name').string @@ -101,54 +155,214 @@ def gotspotify(type, track): popularity = float(popularity.string)*100 length = float(soup.find('length').string) minutes = int(length)/60 - seconds = int(length)%60 - - return 'Track: %s - %s / %s %s:%.2d %2d%%' % (artist_name, name, album_name, minutes, seconds, popularity) - + seconds = int(length)%60 + + return unescape('Track: %s - %s / %s %s:%.2d %2d%%' % (artist_name, name, album_name, minutes, seconds, popularity)) + elif lookup_type == 'album': album_name = soup.find('album').find('name').string artist_name = soup.find('artist').find('name').string released = soup.find('released').string - return 'Album: %s - %s - %s' % (artist_name, album_name, released) - + return unescape('Album: %s - %s - %s' % (artist_name, album_name, released)) + else: return 'Unsupported type.' 
def _yt_duration(s):
    """Turn an ISO 8601 duration from the YouTube API into a compact label.

    'PT4M13S' becomes '4m13s' and 'P1DT2H' becomes '1d2h'. The time part
    (everything after 'T') is optional, so day-only values such as 'P1D'
    are accepted too. A string that is not a duration at all is returned
    unchanged rather than raising.
    """
    mo = re.match(r'P(\d+D)?(?:T(\d+H)?(\d+M)?(\d+S)?)?', s)
    if mo is None:
        return s
    return ''.join(x for x in mo.groups() if x).lower()


def _yt_date(s, f):
    """Reformat a UTC timestamp such as '2020-01-02T03:04:05Z' using format f.

    Fractional seconds are optional. When present they are scaled to
    microseconds ('.5' means 500000 microseconds) and truncated to six
    digits so datetime never rejects a long fraction. A string that does
    not look like a timestamp is returned unchanged.
    """
    mo = re.match(
        r'(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})(?:\.(\d+))?Z', s)
    if mo is None:
        return s
    parts = [int(x) for x in mo.groups()[:6]]
    fraction = mo.group(7)
    if fraction:
        # Right-pad so '.5' is half a second, not five microseconds.
        parts.append(int(fraction[:6].ljust(6, '0')))
    return datetime.datetime(*parts).strftime(f)


def _yt_round(n):
    """Abbreviate a count: '1.5k', '2.5m', '3.0b', '1.0t'.

    Values below 1000 are returned as an int (not a string), exactly as
    before, so both '%s' and '%d' style formats keep working for them.
    """
    n = float(n)
    if n >= 10**12:
        return '%.1ft' % (n/10**12)
    elif n >= 10**9:
        return '%.1fb' % (n/10**9)
    elif n >= 10**6:
        return '%.1fm' % (n/10**6)
    elif n >= 10**3:
        return '%.1fk' % (n/10**3)
    else:
        return int(n)


def _yt_stat(stats, key):
    """Return the abbreviated counter stats[key], or '?' when it is absent."""
    value = stats.get(key)
    if value is None:
        return '?'
    return _yt_round(value)


def gotyoutube(url):
    """Describe the YouTube video that a watch URL points at.

    The video id is taken from the 'v' query parameter and looked up via
    the YouTube Data API v3 using the 'api_key' option of the [urls]
    config section. The reply is built from the 'yt_format' option.

    Returns None when the URL carries no 'v' parameter, otherwise a
    string: either the formatted description or a short error message.
    """
    url_data = urlparse.urlparse(url)
    query = urlparse.parse_qs(url_data.query)
    if not query.get("v"):
        # Not a watch URL we understand; stay silent instead of raising.
        return None
    video = query["v"][0]
    api_url = 'https://www.googleapis.com/youtube/v3/videos?part=snippet,contentDetails,statistics&id=%s&key=%s' % (video, lib.parent.cfg.get('urls', 'api_key'))
    try:
        resp = urllib2.urlopen(api_url)
        try:
            respdata = resp.read()
        finally:
            resp.close()
        v = json.loads(respdata)
        v = v['items'][0]
        # Individual counters can be missing from the statistics object
        # (NOTE(review): dislikeCount in particular appears to be private
        # now - confirm against the API docs), so never index them blindly.
        stats = v.get('statistics', {})

        return unescape(lib.parent.cfg.get('urls', 'yt_format', "\002%(author)s\002: \037%(title)s\037 [%(duration)s, uploaded %(uploaded)s, %(views)s v/%(likes)s l/%(dislikes)s d]") % {
            'title': v['snippet']['title'],
            'author': v['snippet']['channelTitle'],
            'duration': _yt_duration(v['contentDetails']['duration']),
            'uploaded': _yt_date(v['snippet']['publishedAt'], lib.parent.cfg.get('urls', 'yt_date_format', '%b %d %Y')),
            'views': _yt_stat(stats, 'viewCount'),
            'likes': _yt_stat(stats, 'likeCount'),
            'dislikes': _yt_stat(stats, 'dislikeCount'),
        })
    except urllib2.HTTPError as e:
        if e.getcode() == 403:
            return 'API limit exceeded'
        else:
            return str(e)
    except IndexError:
        # 'items' was empty: the id does not resolve to a video.
        return 'no results'
    except Exception as e:
        return str(e)
def gottwitch(uri):
    """Report whether a Twitch channel is currently streaming.

    uri is the part of the URL after the host; its first path segment is
    used as the channel login. The 'twitch_api_key' option of the [urls]
    config section is sent as the Client-ID header.
    NOTE(review): the Helix API presumably also wants an OAuth bearer
    token - confirm before relying on this handler.

    Returns a one-line description, or 'Channel offline.' when the API
    lists no live stream for that login.
    """
    url = 'https://api.twitch.tv/helix/streams?user_login=%s' % uri.split('/')[0]
    opener = urllib2.build_opener()
    opener.addheaders = [('Client-ID', lib.parent.cfg.get('urls', 'twitch_api_key'))]
    resp = opener.open(url)
    try:
        respdata = resp.read()
    finally:
        resp.close()
    twitch = json.loads(respdata)['data']
    try:
        # TODO: add current game.
        return unescape('\037%s\037 is %s (%s)' % (twitch[0]['user_name'], twitch[0]['type'], twitch[0]['title']))
    except (IndexError, KeyError):
        # An empty 'data' list means nobody is live under that login.
        return 'Channel offline.'


def _humanize_bytes(b):
    """Format a byte count as '512B', '1.00kiB', '3.25MiB', ...

    b may be an int or a numeric string (e.g. a Content-Length header).
    Raises ValueError when b cannot be converted to an int.
    """
    b = int(b)
    i = 0
    table = " kMGTPEZYRQ"
    # '>=' so that exactly 1024 becomes 1.00kiB; the bound on i keeps the
    # suffix lookup inside the table for absurdly large values.
    while b >= 1024 and i < len(table) - 1:
        i += 1
        b /= 1024.0
    if i == 0:
        return "%dB" % (b)
    return "%.2f%siB" % (b, table[i])


def _parse_content_type(header):
    """Split a Content-Type header into (media type, charset or None)."""
    fields = header.split(';')
    c_type = fields.pop(0).strip()
    c_charset = None
    for f in fields:
        f = f.strip()
        if len(f) > 8 and f[0:8].lower() == 'charset=':
            # The value may be quoted: charset="utf-8"
            c_charset = f[8:].strip('"\'') or None
    return c_type, c_charset


def _parse_content_length(header):
    """Return a Content-Length header as a non-negative int, else None."""
    if header is None:
        return None
    try:
        value = int(header)
    except ValueError:
        return None
    if value < 0:
        return None
    return value


def _do_request(url, try_aia=False):
    """
    Fetch url with browser-like headers and a 2 second timeout.

    Return value is a tuple consisting of:
    - the HTTPResponse object, or a string on error. Empty string -> no response.
    - and a flag indicating whether AIA was used

    When certificate verification fails because the server did not send
    its intermediate certificates, the request is retried once through the
    optional aia module (if it was importable).
    """
    try:
        request = urllib2.Request(url, headers={
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'accept-language': 'en-US,en;q=0.9',
            'cache-control': 'max-age=0',
            'sec-ch-ua': '"Chromium";v="122", "Not(A:Brand";v="24", "Google Chrome";v="122"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Linux"',
            'sec-fetch-dest': 'document',
            'sec-fetch-mode': 'navigate',
            'sec-fetch-site': 'none',
            'sec-fetch-user': '?1',
            'upgrade-insecure-requests': '1',
            'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
        })
    except ValueError:
        return '', False
    if try_aia:
        try:
            opener = urllib2.build_opener(urllib2.HTTPSHandler(context=aia_session.ssl_context_from_url(url)), SmartRedirectHandler())
        except aia.AIAError as e:
            return 'Request error: %s.%s: %s' % (e.__module__, e.__class__.__name__, e.args[0]), True
    else:
        opener = urllib2.build_opener(SmartRedirectHandler())

    # Send request and handle errors
    try:
        response = opener.open(request, timeout=2)
    except http.client.InvalidURL as e:  # why does a method under urllib.request raise an exception under http.client???
        return '', False
    except urllib2.HTTPError as e:
        return 'Request error: %s %s' % (e.code, e.reason), False
    except urllib2.URLError as e:
        if "certificate verify failed: unable to get local issuer certificate" in str(e.reason):
            if aia and not try_aia:
                # Retry exactly once with AIA enabled. Without the
                # try_aia check a second identical failure would recurse
                # without end.
                return _do_request(url, True)
            if not aia:
                lib.parent.log('urls', '?', 'If the site is not serving the certificate chain, installing the aia library might make this request work: pip install aia')
            return 'Request error: site may have broken TLS configuration (%s)' % (e.reason), try_aia
        else:
            return 'Request error: %s' % (e.reason), False
    except TimeoutError as e:
        return 'Request error: request timed out', False
    except Exception as e:
        return 'Unknown error: %s %r' % (type(e).__name__, e.args), False

    return response, try_aia


def goturl(url):
    """Build a one-line summary (type, size, title) for an arbitrary URL.

    Returns None for URLs matched by other_regexes (sites that are
    deliberately skipped), an error string when the request fails, and
    otherwise text such as '[text/html] [12.34kiB] Title: Example'.
    """
    for _, group in other_regexes:
        for regex in group:
            if regex.match(url):
                return None

    response, used_aia = _do_request(url)
    if isinstance(response, stringbase):
        return response

    output = []
    try:
        # Try to add type and length headers to reply
        c_type, c_charset = _parse_content_type(response.getheader('Content-Type', ''))
        # A malformed Content-Length is treated like a missing one.
        c_len = _parse_content_length(response.getheader('Content-Length'))
        is_html = c_type.lower() == 'text/html'

        if c_type != '':
            output.append("[%s] " % (c_type))
        else:
            output.append("[no type] ")
        if not is_html:  # else length will be provided by HTML code below
            if c_len is not None:
                output.append("[%s] " % (_humanize_bytes(c_len)))
            else:
                output.append("[no length] ")

        if used_aia:
            output.append("[AIA] ")

        # Try to add title if HTML
        if is_html:
            try:
                responsebody = response.read(1024*1024)
            except Exception as e:
                output.append('Error reading response body: %s %r' % (type(e).__name__, e.args))
            else:
                if c_len is not None and len(responsebody) != c_len:  # did we read a different amount than Content-Length?
                    if response.read(1):  # there's more data, we just aren't reading it
                        output.append("[read %s; Content-Length %s] " % (_humanize_bytes(len(responsebody)), _humanize_bytes(c_len)))
                    else:
                        output.append("[actual %s; Content-Length %s] " % (_humanize_bytes(len(responsebody)), _humanize_bytes(c_len)))
                else:  # Content-Length = amount read
                    output.append("[%s] " % (_humanize_bytes(len(responsebody))))
                try:
                    soup = BeautifulSoup(responsebody, from_encoding=c_charset)
                    # .string is None for an empty <title> or one that
                    # contains nested markup; report that as "no title".
                    title = soup.title.string if soup.title else None
                    if title is not None and title.strip():
                        output.append('Title: ' + unescape('%s' % (title.strip())))
                    else:
                        output.append('No title')
                except Exception as e:
                    output.append('Title error: %s %r ' % (type(e).__name__, e.args))
    finally:
        # Always release the connection, whatever happened above.
        response.close()

    return ''.join(output)


# Generic http(s) URL matcher handled by goturl.
url_regex = (
    re.compile(r'https?://(?:[^/\s.]+\.)+[a-z0-9-]+(?::\d{1,5})?(?:/[^\s\]>)}]+)?', re.I),
)
# Sites that are matched first and answered with an empty string, i.e.
# nothing is fetched and nothing is said for them.
other_regexes = (
    (lambda x: '', (re.compile(r"""https?://(?:www\.)?(?:twitter|x)\.com/""", re.I),)),  # skip twitter
    (lambda x: '', (re.compile(r"""https?://(?:www\.)?reddit\.com/""", re.I),)),  # skip new-reddit
    (lambda x: '', (re.compile(r"""https?://jfr\.im/git/""", re.I),)),  # skip my gitweb
    (lambda x: '', (re.compile(r"""https?://(?:www\.)?wunderground\.com/""", re.I),)),  # skip wunderground, they time us out
)
# (handler, patterns) pairs; the generic URL handler goes last.
regexes = other_regexes + (
    (goturl, url_regex),
)