X-Git-Url: https://jfr.im/git/erebus.git/blobdiff_plain/ecbed32887af35897bfd78e2289d6253a21c2559..3cec5bdc665d310ac628e80b007dd2d71ae7d7bd:/modules/urls.py

diff --git a/modules/urls.py b/modules/urls.py
index 80ae1ee..65f9ca1 100644
--- a/modules/urls.py
+++ b/modules/urls.py
@@ -35,9 +35,24 @@ else:
 	import urllib.parse as urlparse
 	import html
 	from bs4 import BeautifulSoup
+import http.client
 import re, json, datetime
 
+try:
+	import aia
+	aia_session = aia.AIASession()
+	# aia is broken on capath systems, needs cafile to work
+	aia_session._context.load_verify_locations(cafile='/etc/ssl/certs/ca-certificates.crt')
+	aia_session._trusted = {
+		aia.openssl_get_cert_info(ca_der)["subject"]: ca_der
+		for ca_der in aia_session._context.get_ca_certs(True)
+	}
+	print("aia loaded")
+except ImportError as e:
+	print(repr(e))
+	aia = None
+
 hostmask_regex = re.compile(r'^(.*)!(.*)@(.*)$')
 
 def parser_hostmask(hostmask):
@@ -77,6 +92,9 @@ class SmartRedirectHandler(urllib2.HTTPRedirectHandler):
 		result.status = code
 		return result
 
+def _get_blocked_chans():
+	return lib.parent.cfg.get('urls', 'blocked', '').split(',')
+
 def process_line(line):
 	responses = []
 	num_found = 0
@@ -101,12 +119,17 @@ def privmsg_hook(bot, textline):
 	user = parser_hostmask(textline[1:textline.find(' ')])
 	chan = textline.split()[2]
 
+	if chan in _get_blocked_chans(): return
+
 	try:
 		line = textline.split(None, 3)[3][1:]
 	except IndexError:
 		line = ''
 
 	responses = process_line(line)
+	send_response(bot, chan, responses)
+
+def send_response(bot, chan, responses):
 	if len(responses) > 0:
 		if lib.parent.cfg.getboolean('urls', 'multiline'):
 			for r in responses:
@@ -218,29 +241,80 @@ def _humanize_bytes(b):
 	else:
 		return "%.2f%siB" % (b, table[i])
 
-def goturl(url):
-	output = []
-	for _, group in other_regexes:
-		for regex in group:
-			if regex.match(url):
-				return None
-	request = urllib2.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36'})
-	opener = urllib2.build_opener(SmartRedirectHandler())
+def _do_request(url, try_aia=False):
+	"""
+	Return value is a tuple consisting of:
+	- the HTTPResponse object, or a string on error. Empty string -> no response.
+	- and a flag indicating whether AIA was used
+	"""
+	try:
+		request = urllib2.Request(url, headers={
+			'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
+			'accept-language': 'en-US,en;q=0.9',
+			'cache-control': 'max-age=0',
+			'sec-ch-ua': '"Chromium";v="122", "Not(A:Brand";v="24", "Google Chrome";v="122"',
+			'sec-ch-ua-mobile': '?0',
+			'sec-ch-ua-platform': '"Linux"',
+			'sec-fetch-dest': 'document',
+			'sec-fetch-mode': 'navigate',
+			'sec-fetch-site': 'none',
+			'sec-fetch-user': '?1',
+			'upgrade-insecure-requests': '1',
+			'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
+		})
+	except ValueError:
+		return '', False
+	if try_aia:
+		try:
+			opener = urllib2.build_opener(urllib2.HTTPSHandler(context=aia_session.ssl_context_from_url(url)), SmartRedirectHandler())
+		except aia.AIAError as e:
+			return 'Request error: %s.%s: %s' % (e.__module__, e.__class__.__name__, e.args[0]), True
+	else:
+		opener = urllib2.build_opener(SmartRedirectHandler())
 
 	# Send request and handle errors
 	try:
 		response = opener.open(request, timeout=2)
+	except http.client.InvalidURL as e: # why does a method under urllib.request raise an exception under http.client???
+		return '', False
 	except urllib2.HTTPError as e:
-		return 'Request error: %s %s' % (e.code, e.reason)
+		return 'Request error: %s %s' % (e.code, e.reason), False
 	except urllib2.URLError as e:
-		return 'Request error: %s' % (e.reason)
+		if "certificate verify failed: unable to get local issuer certificate" in str(e.reason):
+			if aia: # Retry with AIA enabled, if module is present
+				return _do_request(url, True)
+			else:
+				lib.parent.log('urls', '?', 'If the site is not serving the certificate chain, installing the aia library might make this request work: pip install aia')
+				return 'Request error: site may have broken TLS configuration (%s)' % (e.reason), False
+		else:
+			return 'Request error: %s' % (e.reason), False
 	except TimeoutError as e:
-		return 'Request error: request timed out'
+		return 'Request error: request timed out', False
 	except Exception as e:
-		return 'Unknown error: %s %r' % (type(e).__name__, e.args)
+		return 'Unknown error: %s %r' % (type(e).__name__, e.args), False
+
+	return response, try_aia
+
+
+def goturl(url):
+	output = []
+	for _, group in other_regexes:
+		for regex in group:
+			if regex.match(url):
+				return None
+
+	response, used_aia = _do_request(url)
+	if isinstance(response, stringbase):
+		return response
 
 	# Try to add type and length headers to reply
-	c_type = response.getheader('Content-Type', '').split(';', 1)[0]
+	c_type_fields = response.getheader('Content-Type', '').split(';')
+	c_type = c_type_fields.pop(0)
+	c_charset = None
+	for f in c_type_fields:
+		f = f.strip()
+		if len(f) > 8 and f[0:8] == 'charset=':
+			c_charset = f[8:]
 	c_len = response.getheader('Content-Length')
 	if c_type != '':
 		output.append("[%s] " % (c_type))
@@ -252,22 +326,27 @@ def goturl(url):
 	else:
 		output.append("[no length] ")
 
+	if used_aia:
+		output.append("[AIA] ")
+
 	# Try to add title if HTML
 	if c_type == 'text/html':
 		try:
 			responsebody = response.read(1024*1024)
-			print(type(responsebody))
 		except Exception as e:
 			output.append('Error reading response body: %s %r' % (type(e).__name__, e.args))
 		else:
-			if c_len is not None and len(responsebody) != int(c_len):
-				output.append("[actual %s; Content-Length %s] " % (_humanize_bytes(len(responsebody)), _humanize_bytes(c_len)))
-			else:
+			if c_len is not None and len(responsebody) != int(c_len): # did we read a different amount than Content-Length?
+				if response.read(1): # there's more data, we just aren't reading it
+					output.append("[read %s; Content-Length %s] " % (_humanize_bytes(len(responsebody)), _humanize_bytes(c_len)))
+				else:
+					output.append("[actual %s; Content-Length %s] " % (_humanize_bytes(len(responsebody)), _humanize_bytes(c_len)))
+			else: # Content-Length = amount read
 				output.append("[%s] " % (_humanize_bytes(len(responsebody))))
 			try:
-				soup = BeautifulSoup(responsebody)
+				soup = BeautifulSoup(responsebody, from_encoding=c_charset)
 				if soup.title:
-					output.append('Title: ' + unescape('%s' % (soup.find('title').string)))
+					output.append('Title: ' + unescape('%s' % (soup.find('title').string.strip())))
 				else:
 					output.append('No title')
 			except Exception as e:
@@ -276,9 +355,13 @@ def goturl(url):
 	return ''.join(output)
 
 url_regex = (
-	re.compile(r'https?://(?:[^/\s.]+\.)+[^/\s.]+(?:/\S+)?'),
+	re.compile(r'https?://(?:[^/\s.]+\.)+[a-z0-9-]+(?::\d{1,5})?(?:/[^\s\]>)}]+)?', re.I),
 )
 other_regexes = (
+	(lambda x: '', (re.compile(r"""https?://(?:www\.)?(?:twitter|x)\.com/""", re.I),)), # skip twitter
+	(lambda x: '', (re.compile(r"""https?://(?:www\.)?reddit\.com/""", re.I),)), # skip new-reddit
+	(lambda x: '', (re.compile(r"""https?://jfr\.im/git/""", re.I),)), # skip my gitweb
+	(lambda x: '', (re.compile(r"""https?://(?:www\.)?wunderground\.com/""", re.I),)), # skip wunderground, they time us out
 )
 regexes = other_regexes + (
 	(goturl, url_regex),
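
Note on the change above: _do_request() falls back to AIA chasing only when verification fails because the server omitted its intermediate certificates, so normally-configured sites never pay the cost of the extra AIA fetches. A minimal standalone sketch of that retry pattern, assuming the same third-party aia package the diff imports (pip install aia); the fetch() helper and the test URL are illustrative placeholders, not part of the module:

import urllib.request, urllib.error

try:
	import aia
	aia_session = aia.AIASession()
except ImportError:
	aia_session = None

def fetch(url):
	try:
		return urllib.request.urlopen(url, timeout=2)
	except urllib.error.URLError as e:
		# Same substring test _do_request() uses to detect a missing-intermediate failure
		if aia_session and 'unable to get local issuer certificate' in str(e.reason):
			ctx = aia_session.ssl_context_from_url(url)  # context built from the AIA-fetched chain
			return urllib.request.urlopen(url, timeout=2, context=ctx)
		raise

# Example: fetch('https://incomplete-chain.badssl.com/') should only succeed via the AIA retry.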