# Erebus IRC bot - Author: Conny Sjoblom
# vim: fileencoding=utf-8
# URL Checker
# This file is released into the public domain; see http://unlicense.org/

# module info
modinfo = {
	'author': 'Erebus Team',
	'license': 'public domain',
	'compatible': [0],
	'depends': [],
	'softdeps': [],
}

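# Config options read from the [urls] section (key names are taken from the
# code below; the values shown are only illustrative):
#   blocked        = #chan1,#chan2    ; channels where URL handling is disabled
#   limit          = 2                ; max URLs answered per line
#   multiline      = false            ; true = one message per URL, false = join with ' | '
#   api_key        = <YouTube Data API v3 key>   ; used by gotyoutube()
#   yt_format      = <reply format string>       ; optional, default in code
#   yt_date_format = %b %d %Y                    ; optional, default in code
#   twitch_api_key = <Twitch Client-ID>          ; used by gottwitch()
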
# http://embed.ly/tools/generator

# preamble
import modlib
lib = modlib.modlib(__name__)
modstart = lib.modstart
modstop = lib.modstop

# module code
import sys
if sys.version_info.major < 3:
	stringbase = basestring
	import urllib2
	import urlparse
	import HTMLParser
	html = HTMLParser.HTMLParser()
	from BeautifulSoup import BeautifulSoup
	import httplib as http_client
else:
	stringbase = str
	import urllib.request as urllib2
	import urllib.parse as urlparse
	import html
	from bs4 import BeautifulSoup
	import http.client as http_client

import re, json, datetime

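# Optional dependency: the 'aia' package fetches missing intermediate certificates
# via the Authority Information Access (AIA) extension. When it imports cleanly,
# _do_request() retries requests that fail certificate verification with
# "unable to get local issuer certificate" using an AIA-built SSL context.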
try:
	import aia
	aia_session = aia.AIASession()
	# aia is broken on capath systems, needs cafile to work
	aia_session._context.load_verify_locations(cafile='/etc/ssl/certs/ca-certificates.crt')
	aia_session._trusted = {
		aia.openssl_get_cert_info(ca_der)["subject"]: ca_der
		for ca_der in aia_session._context.get_ca_certs(True)
	}
	print("aia loaded")
except ImportError as e:
	print(repr(e))
	aia = None

hostmask_regex = re.compile(r'^(.*)!(.*)@(.*)$')

def parser_hostmask(hostmask):
	"""Split an IRC hostmask into its parts.

	e.g. 'nick!user@example.com' -> {'nick': 'nick', 'user': 'user', 'host': 'example.com'};
	dicts are passed through unchanged and unparseable strings become the nick.
	"""
	if isinstance(hostmask, dict):
		return hostmask

	nick = None
	user = None
	host = None

	if hostmask is not None:
		match = hostmask_regex.match(hostmask)

		if not match:
			nick = hostmask
		else:
			nick = match.group(1)
			user = match.group(2)
			host = match.group(3)

	return {
		'nick': nick,
		'user': user,
		'host': host
	}

class SmartRedirectHandler(urllib2.HTTPRedirectHandler):
	"""Redirect handler that records the original 301/302 status code on the final response."""
	def http_error_301(self, req, fp, code, msg, headers):
		result = urllib2.HTTPRedirectHandler.http_error_301(
			self, req, fp, code, msg, headers)
		result.status = code
		return result

	def http_error_302(self, req, fp, code, msg, headers):
		result = urllib2.HTTPRedirectHandler.http_error_302(
			self, req, fp, code, msg, headers)
		result.status = code
		return result

def _get_blocked_chans():
	return lib.parent.cfg.get('urls', 'blocked', '').split(',')

def process_line(line):
	"""Run every (action, regexes) pair over line and collect the non-empty responses,
	stopping once the configured URL limit is exceeded."""
	responses = []
	num_found = 0
	limit = lib.parent.cfg.getint('urls', 'limit', 2)
	for action, group in regexes:
		for regex in group:
			for match in regex.findall(line):
				if match:
					num_found += 1
					if num_found > limit:
						return responses
					if isinstance(match, stringbase):
						resp = action(match)
					else:
						resp = action(*match)
					if resp is not None and resp != "":
						responses.append(resp)
	return responses

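# privmsg_hook is passed the raw IRC line, e.g.:
#   :nick!user@host.example PRIVMSG #channel :check out https://example.com/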
@lib.hooknum("PRIVMSG")
def privmsg_hook(bot, textline):
	user = parser_hostmask(textline[1:textline.find(' ')])
	chan = textline.split()[2]

	if chan in _get_blocked_chans(): return

	try:
		line = textline.split(None, 3)[3][1:]
	except IndexError:
		line = ''

	responses = process_line(line)
	send_response(bot, chan, responses)

def send_response(bot, chan, responses):
	if len(responses) > 0:
		if lib.parent.cfg.getboolean('urls', 'multiline'):
			for r in responses:
				bot.msg(chan, r, True)
		else:
			bot.msg(chan, ' | '.join(responses), True)

def unescape(line):
	return re.sub(r'\s+', ' ', html.unescape(line))

def gotspotify(type, track):
	url = 'http://ws.spotify.com/lookup/1/?uri=spotify:%s:%s' % (type, track)
	xml = urllib2.urlopen(url).read()
	if sys.version_info.major < 3:
		soup = BeautifulSoup(xml, convertEntities=BeautifulSoup.HTML_ENTITIES)
	else:
		# bs4 has no convertEntities/HTML_ENTITIES; residual entities are handled by unescape() below
		soup = BeautifulSoup(xml, 'html.parser')
	lookup_type = soup.contents[2].name

	if lookup_type == 'track':
		name = soup.find('name').string
		album_name = soup.find('album').find('name').string
		artist_name = soup.find('artist').find('name').string
		popularity = soup.find('popularity')
		popularity = float(popularity.string)*100 if popularity else 0
		length = float(soup.find('length').string)
		minutes = int(length)//60 # floor division so %.2d below gets an int
		seconds = int(length)%60

		return unescape('Track: %s - %s / %s %s:%.2d %2d%%' % (artist_name, name, album_name, minutes, seconds, popularity))

	elif lookup_type == 'album':
		album_name = soup.find('album').find('name').string
		artist_name = soup.find('artist').find('name').string
		released = soup.find('released').string
		return unescape('Album: %s - %s - %s' % (artist_name, album_name, released))

	else:
		return 'Unsupported type.'

def _yt_duration(s):
	mo = re.match(r'P(\d+D)?T(\d+H)?(\d+M)?(\d+S)?', s)
	pcs = [x for x in mo.groups() if x]
	return ''.join(pcs).lower()
def _yt_date(s, f):
	mo = re.match(r'(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})\.(\d+)Z', s)
	return datetime.datetime(*(int(x) for x in mo.groups())).strftime(f)
def _yt_round(n):
	n = float(n)
	if n >= 10**12:
		return '%.1ft' % (n/10**12)
	elif n >= 10**9:
		return '%.1fb' % (n/10**9)
	elif n >= 10**6:
		return '%.1fm' % (n/10**6)
	elif n >= 10**3:
		return '%.1fk' % (n/10**3)
	else:
		return int(n)

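# Examples for the _yt_* helpers above (illustrative):
#   _yt_duration('PT1H2M3S') -> '1h2m3s'
#   _yt_round(1234)          -> '1.2k'
#   _yt_round(5600000)       -> '5.6m'
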
def gotyoutube(url):
	url_data = urlparse.urlparse(url)
	query = urlparse.parse_qs(url_data.query)
	video = query["v"][0]
	api_url = 'https://www.googleapis.com/youtube/v3/videos?part=snippet,contentDetails,statistics&id=%s&key=%s' % (video, lib.parent.cfg.get('urls', 'api_key'))
	try:
		respdata = urllib2.urlopen(api_url).read()
		v = json.loads(respdata)
		v = v['items'][0]

		return unescape(lib.parent.cfg.get('urls', 'yt_format', "\002%(author)s\002: \037%(title)s\037 [%(duration)s, uploaded %(uploaded)s, %(views)s v/%(likes)s l/%(dislikes)s d]") % {
			'title': v['snippet']['title'],
			'author': v['snippet']['channelTitle'],
			'duration': _yt_duration(v['contentDetails']['duration']),
			'uploaded': _yt_date(v['snippet']['publishedAt'], lib.parent.cfg.get('urls', 'yt_date_format', '%b %d %Y')),
			'views': _yt_round(v['statistics']['viewCount']),
			'likes': _yt_round(v['statistics']['likeCount']),
			'dislikes': _yt_round(v['statistics']['dislikeCount']),
		})
	except urllib2.HTTPError as e:
		if e.getcode() == 403:
			return 'API limit exceeded'
		else:
			return str(e)
	except IndexError:
		return 'no results'
	except Exception as e:
		return str(e)

def gottwitch(uri):
	url = 'https://api.twitch.tv/helix/streams?user_login=%s' % uri.split('/')[0]
	opener = urllib2.build_opener()
	opener.addheaders = [('Client-ID', lib.parent.cfg.get('urls', 'twitch_api_key'))]
	respdata = opener.open(url).read()
	twitch = json.loads(respdata)['data']
	try:
		# TODO: add current game.
		return unescape('\037%s\037 is %s (%s)' % (twitch[0]['user_name'], twitch[0]['type'], twitch[0]['title']))
	except (IndexError, KeyError): # no stream objects in 'data' -> channel offline
		return 'Channel offline.'

def _humanize_bytes(b):
	b = int(b)
	i = 0
	table = " kMGTPEZYRQ"
	while b > 1024:
		i += 1
		b /= 1024.0
	if i == 0:
		return "%dB" % (b)
	else:
		return "%.2f%siB" % (b, table[i])
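# e.g. _humanize_bytes(512) -> '512B'; _humanize_bytes(2048) -> '2.00kiB'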

def _do_request(url, try_aia=False):
	"""
	Return value is a tuple consisting of:
	- the HTTPResponse object, or a string on error. Empty string -> no response.
	- and a flag indicating whether AIA was used
	"""
	try:
		request = urllib2.Request(url, headers={
			'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
			'accept-language': 'en-US,en;q=0.9',
			'cache-control': 'max-age=0',
			'sec-ch-ua': '"Chromium";v="122", "Not(A:Brand";v="24", "Google Chrome";v="122"',
			'sec-ch-ua-mobile': '?0',
			'sec-ch-ua-platform': '"Linux"',
			'sec-fetch-dest': 'document',
			'sec-fetch-mode': 'navigate',
			'sec-fetch-site': 'none',
			'sec-fetch-user': '?1',
			'upgrade-insecure-requests': '1',
			'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
		})
	except ValueError:
		return '', False
	if try_aia:
		try:
			opener = urllib2.build_opener(urllib2.HTTPSHandler(context=aia_session.ssl_context_from_url(url)), SmartRedirectHandler())
		except aia.AIAError as e:
			return 'Request error: %s.%s: %s' % (e.__module__, e.__class__.__name__, e.args[0]), True
	else:
		opener = urllib2.build_opener(SmartRedirectHandler())

	# Send request and handle errors
	try:
		response = opener.open(request, timeout=2)
	except http_client.InvalidURL as e: # why does a method under urllib.request raise an exception under http.client???
		return '', False
	except urllib2.HTTPError as e:
		return 'Request error: %s %s' % (e.code, e.reason), False
	except urllib2.URLError as e:
		if "certificate verify failed: unable to get local issuer certificate" in str(e.reason):
			if aia: # Retry with AIA enabled, if module is present
				return _do_request(url, True)
			else:
				lib.parent.log('urls', '?', 'If the site is not serving the certificate chain, installing the aia library might make this request work: pip install aia')
				return 'Request error: site may have broken TLS configuration (%s)' % (e.reason), False
		else:
			return 'Request error: %s' % (e.reason), False
	except TimeoutError as e:
		return 'Request error: request timed out', False
	except Exception as e:
		return 'Unknown error: %s %r' % (type(e).__name__, e.args), False

	return response, try_aia


def goturl(url):
	"""Fetch url and return a one-line summary (content type, size, optional AIA flag, HTML title)."""
	output = []
	for _, group in other_regexes:
		for regex in group:
			if regex.match(url):
				return None

	response, used_aia = _do_request(url)
	if isinstance(response, stringbase):
		return response

	# Try to add type and length headers to reply
	c_type_fields = response.getheader('Content-Type', '').split(';')
	c_type = c_type_fields.pop(0)
	c_charset = None
	for f in c_type_fields:
		f = f.strip()
		if len(f) > 8 and f[0:8] == 'charset=':
			c_charset = f[8:]
	c_len = response.getheader('Content-Length')
	if c_type != '':
		output.append("[%s] " % (c_type))
	else:
		output.append("[no type] ")
	if c_type != "text/html": # else length will be provided by HTML code below
		if c_len is not None:
			output.append("[%s] " % (_humanize_bytes(c_len)))
		else:
			output.append("[no length] ")

	if used_aia:
		output.append("[AIA] ")

	# Try to add title if HTML
	if c_type == 'text/html':
		try:
			responsebody = response.read(1024*1024)
		except Exception as e:
			output.append('Error reading response body: %s %r' % (type(e).__name__, e.args))
		else:
			if c_len is not None and len(responsebody) != int(c_len): # did we read a different amount than Content-Length?
				if response.read(1): # there's more data, we just aren't reading it
					output.append("[read %s; Content-Length %s] " % (_humanize_bytes(len(responsebody)), _humanize_bytes(c_len)))
				else:
					output.append("[actual %s; Content-Length %s] " % (_humanize_bytes(len(responsebody)), _humanize_bytes(c_len)))
			else: # Content-Length = amount read
				output.append("[%s] " % (_humanize_bytes(len(responsebody))))
			try:
				soup = BeautifulSoup(responsebody, from_encoding=c_charset)
				if soup.title:
					output.append('Title: ' + unescape('%s' % (soup.find('title').string.strip())))
				else:
					output.append('No title')
			except Exception as e:
				output.append('Title error: %s %r ' % (type(e).__name__, e.args))

	return ''.join(output)

url_regex = (
	re.compile(r'https?://(?:[^/\s.]+\.)+[a-z0-9-]+(?::\d{1,5})?(?:/[^\s\]>)}]+)?', re.I),
)
other_regexes = (
	(lambda x: '', (re.compile(r"""https?://(?:www\.)?(?:twitter|x)\.com/""", re.I),)), # skip twitter
	(lambda x: '', (re.compile(r"""https?://(?:www\.)?reddit\.com/""", re.I),)), # skip new-reddit
	(lambda x: '', (re.compile(r"""https?://jfr\.im/git/""", re.I),)), # skip my gitweb
	(lambda x: '', (re.compile(r"""https?://(?:www\.)?wunderground\.com/""", re.I),)), # skip wunderground, they time us out
)
regexes = other_regexes + (
	(goturl, url_regex),
)
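
# Illustrative only (not enabled here): further handlers can be wired in the same
# way, as (callback, (compiled_regex, ...)) pairs; capturing groups become the
# callback's arguments, whole matches are passed as a single string, e.g.:
#   regexes += ((gotyoutube, (re.compile(r'https?://(?:www\.)?youtube\.com/watch\S+', re.I),)),)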