-# Erebus IRC bot - Author: Erebus Team
+# Erebus IRC bot - Author: Conny Sjoblom
+# vim: fileencoding=utf-8
# URL Checker
# This file is released into the public domain; see http://unlicense.org/
modstop = lib.modstop
# module code
-import re, urllib2, urlparse, json, HTMLParser
-from BeautifulSoup import BeautifulSoup
+import sys
+if sys.version_info.major < 3:
+ stringbase = basestring
+ import urllib2
+ import urlparse
+ import HTMLParser
+ html = HTMLParser.HTMLParser()
+ from BeautifulSoup import BeautifulSoup
+else:
+ stringbase = str
+ import urllib.request as urllib2
+ import urllib.parse as urlparse
+ import html
+ from bs4 import BeautifulSoup
+import http.client
-html_parser = HTMLParser.HTMLParser()
+import re, json, datetime
+
+# Optional AIA support: lets requests succeed against servers that do not send
+# their intermediate certificates. When the library is missing or cannot be set
+# up, ``aia`` is set to None and the rest of the module skips the AIA retry.
+try:
+    import aia
+    aia_session = aia.AIASession()
+    # aia is broken on capath systems, needs cafile to work
+    aia_session._context.load_verify_locations(cafile='/etc/ssl/certs/ca-certificates.crt')
+    aia_session._trusted = {
+        aia.openssl_get_cert_info(ca_der)["subject"]: ca_der
+        for ca_der in aia_session._context.get_ca_certs(True)
+    }
+    print("aia loaded")
+except ImportError as e:
+    print(repr(e))
+    aia = None
+except OSError as e:
+    # The CA bundle path above is distribution specific; if it is missing or
+    # unreadable, disable AIA instead of failing to load the whole module.
+    print(repr(e))
+    aia = None
hostmask_regex = re.compile(r'^(.*)!(.*)@(.*)$')
-url_regex = re.compile(r'https?://[^/\s]+\.[^/\s]+(?:/\S+)?')
-spotify_regex = (
- re.compile(r'spotify:(?P<type>\w+):(?P<track_id>\w{22})'),
- re.compile(r'https?://open.spotify.com/(?P<type>\w+)/(?P<track_id>\w+)')
-)
-youtube_regex = (
- re.compile(r'https?://(?:www\.)?youtube\.com/watch\?[a-zA-Z0-9=&_\-]+'),
-)
-twitch_regex = (
- re.compile(r'https?:\/\/(?:www\.)?twitch.tv\/([A-Za-z0-9]*)'),
-)
def parser_hostmask(hostmask):
if isinstance(hostmask, dict):
result.status = code
return result
+def _get_blocked_chans():
+    """Return the list of channels in which URL handling is disabled.
+
+    The value comes from the comma-separated ``blocked`` option of the
+    ``urls`` config section. Whitespace around each entry is ignored and
+    empty entries are dropped, so ``"#a, #b"`` yields ``['#a', '#b']`` and an
+    unset option yields ``[]``.
+    """
+    raw = lib.parent.cfg.get('urls', 'blocked', '')
+    return [chan.strip() for chan in raw.split(',') if chan.strip()]
+
+def process_line(line):
+    """Run every registered URL handler over *line* and collect the replies.
+
+    ``regexes`` is walked in order; each entry pairs an action with a group of
+    compiled patterns. A match that is a plain string is passed to the action
+    as one argument, a tuple of groups is unpacked into positional arguments.
+    Actions returning None or '' contribute nothing.
+
+    At most ``limit`` replies (``urls``/``limit`` option, default 2) are
+    produced. The limit counts replies rather than raw matches: a URL that is
+    deliberately skipped matches both its skip pattern and the generic URL
+    pattern, and counting those matches used up the budget before the
+    remaining URLs on the line were looked at.
+
+    Returns the list of reply strings, possibly empty.
+    """
+    responses = []
+    limit = lib.parent.cfg.getint('urls', 'limit', 2)
+    for action, group in regexes:
+        for regex in group:
+            for match in regex.findall(line):
+                if not match:
+                    continue
+                if len(responses) >= limit:
+                    return responses
+                if isinstance(match, stringbase):
+                    resp = action(match)
+                else:
+                    resp = action(*match)
+                if resp is not None and resp != "":
+                    responses.append(resp)
+    return responses
+
@lib.hooknum("PRIVMSG")
def privmsg_hook(bot, textline):
+ # PRIVMSG hook: scan the message text for URLs and send the collected replies back to the message target.
+ # NOTE(review): the slicing below presumably expects a raw line shaped like ":nick!user@host PRIVMSG <target> :<text>" - confirm against what lib.hooknum passes in.
user = parser_hostmask(textline[1:textline.find(' ')])
chan = textline.split()[2]
+ if chan in _get_blocked_chans(): return
+
try:
line = textline.split(None, 3)[3][1:]
except IndexError:
+ # fewer than four whitespace-separated fields: there is no message text to scan
line = ''
- for match in url_regex.findall(line):
- if match:
- response = goturl(match)
- if response is not None:
- bot.msg(chan, response)
+ responses = process_line(line)
+ send_response(bot, chan, responses)
+
+def send_response(bot, chan, responses):
+    """Deliver *responses* to *chan* through ``bot.msg``.
+
+    Nothing is sent for an empty list. When the ``urls``/``multiline`` option
+    is true each reply is sent as its own message; otherwise all replies are
+    joined with ' | ' into a single message.
+    """
+    if len(responses) == 0:
+        return
+    if not lib.parent.cfg.getboolean('urls', 'multiline'):
+        bot.msg(chan, ' | '.join(responses), True)
+        return
+    for reply in responses:
+        bot.msg(chan, reply, True)
def unescape(line):
+ # Decode HTML entities in line, then collapse every run of whitespace (including newlines) into a single space.
- return html_parser.unescape(line)
+ return re.sub(r'\s+', ' ', html.unescape(line))
def gotspotify(type, track):
url = 'http://ws.spotify.com/lookup/1/?uri=spotify:%s:%s' % (type, track)
popularity = float(popularity.string)*100
length = float(soup.find('length').string)
minutes = int(length)/60
- seconds = int(length)%60
+ seconds = int(length)%60
return unescape('Track: %s - %s / %s %s:%.2d %2d%%' % (artist_name, name, album_name, minutes, seconds, popularity))
else:
return 'Unsupported type.'
+def _yt_duration(s):
+    """Turn an ISO 8601 duration such as ``PT1H2M3S`` into ``1h2m3s``.
+
+    Only the day/hour/minute/second designators are understood. The time part
+    is optional, so a date-only value like ``P0D`` gives ``0d``. A string in
+    any other shape is returned unchanged instead of raising.
+    """
+    mo = re.match(r'P(\d+D)?(?:T(\d+H)?(\d+M)?(\d+S)?)?$', s)
+    if mo is None:
+        return s
+    return ''.join(piece for piece in mo.groups() if piece).lower()
+def _yt_date(s, f):
+    """Reformat a ``YYYY-MM-DDTHH:MM:SS[.fff]Z`` timestamp with strftime format *f*.
+
+    The fractional-seconds part is optional. When present it is scaled to
+    microseconds (digits beyond the sixth are dropped) rather than being used
+    as a raw microsecond count. A string that does not have this shape is
+    returned unchanged.
+    """
+    mo = re.match(r'(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})(?:\.(\d+))?Z', s)
+    if mo is None:
+        return s
+    parts = mo.groups()
+    # '.5' means half a second: right-pad to six digits before converting.
+    micro = int((parts[6] or '0')[:6].ljust(6, '0'))
+    stamp = datetime.datetime(*[int(x) for x in parts[:6]], microsecond=micro)
+    return stamp.strftime(f)
+def _yt_round(n):
+    """Abbreviate a count: 1500 -> '1.5k', 2500000 -> '2.5m', and so on.
+
+    *n* may be a number or a numeric string. Values of a thousand or more come
+    back as a string with one decimal and a k/m/b/t suffix; smaller values
+    come back as an int.
+    """
+    value = float(n)
+    for exponent, suffix in ((12, 't'), (9, 'b'), (6, 'm'), (3, 'k')):
+        scale = 10 ** exponent
+        if value >= scale:
+            return '%.1f%s' % (value / scale, suffix)
+    return int(value)
+
def gotyoutube(url):
+ # Look up the video named by the "v" query parameter of url through the YouTube Data API v3 and return a one-line description built from the configurable 'yt_format' template.
+ # Failures of the API call or of decoding its answer are returned as text instead of being raised.
url_data = urlparse.urlparse(url)
query = urlparse.parse_qs(url_data.query)
video = query["v"][0]
- api_url = 'http://gdata.youtube.com/feeds/api/videos/%s?alt=json&v=2' % video
+ api_url = 'https://www.googleapis.com/youtube/v3/videos?part=snippet,contentDetails,statistics&id=%s&key=%s' % (video, lib.parent.cfg.get('urls', 'api_key'))
try:
respdata = urllib2.urlopen(api_url).read()
- video_info = json.loads(respdata)
+ v = json.loads(respdata)
+ v = v['items'][0]
+ # Individual counters can be missing from the answer (dislike counts are no longer public, likes can be hidden); show '?' for those instead of failing the whole lookup.
+ stats = v.get('statistics', {})
- title = video_info['entry']['title']["$t"]
- author = video_info['entry']['author'][0]['name']['$t']
+ return unescape(lib.parent.cfg.get('urls', 'yt_format', "\002%(author)s\002: \037%(title)s\037 [%(duration)s, uploaded %(uploaded)s, %(views)s v/%(likes)s l/%(dislikes)s d]") % {
+ 'title': v['snippet']['title'],
+ 'author': v['snippet']['channelTitle'],
+ 'duration': _yt_duration(v['contentDetails']['duration']),
+ 'uploaded': _yt_date(v['snippet']['publishedAt'], lib.parent.cfg.get('urls', 'yt_date_format', '%b %d %Y')),
+ 'views': _yt_round(stats['viewCount']) if 'viewCount' in stats else '?',
+ 'likes': _yt_round(stats['likeCount']) if 'likeCount' in stats else '?',
+ 'dislikes': _yt_round(stats['dislikeCount']) if 'dislikeCount' in stats else '?',
+ })
+ except urllib2.HTTPError as e:
+ if e.getcode() == 403:
+ return 'API limit exceeded'
+ else:
+ return str(e)
+ except IndexError:
+ return 'no results'
+ except Exception as e:
+ return str(e)
- return unescape("Youtube: %s (%s)" % (title, author))
+def gottwitch(uri):
+ # Ask the Twitch Helix "streams" endpoint about the login name in the first path segment of uri and describe the stream, or report the channel as offline.
+ url = 'https://api.twitch.tv/helix/streams?user_login=%s' % uri.split('/')[0]
+ opener = urllib2.build_opener()
+ opener.addheaders = [('Client-ID', lib.parent.cfg.get('urls', 'twitch_api_key'))]
+ respdata = opener.open(url).read()
+ twitch = json.loads(respdata)['data']
+ try:
+ # TODO: add current game.
+ return unescape('\037%s\037 is %s (%s)' % (twitch[0]['user_name'], twitch[0]['type'], twitch[0]['title']))
- except:
+ except (IndexError, KeyError):
+ # an empty 'data' list (or an entry without the expected keys) means there is no live stream to describe
- pass
+ return 'Channel offline.'
-def gottwitch(uri):
- url = 'http://api.justin.tv/api/stream/list.json?channel=%s' % uri.split('/')[0]
- respdata = urllib2.urlopen(url).read()
- twitch = json.loads(respdata)
+def _humanize_bytes(b):
+    """Format a byte count for display, e.g. 512 -> '512B', 1536 -> '1.50kiB'.
+
+    *b* may be an int or a numeric string. Values below 1024 are printed as
+    whole bytes; larger values are divided by 1024 until they drop below 1024
+    (or the largest known prefix is reached) and printed with two decimals.
+    """
+    b = int(b)
+    table = " kMGTPEZYRQ"
+    i = 0
+    # ">=" so that exactly 1024 bytes reads "1.00kiB"; the index bound keeps
+    # absurdly large values on the last prefix instead of running off the table.
+    while b >= 1024 and i < len(table) - 1:
+        i += 1
+        b /= 1024.0
+    if i == 0:
+        return "%dB" % (b)
+    return "%.2f%siB" % (b, table[i])
+
+def _do_request(url, try_aia=False):
+ """
+ Open url with browser-like request headers and a 2 second timeout.
+
+ try_aia: build the TLS context through the aia session, which fetches missing intermediate certificates.
+ It is set by the automatic retry below; that retry happens at most once per call chain.
+
+ Return value is a tuple consisting of:
+ - the HTTPResponse object, or a string on error. Empty string -> no response.
+ - and a flag indicating whether AIA was used
+ """
+ try:
+ request = urllib2.Request(url, headers={
+ 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
+ 'accept-language': 'en-US,en;q=0.9',
+ 'cache-control': 'max-age=0',
+ 'sec-ch-ua': '"Chromium";v="122", "Not(A:Brand";v="24", "Google Chrome";v="122"',
+ 'sec-ch-ua-mobile': '?0',
+ 'sec-ch-ua-platform': '"Linux"',
+ 'sec-fetch-dest': 'document',
+ 'sec-fetch-mode': 'navigate',
+ 'sec-fetch-site': 'none',
+ 'sec-fetch-user': '?1',
+ 'upgrade-insecure-requests': '1',
+ 'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
+ })
+ except ValueError:
+ return '', False
+ if try_aia:
try:
- return unescape('Twitch: %s (%s playing %s)' % (twitch[0]['channel']['status'], twitch[0]['channel']['login'], twitch[0]['channel']['meta_game']))
- except:
- return 'Twitch: Channel offline.'
+ opener = urllib2.build_opener(urllib2.HTTPSHandler(context=aia_session.ssl_context_from_url(url)), SmartRedirectHandler())
+ except aia.AIAError as e:
+ return 'Request error: %s.%s: %s' % (e.__module__, e.__class__.__name__, e.args[0]), True
+ else:
+ opener = urllib2.build_opener(SmartRedirectHandler())
-def goturl(url):
- request = urllib2.Request(url)
- opener = urllib2.build_opener(SmartRedirectHandler())
+ # Send request and handle errors
try:
- soup = BeautifulSoup(opener.open(request, timeout=2))
- return unescape('Title: %s' % (soup.title.string))
- except:
- return None
+ response = opener.open(request, timeout=2)
+ except http.client.InvalidURL as e: # why does a method under urllib.request raise an exception under http.client???
+ return '', False
+ except urllib2.HTTPError as e:
+ return 'Request error: %s %s' % (e.code, e.reason), False
+ except urllib2.URLError as e:
+ if "certificate verify failed: unable to get local issuer certificate" in str(e.reason):
+ if aia and not try_aia: # Retry once with AIA enabled, if module is present; never retry an attempt that already used AIA, or the same failure would recurse without end
+ return _do_request(url, True)
+ else:
+ if not aia: lib.parent.log('urls', '?', 'If the site is not serving the certificate chain, installing the aia library might make this request work: pip install aia')
+ return 'Request error: site may have broken TLS configuration (%s)' % (e.reason), False
+ else:
+ return 'Request error: %s' % (e.reason), False
+ except TimeoutError as e:
+ return 'Request error: request timed out', False
+ except Exception as e:
+ return 'Unknown error: %s %r' % (type(e).__name__, e.args), False
+
+ return response, try_aia
+
+
+def goturl(url):
+    """Fetch *url* and describe it in one line of text.
+
+    Returns None when the URL matches one of ``other_regexes`` (sites that are
+    deliberately skipped). Returns the string produced by ``_do_request`` when
+    the request failed (it may be empty). Otherwise returns bracketed tags:
+    content type, size, and ``[AIA]`` when the AIA fallback was used. For
+    ``text/html`` responses the page title follows, read from at most the
+    first MiB of the body.
+
+    The response is always closed before returning.
+    """
+    for _, group in other_regexes:
+        for regex in group:
+            if regex.match(url):
+                return None
+
+    response, used_aia = _do_request(url)
+    if isinstance(response, stringbase):
+        return response
+
+    output = []
+    try:
+        # Try to add type and length headers to reply
+        c_type_fields = response.getheader('Content-Type', '').split(';')
+        c_type = c_type_fields.pop(0).strip()
+        # Media types are case-insensitive, so compare a lower-cased copy.
+        is_html = c_type.lower() == 'text/html'
+        c_charset = None
+        for f in c_type_fields:
+            f = f.strip()
+            if len(f) > 8 and f[0:8].lower() == 'charset=':
+                # The parameter value may be written as a quoted string.
+                c_charset = f[8:].strip('"\'')
+        c_len = response.getheader('Content-Length')
+        if c_len is not None:
+            try:
+                c_len = int(c_len)
+            except ValueError:
+                # A malformed header is treated like a missing one.
+                c_len = None
+        if c_type != '':
+            output.append("[%s] " % (c_type))
+        else:
+            output.append("[no type] ")
+        if not is_html:  # else length will be provided by HTML code below
+            if c_len is not None:
+                output.append("[%s] " % (_humanize_bytes(c_len)))
+            else:
+                output.append("[no length] ")
+
+        if used_aia:
+            output.append("[AIA] ")
+
+        # Try to add title if HTML
+        if is_html:
+            try:
+                responsebody = response.read(1024*1024)
+            except Exception as e:
+                output.append('Error reading response body: %s %r' % (type(e).__name__, e.args))
+            else:
+                if c_len is not None and len(responsebody) != c_len:  # did we read a different amount than Content-Length?
+                    if response.read(1):  # there's more data, we just aren't reading it
+                        output.append("[read %s; Content-Length %s] " % (_humanize_bytes(len(responsebody)), _humanize_bytes(c_len)))
+                    else:
+                        output.append("[actual %s; Content-Length %s] " % (_humanize_bytes(len(responsebody)), _humanize_bytes(c_len)))
+                else:  # Content-Length = amount read
+                    output.append("[%s] " % (_humanize_bytes(len(responsebody))))
+                try:
+                    soup = BeautifulSoup(responsebody, from_encoding=c_charset)
+                    title_tag = soup.title
+                    # .string is None for an empty <title> or one with nested
+                    # markup; report that as a missing title, not an error.
+                    title = title_tag.string if title_tag is not None else None
+                    if title is not None and title.strip():
+                        output.append('Title: ' + unescape('%s' % (title.strip())))
+                    else:
+                        output.append('No title')
+                except Exception as e:
+                    output.append('Title error: %s %r ' % (type(e).__name__, e.args))
+    finally:
+        # Release the connection even if only part of the body was consumed.
+        response.close()
+
+    return ''.join(output)
+
+def _no_preview(url):
+    """Action for URLs that are never fetched: always produces an empty reply."""
+    return ''
+
+
+# Generic http(s) URL: dotted host name, optional port, optional path that
+# stops at whitespace or at a closing ']', '>', ')' or '}'.
+url_regex = (
+    re.compile(r'https?://(?:[^/\s.]+\.)+[a-z0-9-]+(?::\d{1,5})?(?:/[^\s\]>)}]+)?', re.I),
+)
+
+# (action, patterns) pairs for sites that are recognised only to be ignored.
+other_regexes = (
+    (_no_preview, (re.compile(r"""https?://(?:www\.)?(?:twitter|x)\.com/""", re.I),)),  # skip twitter
+    (_no_preview, (re.compile(r"""https?://(?:www\.)?reddit\.com/""", re.I),)),  # skip new-reddit
+    (_no_preview, (re.compile(r"""https?://jfr\.im/git/""", re.I),)),  # skip my gitweb
+    (_no_preview, (re.compile(r"""https?://(?:www\.)?wunderground\.com/""", re.I),)),  # skip wunderground, they time us out
+)
+
+# Full handler table: the skip entries first, then the generic URL handler.
+regexes = other_regexes + ((goturl, url_regex),)