]> jfr.im git - erebus.git/blobdiff - modules/urls.py
urls - fix choke on malformed URL
[erebus.git] / modules / urls.py
index 93615911226ffc94c7cd09712603497941d43806..4b27ff28b5a84c60ffec78d0dac5965b3933f34d 100644 (file)
@@ -23,19 +23,35 @@ modstop = lib.modstop
 # module code
 import sys
 if sys.version_info.major < 3:
+       stringbase = basestring
        import urllib2
        import urlparse
        import HTMLParser
+       html = HTMLParser.HTMLParser()
        from BeautifulSoup import BeautifulSoup
 else:
+       stringbase = str
        import urllib.request as urllib2
        import urllib.parse as urlparse
-       import html.parser as HTMLParser
+       import html
        from bs4 import BeautifulSoup
+import http.client
 
 import re, json, datetime
 
-html_parser = HTMLParser.HTMLParser()
try:
	import aia
	aia_session = aia.AIASession()
	# aia is broken on capath systems, needs cafile to work
	aia_session._context.load_verify_locations(cafile='/etc/ssl/certs/ca-certificates.crt')
	aia_session._trusted = {
		aia.openssl_get_cert_info(ca_der)["subject"]: ca_der
		for ca_der in aia_session._context.get_ca_certs(True)
	}
	print("aia loaded")
except Exception as e:
	# Catch more than ImportError: load_verify_locations() raises
	# FileNotFoundError on systems without the Debian-style cafile, and that
	# must not abort loading the whole module -- just disable AIA retries.
	print(repr(e))
	aia = None
 
 hostmask_regex = re.compile(r'^(.*)!(.*)@(.*)$')
 
@@ -76,20 +92,26 @@ class SmartRedirectHandler(urllib2.HTTPRedirectHandler):
                result.status = code
                return result
 
def _get_blocked_chans():
	"""Return the list of channels where URL handling is disabled.

	Read from the comma-separated 'blocked' option in the [urls] config
	section; an unset option yields [''] which matches no real channel.
	"""
	blocked = lib.parent.cfg.get('urls', 'blocked', '')
	return blocked.split(',')
+
def process_line(line):
	"""Run every registered (handler, regexes) pair over *line*.

	Returns the list of non-empty handler replies, stopping once the
	configured per-message URL limit is exceeded.
	"""
	responses = []
	seen = 0
	limit = lib.parent.cfg.getint('urls', 'limit', 2)
	for action, group in regexes:
		for regex in group:
			for match in regex.findall(line):
				if not match:
					continue
				seen += 1
				if seen > limit:
					return responses
				# findall() yields a plain string when the pattern has at
				# most one group, and a tuple of groups otherwise.
				if isinstance(match, stringbase):
					result = action(match)
				else:
					result = action(*match)
				if result is not None and result != "":
					responses.append(result)
	return responses
 
@lib.hooknum("PRIVMSG")
def privmsg_hook(bot, textline):
	"""Scan each channel PRIVMSG for URLs and announce their titles."""
	user = parser_hostmask(textline[1:textline.find(' ')])
	chan = textline.split()[2]

	# Honor the per-channel opt-out list from the config.
	if chan in _get_blocked_chans():
		return

	# Message text is the 4th space-separated field, minus the leading ':'.
	try:
		line = textline.split(None, 3)[3][1:]
	except IndexError:
		line = ''

	send_response(bot, chan, process_line(line))
+
def send_response(bot, chan, responses):
	"""Deliver URL summaries to *chan*: one message per response when the
	'multiline' config flag is set, otherwise a single ' | '-joined line."""
	if not responses:
		return
	if lib.parent.cfg.getboolean('urls', 'multiline'):
		for reply in responses:
			bot.msg(chan, reply, True)
	else:
		bot.msg(chan, ' | '.join(responses), True)
 
def unescape(line):
	"""Decode HTML entities in *line* and collapse whitespace runs to one space."""
	# Raw string for the pattern: '\s' in a plain literal is an invalid
	# escape (DeprecationWarning, SyntaxWarning on newer Pythons).
	return re.sub(r'\s+', ' ', html.unescape(line))
 
 def gotspotify(type, track):
        url = 'http://ws.spotify.com/lookup/1/?uri=spotify:%s:%s' % (type, track)
@@ -202,42 +229,120 @@ def gottwitch(uri):
        except:
                return 'Channel offline.'
 
