X-Git-Url: https://jfr.im/git/erebus.git/blobdiff_plain/74dc2a9d7bcedcae16a435361e4527896d92587e..3c63ef0547be2798c4ed4f55ad9ee7d5e14eb013:/modules/urls.py diff --git a/modules/urls.py b/modules/urls.py index cfcd45f..4935d68 100644 --- a/modules/urls.py +++ b/modules/urls.py @@ -38,6 +38,20 @@ else: import re, json, datetime +try: + import aia + aia_session = aia.AIASession() + # aia is broken on capath systems, needs cafile to work + aia_session._context.load_verify_locations(cafile='/etc/ssl/certs/ca-certificates.crt') + aia_session._trusted = { + aia.openssl_get_cert_info(ca_der)["subject"]: ca_der + for ca_der in aia_session._context.get_ca_certs(True) + } + print("aia loaded") +except ImportError as e: + print(repr(e)) + aia = None + hostmask_regex = re.compile(r'^(.*)!(.*)@(.*)$') def parser_hostmask(hostmask): @@ -77,11 +91,14 @@ class SmartRedirectHandler(urllib2.HTTPRedirectHandler): result.status = code return result +def _get_blocked_chans(): + return lib.parent.cfg.get('urls', 'blocked', '').split(',') + def process_line(line): responses = [] num_found = 0 limit = lib.parent.cfg.getint('urls', 'limit', 2) - for action, group, prefix in regexes: + for action, group in regexes: for regex in group: for match in regex.findall(line): if match: @@ -92,8 +109,8 @@ def process_line(line): resp = action(match) else: resp = action(*match) - if resp is not None: - responses.append("%s: %s" % (prefix, action(match))) + if resp is not None and resp != "": + responses.append(resp) return responses @lib.hooknum("PRIVMSG") @@ -101,6 +118,8 @@ def privmsg_hook(bot, textline): user = parser_hostmask(textline[1:textline.find(' ')]) chan = textline.split()[2] + if chan in _get_blocked_chans(): return + try: line = textline.split(None, 3)[3][1:] except IndexError: @@ -206,44 +225,100 @@ def gottwitch(uri): except: return 'Channel offline.' +def _humanize_bytes(b): + b = int(b) + i = 0 + table = " kMGTPEZYRQ" + while b > 1024: + i += 1 + b /= 1024.0 + if i == 0: + return "%dB" % (b) + else: + return "%.2f%siB" % (b, table[i]) + +def _do_request(url, try_aia=False): + """Returns the HTTPResponse object, or a string on error""" + request = urllib2.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36'}) + if try_aia: + opener = urllib2.build_opener(urllib2.HTTPSHandler(context=aia_session.ssl_context_from_url(url)), SmartRedirectHandler()) + else: + opener = urllib2.build_opener(SmartRedirectHandler()) + + # Send request and handle errors + try: + response = opener.open(request, timeout=2) + except urllib2.HTTPError as e: + return 'Request error: %s %s' % (e.code, e.reason) + except urllib2.URLError as e: + if "certificate verify failed: unable to get local issuer certificate" in str(e.reason): + if aia: # Retry with AIA enabled + return _do_request(url, True) + else: + lib.parent.log('urls', '?', 'If the site is not serving the certificate chain, installing the aia library might make this request work: pip install aia') + return 'Request error: site may have broken TLS configuration (%s)' % (e.reason) + else: + return 'Request error: %s' % (e.reason) + except TimeoutError as e: + return 'Request error: request timed out' + except Exception as e: + return 'Unknown error: %s %r' % (type(e).__name__, e.args) + + return response + + def goturl(url): - for _, group, _ in other_regexes: + output = [] + for _, group in other_regexes: for regex in group: if regex.match(url): return None - request = urllib2.Request(url) - opener = urllib2.build_opener(SmartRedirectHandler()) - try: - soup = BeautifulSoup(opener.open(request, timeout=0.5)) - if soup.title: - return unescape('%s' % (soup.title.string)) + + response = _do_request(url) + if isinstance(response, stringbase): + return response + + # Try to add type and length headers to reply + c_type = response.getheader('Content-Type', '').split(';', 1)[0] + c_len = response.getheader('Content-Length') + if c_type != '': + output.append("[%s] " % (c_type)) + else: + output.append("[no type] ") + if c_type != "text/html": # else length will be provided by HTML code below + if c_len is not None: + output.append("[%s] " % (_humanize_bytes(c_len))) else: - return None - except urllib2.HTTPError as e: - return 'Error: %s %s' % (e.code, e.reason) - except urllib2.URLError as e: - return 'Error: %s' % (e.reason) - except Exception as e: - return 'Error: %r' % (e.args) + output.append("[no length] ") + + # Try to add title if HTML + if c_type == 'text/html': + try: + responsebody = response.read(1024*1024) + except Exception as e: + output.append('Error reading response body: %s %r' % (type(e).__name__, e.args)) + else: + if c_len is not None and len(responsebody) != int(c_len): + output.append("[actual %s; Content-Length %s] " % (_humanize_bytes(len(responsebody)), _humanize_bytes(c_len))) + else: + output.append("[%s] " % (_humanize_bytes(len(responsebody)))) + try: + soup = BeautifulSoup(responsebody) + if soup.title: + output.append('Title: ' + unescape('%s' % (soup.find('title').string.strip()))) + else: + output.append('No title') + except Exception as e: + output.append('Title error: %s %r ' % (type(e).__name__, e.args)) + + return ''.join(output) url_regex = ( - re.compile(r'https?://[^/\s]+\.[^/\s]+(?:/\S+)?'), -) -spotify_regex = ( - re.compile(r'spotify:(?P\w+):(?P\w{22})'), - re.compile(r'https?://open\.spotify\.com/(?P\w+)/(?P\w+)') -) -youtube_regex = ( - re.compile(r'https?://(?:www\.)?youtube\.com/watch\?[a-zA-Z0-9=&_\-]+'), -) -twitch_regex = ( - re.compile(r'https?:\/\/(?:www\.)?twitch.tv\/([A-Za-z0-9]*)'), + re.compile(r'https?://(?:[^/\s.]+\.)+[^/\s.]+(?:/\S+)?'), ) other_regexes = ( - (gotspotify, spotify_regex, 'Spotify'), - (gotyoutube, youtube_regex, 'YouTube'), - (gottwitch, twitch_regex, 'Twitch'), + (lambda x: '', (re.compile(r"""https?://(?:www\.)?(?:twitter|x)\.com/""", re.I),)), # skip twitter ) regexes = other_regexes + ( - (goturl, url_regex, 'Title'), + (goturl, url_regex), )