# Erebus IRC bot - Author: Conny Sjoblom
# vim: fileencoding=utf-8
# URL Checker
# This file is released into the public domain; see http://unlicense.org/

# module info
modinfo = {
	'author': 'Erebus Team',
	'license': 'public domain',
	'compatible': [0],
	'depends': [],
	'softdeps': [],
}

# http://embed.ly/tools/generator

# preamble
import modlib
lib = modlib.modlib(__name__)
modstart = lib.modstart
modstop = lib.modstop

# module code
import sys
if sys.version_info.major < 3:
	stringbase = basestring
	import urllib2
	import urlparse
	import HTMLParser
	html = HTMLParser.HTMLParser()
	from BeautifulSoup import BeautifulSoup
else:
	stringbase = str
	import urllib.request as urllib2
	import urllib.parse as urlparse
	import html
	from bs4 import BeautifulSoup

import re, json, datetime

try:
	import aia
	aia_session = aia.AIASession()
	# aia is broken on capath systems, needs cafile to work
	aia_session._context.load_verify_locations(cafile='/etc/ssl/certs/ca-certificates.crt')
	aia_session._trusted = {
		aia.openssl_get_cert_info(ca_der)["subject"]: ca_der
		for ca_der in aia_session._context.get_ca_certs(True)
	}
	print("aia loaded")
except ImportError as e:
	print(repr(e))
	aia = None
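# aia performs AIA (Authority Information Access) chasing: it fetches missing
# intermediate certificates when a server sends an incomplete chain. See
# _do_request() below, which retries with it after a verification failure.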

hostmask_regex = re.compile(r'^(.*)!(.*)@(.*)$')

def parser_hostmask(hostmask):
	if isinstance(hostmask, dict):
		return hostmask

	nick = None
	user = None
	host = None

	if hostmask is not None:
		match = hostmask_regex.match(hostmask)

		if not match:
			nick = hostmask
		else:
			nick = match.group(1)
			user = match.group(2)
			host = match.group(3)

	return {
		'nick': nick,
		'user': user,
		'host': host
	}
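# For illustration, a full hostmask splits into its three parts:
#   parser_hostmask('nick!user@host.example.com')
#   -> {'nick': 'nick', 'user': 'user', 'host': 'host.example.com'}
# while a bare nick yields {'nick': 'nick', 'user': None, 'host': None}.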

class SmartRedirectHandler(urllib2.HTTPRedirectHandler):
	def http_error_301(self, req, fp, code, msg, headers):
		result = urllib2.HTTPRedirectHandler.http_error_301(
			self, req, fp, code, msg, headers)
		result.status = code
		return result

	def http_error_302(self, req, fp, code, msg, headers):
		result = urllib2.HTTPRedirectHandler.http_error_302(
			self, req, fp, code, msg, headers)
		result.status = code
		return result
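# Note: the parent handler already follows 301/302 transparently; stamping the
# final response with the redirect code lets callers detect one happened.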

def _get_blocked_chans():
	return lib.parent.cfg.get('urls', 'blocked', '').split(',')
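# 'blocked' is a comma-separated channel list, e.g. (hypothetical config):
#   [urls]
#   blocked = #quiet,#nobots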

def process_line(line):
	responses = []
	num_found = 0
	limit = lib.parent.cfg.getint('urls', 'limit', 2)
	for action, group in regexes:
		for regex in group:
			for match in regex.findall(line):
				if match:
					num_found += 1
					if num_found > limit:
						return responses
					if isinstance(match, stringbase):
						resp = action(match)
					else:
						resp = action(*match)
					if resp is not None and resp != "":
						responses.append(resp)
	return responses
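# Illustrative only (actual replies depend on the fetched page):
#   process_line('check https://example.com/') might return
#   ['[text/html] [1.23kiB] Title: Example Domain']
# while skip-listed URLs (twitter/x, reddit, jfr.im/git) yield no reply.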

@lib.hooknum("PRIVMSG")
def privmsg_hook(bot, textline):
	user = parser_hostmask(textline[1:textline.find(' ')])
	chan = textline.split()[2]

	if chan in _get_blocked_chans(): return

	try:
		line = textline.split(None, 3)[3][1:]
	except IndexError:
		line = ''

	responses = process_line(line)
	send_response(bot, chan, responses)

def send_response(bot, chan, responses):
	if len(responses) > 0:
		if lib.parent.cfg.getboolean('urls', 'multiline'):
			for r in responses:
				bot.msg(chan, r, True)
		else:
			bot.msg(chan, ' | '.join(responses), True)

def unescape(line):
	return re.sub(r'\s+', ' ', html.unescape(line))
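# e.g. unescape('a &amp;\n  b') -> 'a & b'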

def gotspotify(type, track):
	url = 'http://ws.spotify.com/lookup/1/?uri=spotify:%s:%s' % (type, track)
	xml = urllib2.urlopen(url).read()
	# convertEntities is a BeautifulSoup 3 keyword; under Python 3 / bs4 this
	# call would need adapting (e.g. BeautifulSoup(xml, 'html.parser'))
	soup = BeautifulSoup(xml, convertEntities=BeautifulSoup.HTML_ENTITIES)
	lookup_type = soup.contents[2].name

	if lookup_type == 'track':
		name = soup.find('name').string
		album_name = soup.find('album').find('name').string
		artist_name = soup.find('artist').find('name').string
		popularity = soup.find('popularity')
		if popularity:
			popularity = float(popularity.string)*100
		length = float(soup.find('length').string)
		minutes = int(length)//60 # floor division: Python 3's / would yield a float
		seconds = int(length)%60

		return unescape('Track: %s - %s / %s %s:%.2d %2d%%' % (artist_name, name, album_name, minutes, seconds, popularity))

	elif lookup_type == 'album':
		album_name = soup.find('album').find('name').string
		artist_name = soup.find('artist').find('name').string
		released = soup.find('released').string
		return unescape('Album: %s - %s - %s' % (artist_name, album_name, released))

	else:
		return 'Unsupported type.'

def _yt_duration(s):
	mo = re.match(r'P(\d+D)?T(\d+H)?(\d+M)?(\d+S)?', s)
	pcs = [x for x in mo.groups() if x]
	return ''.join(pcs).lower()
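# e.g. _yt_duration('PT1H2M3S') -> '1h2m3s'; _yt_duration('P1DT2H') -> '1d2h'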
def _yt_date(s, f):
	# fractional seconds are optional (the API may return e.g. ...T03:04:05Z)
	mo = re.match(r'(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})(?:\.(\d+))?Z', s)
	return datetime.datetime(*(int(x) for x in mo.groups() if x is not None)).strftime(f)
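# e.g. _yt_date('2020-01-02T03:04:05Z', '%b %d %Y') -> 'Jan 02 2020'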
def _yt_round(n):
	n = float(n)
	if n >= 10**12:
		return '%.1ft' % (n/10**12)
	elif n >= 10**9:
		return '%.1fb' % (n/10**9)
	elif n >= 10**6:
		return '%.1fm' % (n/10**6)
	elif n >= 10**3:
		return '%.1fk' % (n/10**3)
	else:
		return int(n)
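# e.g. _yt_round(1234) -> '1.2k'; _yt_round(987) -> 987; _yt_round(2.5e9) -> '2.5b'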

def gotyoutube(url):
	url_data = urlparse.urlparse(url)
	query = urlparse.parse_qs(url_data.query)
	video = query["v"][0]
	api_url = 'https://www.googleapis.com/youtube/v3/videos?part=snippet,contentDetails,statistics&id=%s&key=%s' % (video, lib.parent.cfg.get('urls', 'api_key'))
	try:
		respdata = urllib2.urlopen(api_url).read()
		v = json.loads(respdata)
		v = v['items'][0]

		return unescape(lib.parent.cfg.get('urls', 'yt_format', "\002%(author)s\002: \037%(title)s\037 [%(duration)s, uploaded %(uploaded)s, %(views)s v/%(likes)s l/%(dislikes)s d]") % {
			'title': v['snippet']['title'],
			'author': v['snippet']['channelTitle'],
			'duration': _yt_duration(v['contentDetails']['duration']),
			'uploaded': _yt_date(v['snippet']['publishedAt'], lib.parent.cfg.get('urls', 'yt_date_format', '%b %d %Y')),
			'views': _yt_round(v['statistics']['viewCount']),
			'likes': _yt_round(v['statistics']['likeCount']),
			# note: the API no longer returns dislikeCount for most videos; a
			# missing key falls through to the generic handler below
			'dislikes': _yt_round(v['statistics']['dislikeCount']),
		})
	except urllib2.HTTPError as e:
		if e.getcode() == 403:
			return 'API limit exceeded'
		else:
			return str(e)
	except IndexError:
		return 'no results'
	except Exception as e:
		return str(e)

def gottwitch(uri):
	url = 'https://api.twitch.tv/helix/streams?user_login=%s' % uri.split('/')[0]
	opener = urllib2.build_opener()
	opener.addheaders = [('Client-ID', lib.parent.cfg.get('urls', 'twitch_api_key'))]
	respdata = opener.open(url).read()
	twitch = json.loads(respdata)['data']
	try:
		# TODO: add current game.
		return unescape('\037%s\037 is %s (%s)' % (twitch[0]['user_name'], twitch[0]['type'], twitch[0]['title']))
	except (IndexError, KeyError): # no live stream data
		return 'Channel offline.'

def _humanize_bytes(b):
	b = int(b)
	i = 0
	table = " kMGTPEZYRQ"
	while b >= 1024: # >= so exactly 1024 reads as 1.00kiB rather than 1024B
		i += 1
		b /= 1024.0
	if i == 0:
		return "%dB" % (b)
	else:
		return "%.2f%siB" % (b, table[i])
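# e.g. _humanize_bytes(512) -> '512B'; _humanize_bytes(2048) -> '2.00kiB'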

def _do_request(url, try_aia=False):
	"""Returns the HTTPResponse object, or a string on error"""
	request = urllib2.Request(url, headers={
		'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
		'Sec-Ch-Ua': '"Chromium";v="116", "Not)A;Brand";v="24", "Google Chrome";v="116"',
		'Sec-Ch-Ua-Mobile': '?0',
		'Sec-Ch-Ua-Platform': '"Linux"',
		'Sec-Fetch-Dest': 'document',
		'Sec-Fetch-Mode': 'navigate',
		'Sec-Fetch-Site': 'same-origin',
		'Sec-Fetch-User': '?1',
		'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
		'Accept-Language': 'en-US,en;q=0.9',
		'Cache-Control': 'no-cache',
		'Pragma': 'no-cache',
		'Upgrade-Insecure-Requests': '1',
	})
	if try_aia:
		opener = urllib2.build_opener(urllib2.HTTPSHandler(context=aia_session.ssl_context_from_url(url)), SmartRedirectHandler())
	else:
		opener = urllib2.build_opener(SmartRedirectHandler())

	# Send request and handle errors
	try:
		response = opener.open(request, timeout=2)
	except urllib2.HTTPError as e:
		return 'Request error: %s %s' % (e.code, e.reason)
	except urllib2.URLError as e:
		if "certificate verify failed: unable to get local issuer certificate" in str(e.reason):
			if aia: # Retry with AIA enabled
				return _do_request(url, True)
			else:
				lib.parent.log('urls', '?', 'If the site is not serving the certificate chain, installing the aia library might make this request work: pip install aia')
				return 'Request error: site may have broken TLS configuration (%s)' % (e.reason)
		else:
			return 'Request error: %s' % (e.reason)
	except TimeoutError as e:
		return 'Request error: request timed out'
	except Exception as e:
		return 'Unknown error: %s %r' % (type(e).__name__, e.args)

	return response
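# Typical use (goturl() below treats a string result as an error message):
#   response = _do_request('https://example.com/')
#   if isinstance(response, stringbase): return response  # error text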


def goturl(url):
	output = []
	for _, group in other_regexes:
		for regex in group:
			if regex.match(url):
				return None

	response = _do_request(url)
	if isinstance(response, stringbase):
		return response

	# Try to add type and length headers to reply
	c_type = response.getheader('Content-Type', '').split(';', 1)[0]
	c_len = response.getheader('Content-Length')
	if c_type != '':
		output.append("[%s] " % (c_type))
	else:
		output.append("[no type] ")
	if c_type != "text/html": # else length will be provided by HTML code below
		if c_len is not None:
			output.append("[%s] " % (_humanize_bytes(c_len)))
		else:
			output.append("[no length] ")

	# Try to add title if HTML
	if c_type == 'text/html':
		try:
			responsebody = response.read(1024*1024)
		except Exception as e:
			output.append('Error reading response body: %s %r' % (type(e).__name__, e.args))
		else:
			if c_len is not None and len(responsebody) != int(c_len):
				output.append("[actual %s; Content-Length %s] " % (_humanize_bytes(len(responsebody)), _humanize_bytes(c_len)))
			else:
				output.append("[%s] " % (_humanize_bytes(len(responsebody))))
			try:
				soup = BeautifulSoup(responsebody)
				if soup.title:
					output.append('Title: ' + unescape('%s' % (soup.find('title').string.strip())))
				else:
					output.append('No title')
			except Exception as e:
				output.append('Title error: %s %r ' % (type(e).__name__, e.args))

	return ''.join(output)
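# Example output shape (illustrative, not a live fetch):
#   goturl('https://example.com/') -> '[text/html] [1.23kiB] Title: Example Domain'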

url_regex = (
	re.compile(r'https?://(?:[^/\s.]+\.)+[^/\s.]+(?:/\S+)?'),
)
other_regexes = (
	(lambda x: '', (re.compile(r"""https?://(?:www\.)?(?:twitter|x)\.com/""", re.I),)), # skip twitter
	(lambda x: '', (re.compile(r"""https?://(?:www\.)?reddit\.com/""", re.I),)), # skip new-reddit
	(lambda x: '', (re.compile(r"""https?://jfr\.im/git/""", re.I),)), # skip my gitweb
)
regexes = other_regexes + (
	(goturl, url_regex),
)
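
# Each entry in `regexes` pairs an action with a tuple of compiled patterns;
# process_line() calls the action with each match (or its groups, for patterns
# with multiple capture groups). A hypothetical extra handler might look like:
#   regexes += ((gotyoutube, (re.compile(r'https?://(?:www\.)?youtube\.com/watch\?\S+', re.I),)),)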