+def _humanize_bytes(b):
+       b = int(b)
+       i = 0
+       table = " kMGTPEZYRQ"
+       while b > 1024:
+               i += 1
+               b /= 1024.0
+       if i == 0:
+               return "%dB" % (b)
+       else:
+               return "%.2f%siB" % (b, table[i])
+
def _do_request(url, try_aia=False, timeout=2):
	"""Fetch *url* pretending to be a desktop Chrome browser.

	try_aia: internal retry flag; when True, build the TLS context via the
	aia library so missing intermediate certificates are fetched.
	timeout: socket timeout in seconds (previously hard-coded).

	Return value is a tuple consisting of:
	- the HTTPResponse object, or a string on error. Empty string -> no response.
	- and a flag indicating whether AIA was used
	"""
	try:
		request = urllib2.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36', 'Sec-Ch-Ua': '"Chromium";v="116", "Not)A;Brand";v="24", "Google Chrome";v="116"', 'Sec-Ch-Ua-Mobile': '?0', 'Sec-Ch-Ua-Platform': '"Linux"', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'same-origin', 'Sec-Fetch-User': '?1', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', 'Accept-Language': 'en-US,en;q=0.9', 'Cache-Control': 'no-cache', 'Pragma': 'no-cache', 'Upgrade-Insecure-Requests': '1'})
	except ValueError:
		# Malformed URL (e.g. bad port number): nothing to report.
		return '', False
	if try_aia:
		opener = urllib2.build_opener(urllib2.HTTPSHandler(context=aia_session.ssl_context_from_url(url)), SmartRedirectHandler())
	else:
		opener = urllib2.build_opener(SmartRedirectHandler())

	# Send request and handle errors
	try:
		response = opener.open(request, timeout=timeout)
	except http.client.InvalidURL as e: # why does a method under urllib.request raise an exception under http.client???
		return '', False
	except urllib2.HTTPError as e: # must precede URLError: HTTPError subclasses it
		return 'Request error: %s %s' % (e.code, e.reason), False
	except urllib2.URLError as e:
		if "certificate verify failed: unable to get local issuer certificate" in str(e.reason):
			if aia: # Retry with AIA enabled, if module is present
				return _do_request(url, True, timeout)
			else:
				lib.parent.log('urls', '?', 'If the site is not serving the certificate chain, installing the aia library might make this request work: pip install aia')
				return 'Request error: site may have broken TLS configuration (%s)' % (e.reason), False
		else:
			return 'Request error: %s' % (e.reason), False
	except TimeoutError as e:
		return 'Request error: request timed out', False
	except Exception as e:
		return 'Unknown error: %s %r' % (type(e).__name__, e.args), False

	return response, try_aia
+
+
def goturl(url):
	"""Fetch *url* and return a one-line summary (type/size/title), an error
	string from _do_request, or None when a dedicated handler owns the URL."""
	output = []
	# Skip URLs claimed by one of the site-specific handlers.
	for _, group in other_regexes:
		for regex in group:
			if regex.match(url):
				return None

	response, used_aia = _do_request(url)
	if isinstance(response, stringbase):
		# Error message ('' means a hopeless URL with nothing to report).
		return response

	# Try to add type and length headers to reply.
	# Content-Type looks like "type/subtype; charset=...; ..." -- normalize
	# the media type so comparisons work for servers sending e.g. Text/HTML.
	c_type_fields = response.getheader('Content-Type', '').split(';')
	c_type = c_type_fields.pop(0).strip().lower()
	c_charset = None
	for f in c_type_fields:
		f = f.strip()
		if len(f) > 8 and f.startswith('charset='):
			c_charset = f[8:]
	c_len = response.getheader('Content-Length')
	if c_type != '':
		output.append("[%s] " % (c_type))
	else:
		output.append("[no type] ")
	if c_type != "text/html": # else length will be provided by HTML code below
		if c_len is not None:
			output.append("[%s] " % (_humanize_bytes(c_len)))
		else:
			output.append("[no length] ")

	if used_aia:
		output.append("[AIA] ")

	# Try to add title if HTML
	if c_type == 'text/html':
		try:
			responsebody = response.read(1024*1024)  # cap the body at 1 MiB
		except Exception as e:
			output.append('Error reading response body: %s %r' % (type(e).__name__, e.args))
		else:
			if c_len is not None and len(responsebody) != int(c_len):
				output.append("[actual %s; Content-Length %s] " % (_humanize_bytes(len(responsebody)), _humanize_bytes(c_len)))
			else:
				output.append("[%s] " % (_humanize_bytes(len(responsebody))))
			try:
				soup = BeautifulSoup(responsebody, from_encoding=c_charset)
				title = soup.title
				# An empty <title></title> has .string == None; report
				# "No title" instead of raising (and reuse the tag we
				# already found rather than a second soup.find()).
				if title is not None and title.string is not None:
					output.append('Title: ' + unescape('%s' % (title.string.strip())))
				else:
					output.append('No title')
			except Exception as e:
				output.append('Title error: %s %r ' % (type(e).__name__, e.args))

	return ''.join(output)
 
# Generic URL matcher: http(s) scheme, a dotted hostname (at least one
# label followed by a dot), and an optional non-space path.
url_regex = (
	re.compile(r'https?://(?:[^/\s.]+\.)+[^/\s.]+(?:/\S+)?'),
)
def _skip(_):
	# Handler for sites we deliberately never summarize; the empty string
	# is dropped by process_line.
	return ''

other_regexes = (
	(_skip, (re.compile(r"""https?://(?:www\.)?(?:twitter|x)\.com/""", re.I),)), # skip twitter
	(_skip, (re.compile(r"""https?://(?:www\.)?reddit\.com/""", re.I),)), # skip new-reddit
	(_skip, (re.compile(r"""https?://jfr\.im/git/""", re.I),)), # skip my gitweb
)
# Complete handler table consumed by process_line: the site-specific
# skip entries first, then the generic title fetcher as the catch-all.
regexes = other_regexes + (
	(goturl, url_regex),
)