X-Git-Url: https://jfr.im/git/erebus.git/blobdiff_plain/800d414de4180864f9a4105bbaac0b8b93209744..3c63ef0547be2798c4ed4f55ad9ee7d5e14eb013:/modules/urls.py diff --git a/modules/urls.py b/modules/urls.py index 965c8b5..4935d68 100644 --- a/modules/urls.py +++ b/modules/urls.py @@ -1,4 +1,5 @@ -# Erebus IRC bot - Author: Erebus Team +# Erebus IRC bot - Author: Conny Sjoblom +# vim: fileencoding=utf-8 # URL Checker # This file is released into the public domain; see http://unlicense.org/ @@ -6,8 +7,9 @@ modinfo = { 'author': 'Erebus Team', 'license': 'public domain', - 'compatible': [1], # compatible module API versions - 'depends': [], # other modules required to work properly? + 'compatible': [0], + 'depends': [], + 'softdeps': [], } # http://embed.ly/tools/generator @@ -19,23 +21,38 @@ modstart = lib.modstart modstop = lib.modstop # module code -import re, urllib2, urlparse, json, HTMLParser -from BeautifulSoup import BeautifulSoup +import sys +if sys.version_info.major < 3: + stringbase = basestring + import urllib2 + import urlparse + import HTMLParser + html = HTMLParser.HTMLParser() + from BeautifulSoup import BeautifulSoup +else: + stringbase = str + import urllib.request as urllib2 + import urllib.parse as urlparse + import html + from bs4 import BeautifulSoup -html_parser = HTMLParser.HTMLParser() +import re, json, datetime + +try: + import aia + aia_session = aia.AIASession() + # aia is broken on capath systems, needs cafile to work + aia_session._context.load_verify_locations(cafile='/etc/ssl/certs/ca-certificates.crt') + aia_session._trusted = { + aia.openssl_get_cert_info(ca_der)["subject"]: ca_der + for ca_der in aia_session._context.get_ca_certs(True) + } + print("aia loaded") +except ImportError as e: + print(repr(e)) + aia = None hostmask_regex = re.compile(r'^(.*)!(.*)@(.*)$') -url_regex = re.compile(r'((?:https?://|spotify:)[^\s]+)') -spotify_regex = ( - re.compile(r'spotify:(?P\w+):(?P\w{22})'), - re.compile(r'https?://open.spotify.com/(?P\w+)/(?P\w{22})') -) -youtube_regex = ( - re.compile(r'https?://(?:www\.)?youtube\.com/watch\?[a-zA-Z0-9=&_\-]+'), -) -twitch_regex = ( - re.compile(r'https?://(?:www\.)?twitch.tv/(.*)\w{1,}'), -) def parser_hostmask(hostmask): if isinstance(hostmask, dict): @@ -61,39 +78,63 @@ def parser_hostmask(hostmask): 'host': host } +class SmartRedirectHandler(urllib2.HTTPRedirectHandler): + def http_error_301(self, req, fp, code, msg, headers): + result = urllib2.HTTPRedirectHandler.http_error_301( + self, req, fp, code, msg, headers) + result.status = code + return result + + def http_error_302(self, req, fp, code, msg, headers): + result = urllib2.HTTPRedirectHandler.http_error_302( + self, req, fp, code, msg, headers) + result.status = code + return result + +def _get_blocked_chans(): + return lib.parent.cfg.get('urls', 'blocked', '').split(',') + +def process_line(line): + responses = [] + num_found = 0 + limit = lib.parent.cfg.getint('urls', 'limit', 2) + for action, group in regexes: + for regex in group: + for match in regex.findall(line): + if match: + num_found += 1 + if num_found > limit: + return responses + if isinstance(match, stringbase): + resp = action(match) + else: + resp = action(*match) + if resp is not None and resp != "": + responses.append(resp) + return responses + @lib.hooknum("PRIVMSG") def privmsg_hook(bot, textline): user = parser_hostmask(textline[1:textline.find(' ')]) chan = textline.split()[2] + if chan in _get_blocked_chans(): return + try: line = textline.split(None, 3)[3][1:] except IndexError: line = '' - for match in url_regex.findall(line): - if match: - print match - if 'open.spotify.com' in match or 'spotify:' in match: - for r in spotify_regex: - for sptype, track in r.findall(match): - bot.msg(chan, unescape(gotspotify(sptype, track))) - - elif 'youtube.com' in match or 'youtu.be' in match: - for r in youtube_regex: - for url in r.findall(match): - bot.msg(chan, unescape(gotyoutube(url))) - - elif 'twitch.tv' in match: - for r in twitch_regex: - for uri in r.findall(match): - bot.msg(chan, unescape(gottwitch(uri))) - - else: - bot.msg(chan, unescape(goturl(match))) + responses = process_line(line) + if len(responses) > 0: + if lib.parent.cfg.getboolean('urls', 'multiline'): + for r in responses: + bot.msg(chan, r, True) + else: + bot.msg(chan, ' | '.join(responses), True) def unescape(line): - return html_parser.unescape(line) + return re.sub('\s+', ' ', html.unescape(line)) def gotspotify(type, track): url = 'http://ws.spotify.com/lookup/1/?uri=spotify:%s:%s' % (type, track) @@ -110,47 +151,174 @@ def gotspotify(type, track): popularity = float(popularity.string)*100 length = float(soup.find('length').string) minutes = int(length)/60 - seconds = int(length)%60 + seconds = int(length)%60 - return 'Track: %s - %s / %s %s:%.2d %2d%%' % (artist_name, name, album_name, minutes, seconds, popularity) + return unescape('Track: %s - %s / %s %s:%.2d %2d%%' % (artist_name, name, album_name, minutes, seconds, popularity)) elif lookup_type == 'album': album_name = soup.find('album').find('name').string artist_name = soup.find('artist').find('name').string released = soup.find('released').string - return 'Album: %s - %s - %s' % (artist_name, album_name, released) + return unescape('Album: %s - %s - %s' % (artist_name, album_name, released)) else: return 'Unsupported type.' +def _yt_duration(s): + mo = re.match(r'P(\d+D)?T(\d+H)?(\d+M)?(\d+S)?', s) + pcs = [x for x in mo.groups() if x] + return ''.join(pcs).lower() +def _yt_date(s, f): + mo = re.match(r'(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})\.(\d+)Z', s) + return datetime.datetime(*(int(x) for x in mo.groups())).strftime(f) +def _yt_round(n): + n = float(n) + if n >= 10**12: + return '%.1ft' % (n/10**12) + elif n >= 10**9: + return '%.1fb' % (n/10**9) + elif n >= 10**6: + return '%.1fm' % (n/10**6) + elif n >= 10**3: + return '%.1fk' % (n/10**3) + else: + return int(n) + def gotyoutube(url): url_data = urlparse.urlparse(url) query = urlparse.parse_qs(url_data.query) video = query["v"][0] - api_url = 'http://gdata.youtube.com/feeds/api/videos/%s?alt=json&v=2' % video + api_url = 'https://www.googleapis.com/youtube/v3/videos?part=snippet,contentDetails,statistics&id=%s&key=%s' % (video, lib.parent.cfg.get('urls', 'api_key')) try: respdata = urllib2.urlopen(api_url).read() - video_info = json.loads(respdata) + v = json.loads(respdata) + v = v['items'][0] - title = video_info['entry']['title']["$t"] - author = video_info['entry']['author'][0]['name']['$t'] + return unescape(lib.parent.cfg.get('urls', 'yt_format', "\002%(author)s\002: \037%(title)s\037 [%(duration)s, uploaded %(uploaded)s, %(views)s v/%(likes)s l/%(dislikes)s d]") % { + 'title': v['snippet']['title'], + 'author': v['snippet']['channelTitle'], + 'duration': _yt_duration(v['contentDetails']['duration']), + 'uploaded': _yt_date(v['snippet']['publishedAt'], lib.parent.cfg.get('urls', 'yt_date_format', '%b %d %Y')), + 'views': _yt_round(v['statistics']['viewCount']), + 'likes': _yt_round(v['statistics']['likeCount']), + 'dislikes': _yt_round(v['statistics']['dislikeCount']), + }) + except urllib2.HTTPError as e: + if e.getcode() == 403: + return 'API limit exceeded' + else: + return str(e) + except IndexError: + return 'no results' + except Exception as e: + return str(e) - return "Youtube: %s (%s)" % (title, author) +def gottwitch(uri): + url = 'https://api.twitch.tv/helix/streams?user_login=%s' % uri.split('/')[0] + opener = urllib2.build_opener() + opener.addheaders = [('Client-ID', lib.parent.cfg.get('urls', 'twitch_api_key'))] + respdata = opener.open(url).read() + twitch = json.loads(respdata)['data'] + try: + # TODO: add current game. + return unescape('\037%s\037 is %s (%s)' % (twitch[0]['user_name'], twitch[0]['type'], twitch[0]['title'])) except: - pass + return 'Channel offline.' -def gottwitch(uri): - url = 'http://api.justin.tv/api/stream/list.json?channel=%s' % uri.split('/')[0] - respdata = urllib2.urlopen(url).read() - twitch = json.loads(respdata) - try: - return 'Twitch: %s (%s playing %s)' % (twitch[0]['channel']['status'], twitch[0]['channel']['login'], twitch[0]['channel']['meta_game']) - except: - return 'Twitch: Channel offline.' +def _humanize_bytes(b): + b = int(b) + i = 0 + table = " kMGTPEZYRQ" + while b > 1024: + i += 1 + b /= 1024.0 + if i == 0: + return "%dB" % (b) + else: + return "%.2f%siB" % (b, table[i]) -def goturl(url): +def _do_request(url, try_aia=False): + """Returns the HTTPResponse object, or a string on error""" + request = urllib2.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36'}) + if try_aia: + opener = urllib2.build_opener(urllib2.HTTPSHandler(context=aia_session.ssl_context_from_url(url)), SmartRedirectHandler()) + else: + opener = urllib2.build_opener(SmartRedirectHandler()) + + # Send request and handle errors try: - soup = BeautifulSoup(urllib2.urlopen(url, timeout=2)) - return "Title: %s" % soup.title.string - except: - return "Invalid URL/Timeout" + response = opener.open(request, timeout=2) + except urllib2.HTTPError as e: + return 'Request error: %s %s' % (e.code, e.reason) + except urllib2.URLError as e: + if "certificate verify failed: unable to get local issuer certificate" in str(e.reason): + if aia: # Retry with AIA enabled + return _do_request(url, True) + else: + lib.parent.log('urls', '?', 'If the site is not serving the certificate chain, installing the aia library might make this request work: pip install aia') + return 'Request error: site may have broken TLS configuration (%s)' % (e.reason) + else: + return 'Request error: %s' % (e.reason) + except TimeoutError as e: + return 'Request error: request timed out' + except Exception as e: + return 'Unknown error: %s %r' % (type(e).__name__, e.args) + + return response + + +def goturl(url): + output = [] + for _, group in other_regexes: + for regex in group: + if regex.match(url): + return None + + response = _do_request(url) + if isinstance(response, stringbase): + return response + + # Try to add type and length headers to reply + c_type = response.getheader('Content-Type', '').split(';', 1)[0] + c_len = response.getheader('Content-Length') + if c_type != '': + output.append("[%s] " % (c_type)) + else: + output.append("[no type] ") + if c_type != "text/html": # else length will be provided by HTML code below + if c_len is not None: + output.append("[%s] " % (_humanize_bytes(c_len))) + else: + output.append("[no length] ") + + # Try to add title if HTML + if c_type == 'text/html': + try: + responsebody = response.read(1024*1024) + except Exception as e: + output.append('Error reading response body: %s %r' % (type(e).__name__, e.args)) + else: + if c_len is not None and len(responsebody) != int(c_len): + output.append("[actual %s; Content-Length %s] " % (_humanize_bytes(len(responsebody)), _humanize_bytes(c_len))) + else: + output.append("[%s] " % (_humanize_bytes(len(responsebody)))) + try: + soup = BeautifulSoup(responsebody) + if soup.title: + output.append('Title: ' + unescape('%s' % (soup.find('title').string.strip()))) + else: + output.append('No title') + except Exception as e: + output.append('Title error: %s %r ' % (type(e).__name__, e.args)) + + return ''.join(output) + +url_regex = ( + re.compile(r'https?://(?:[^/\s.]+\.)+[^/\s.]+(?:/\S+)?'), +) +other_regexes = ( + (lambda x: '', (re.compile(r"""https?://(?:www\.)?(?:twitter|x)\.com/""", re.I),)), # skip twitter +) +regexes = other_regexes + ( + (goturl, url_regex), +)