# Erebus IRC bot - Author: Conny Sjoblom
# vim: fileencoding=utf-8
# URL Checker
# This file is released into the public domain; see http://unlicense.org/

# module info
modinfo = {
    'author': 'Erebus Team',
    'license': 'public domain',
    'compatible': [0],
    'depends': [],
    'softdeps': [],
}

# http://embed.ly/tools/generator

# preamble
import modlib
lib = modlib.modlib(__name__)
modstart = lib.modstart
modstop = lib.modstop

# module code
import sys
if sys.version_info.major < 3:
    stringbase = basestring
    import urllib2
    import urlparse
    import HTMLParser
    html = HTMLParser.HTMLParser()
    from BeautifulSoup import BeautifulSoup
else:
    stringbase = str
    import urllib.request as urllib2
    import urllib.parse as urlparse
    import html
    from bs4 import BeautifulSoup

import re, json, datetime

try:
    import aia
    aia_session = aia.AIASession()
    # aia is broken on capath systems, needs cafile to work
    aia_session._context.load_verify_locations(cafile='/etc/ssl/certs/ca-certificates.crt')
    aia_session._trusted = {
        aia.openssl_get_cert_info(ca_der)["subject"]: ca_der
        for ca_der in aia_session._context.get_ca_certs(True)
    }
    print("aia loaded")
except ImportError as e:
    print(repr(e))
    aia = None

hostmask_regex = re.compile(r'^(.*)!(.*)@(.*)$')

def parser_hostmask(hostmask):
    if isinstance(hostmask, dict):
        return hostmask

    nick = None
    user = None
    host = None

    if hostmask is not None:
        match = hostmask_regex.match(hostmask)

        if not match:
            nick = hostmask
        else:
            nick = match.group(1)
            user = match.group(2)
            host = match.group(3)

    return {
        'nick': nick,
        'user': user,
        'host': host
    }
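
# Example: parser_hostmask('nick!user@example.com')
#   -> {'nick': 'nick', 'user': 'user', 'host': 'example.com'}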

class SmartRedirectHandler(urllib2.HTTPRedirectHandler):
    def http_error_301(self, req, fp, code, msg, headers):
        result = urllib2.HTTPRedirectHandler.http_error_301(
            self, req, fp, code, msg, headers)
        result.status = code
        return result

    def http_error_302(self, req, fp, code, msg, headers):
        result = urllib2.HTTPRedirectHandler.http_error_302(
            self, req, fp, code, msg, headers)
        result.status = code
        return result
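
# Usage sketch (hypothetical URL): the handler tags the response with the
# redirect code instead of the final 200, so callers can tell a redirect happened:
# opener = urllib2.build_opener(SmartRedirectHandler())
# response = opener.open('http://example.com/old-path')
# getattr(response, 'status', None)  # -> 301 or 302 if a redirect was followed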

def _get_blocked_chans():
    return lib.parent.cfg.get('urls', 'blocked', '').split(',')
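
# e.g. assuming an [urls] config section containing "blocked = #quiet,#serious",
# _get_blocked_chans() -> ['#quiet', '#serious']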

def process_line(line):
    responses = []
    num_found = 0
    limit = lib.parent.cfg.getint('urls', 'limit', 2)
    for action, group in regexes:
        for regex in group:
            for match in regex.findall(line):
                if match:
                    num_found += 1
                    if num_found > limit:
                        return responses
                    if isinstance(match, stringbase):
                        resp = action(match)
                    else:
                        resp = action(*match)
                    if resp is not None and resp != "":
                        responses.append(resp)
    return responses
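
# Illustrative (goturl hits the network, so real output varies):
# process_line('see https://example.com/ and https://twitter.com/foo')
#   -> ['[text/html] [1.23kiB] Title: Example Domain']
# (the twitter link matches a skip regex below and yields no response)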

@lib.hooknum("PRIVMSG")
def privmsg_hook(bot, textline):
    user = parser_hostmask(textline[1:textline.find(' ')])
    chan = textline.split()[2]

    if chan in _get_blocked_chans(): return

    try:
        line = textline.split(None, 3)[3][1:]
    except IndexError:
        line = ''

    responses = process_line(line)
    if len(responses) > 0:
        if lib.parent.cfg.getboolean('urls', 'multiline'):
            for r in responses:
                bot.msg(chan, r, True)
        else:
            bot.msg(chan, ' | '.join(responses), True)

def unescape(line):
    return re.sub(r'\s+', ' ', html.unescape(line))
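
# Example: unescape('Foo &amp;\n  Bar') -> 'Foo & Bar'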

def gotspotify(type, track):
    # note: the old ws.spotify.com lookup API appears to have been retired;
    # this handler is legacy code
    url = 'http://ws.spotify.com/lookup/1/?uri=spotify:%s:%s' % (type, track)
    xml = urllib2.urlopen(url).read()
    # convertEntities is a BeautifulSoup 3 argument; bs4 ignores it with a warning
    soup = BeautifulSoup(xml, convertEntities=BeautifulSoup.HTML_ENTITIES)
    lookup_type = soup.contents[2].name

    if lookup_type == 'track':
        name = soup.find('name').string
        album_name = soup.find('album').find('name').string
        artist_name = soup.find('artist').find('name').string
        popularity = soup.find('popularity')
        if popularity:
            popularity = float(popularity.string)*100
        else:
            popularity = 0  # avoid formatting None below
        length = float(soup.find('length').string)
        minutes = int(length)//60  # integer division; / would produce a float on Python 3
        seconds = int(length)%60

        return unescape('Track: %s - %s / %s %s:%.2d %2d%%' % (artist_name, name, album_name, minutes, seconds, popularity))

    elif lookup_type == 'album':
        album_name = soup.find('album').find('name').string
        artist_name = soup.find('artist').find('name').string
        released = soup.find('released').string
        return unescape('Album: %s - %s - %s' % (artist_name, album_name, released))

    else:
        return 'Unsupported type.'

def _yt_duration(s):
    mo = re.match(r'P(\d+D)?T(\d+H)?(\d+M)?(\d+S)?', s)
    pcs = [x for x in mo.groups() if x]
    return ''.join(pcs).lower()
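
# Example: _yt_duration('PT1H2M3S') -> '1h2m3s'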
def _yt_date(s, f):
    mo = re.match(r'(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})\.(\d+)Z', s)
    return datetime.datetime(*(int(x) for x in mo.groups())).strftime(f)
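
# Example: _yt_date('2023-04-05T06:07:08.000Z', '%b %d %Y') -> 'Apr 05 2023'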
def _yt_round(n):
    n = float(n)
    if n >= 10**12:
        return '%.1ft' % (n/10**12)
    elif n >= 10**9:
        return '%.1fb' % (n/10**9)
    elif n >= 10**6:
        return '%.1fm' % (n/10**6)
    elif n >= 10**3:
        return '%.1fk' % (n/10**3)
    else:
        return int(n)
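
# Examples: _yt_round(1234567) -> '1.2m'; _yt_round(999) -> 999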

def gotyoutube(url):
    url_data = urlparse.urlparse(url)
    query = urlparse.parse_qs(url_data.query)
    video = query["v"][0]
    api_url = 'https://www.googleapis.com/youtube/v3/videos?part=snippet,contentDetails,statistics&id=%s&key=%s' % (video, lib.parent.cfg.get('urls', 'api_key'))
    try:
        respdata = urllib2.urlopen(api_url).read()
        v = json.loads(respdata)
        v = v['items'][0]

        return unescape(lib.parent.cfg.get('urls', 'yt_format', "\002%(author)s\002: \037%(title)s\037 [%(duration)s, uploaded %(uploaded)s, %(views)s v/%(likes)s l/%(dislikes)s d]") % {
            'title': v['snippet']['title'],
            'author': v['snippet']['channelTitle'],
            'duration': _yt_duration(v['contentDetails']['duration']),
            'uploaded': _yt_date(v['snippet']['publishedAt'], lib.parent.cfg.get('urls', 'yt_date_format', '%b %d %Y')),
            'views': _yt_round(v['statistics']['viewCount']),
            'likes': _yt_round(v['statistics']['likeCount']),
            'dislikes': _yt_round(v['statistics'].get('dislikeCount', 0)),  # dislikeCount is no longer returned by the API
        })
    except urllib2.HTTPError as e:
        if e.getcode() == 403:
            return 'API limit exceeded'
        else:
            return str(e)
    except IndexError:
        return 'no results'
    except Exception as e:
        return str(e)

def gottwitch(uri):
    url = 'https://api.twitch.tv/helix/streams?user_login=%s' % uri.split('/')[0]
    opener = urllib2.build_opener()
    opener.addheaders = [('Client-ID', lib.parent.cfg.get('urls', 'twitch_api_key'))]
    respdata = opener.open(url).read()
    twitch = json.loads(respdata)['data']
    try:
        # TODO: add current game.
        return unescape('\037%s\037 is %s (%s)' % (twitch[0]['user_name'], twitch[0]['type'], twitch[0]['title']))
    except (IndexError, KeyError):
        return 'Channel offline.'

def _humanize_bytes(b):
    b = int(b)
    i = 0
    table = " kMGTPEZYRQ"
    while b > 1024:
        i += 1
        b /= 1024.0
    if i == 0:
        return "%dB" % (b)
    else:
        return "%.2f%siB" % (b, table[i])
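
# Examples: _humanize_bytes(512) -> '512B'; _humanize_bytes(2048) -> '2.00kiB'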

def _do_request(url, try_aia=False):
    """Returns the HTTPResponse object, or a string on error"""
    request = urllib2.Request(url, headers={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
        'Sec-Ch-Ua': '"Chromium";v="116", "Not)A;Brand";v="24", "Google Chrome";v="116"',
        'Sec-Ch-Ua-Mobile': '?0',
        'Sec-Ch-Ua-Platform': '"Linux"',
        'Sec-Fetch-Dest': 'document',
        'Sec-Fetch-Mode': 'navigate',
        'Sec-Fetch-Site': 'same-origin',
        'Sec-Fetch-User': '?1',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'en-US,en;q=0.9',
        'Cache-Control': 'no-cache',
        'Pragma': 'no-cache',
        'Upgrade-Insecure-Requests': '1',
    })
    if try_aia:
        opener = urllib2.build_opener(urllib2.HTTPSHandler(context=aia_session.ssl_context_from_url(url)), SmartRedirectHandler())
    else:
        opener = urllib2.build_opener(SmartRedirectHandler())

    # Send request and handle errors
    try:
        response = opener.open(request, timeout=2)
    except urllib2.HTTPError as e:
        return 'Request error: %s %s' % (e.code, e.reason)
    except urllib2.URLError as e:
        if "certificate verify failed: unable to get local issuer certificate" in str(e.reason):
            if aia:  # Retry with AIA enabled
                return _do_request(url, True)
            else:
                lib.parent.log('urls', '?', 'If the site is not serving the certificate chain, installing the aia library might make this request work: pip install aia')
                return 'Request error: site may have broken TLS configuration (%s)' % (e.reason)
        else:
            return 'Request error: %s' % (e.reason)
    except TimeoutError:
        return 'Request error: request timed out'
    except Exception as e:
        return 'Unknown error: %s %r' % (type(e).__name__, e.args)

    return response

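# Typical use (network-dependent, illustrative):
# resp = _do_request('https://example.com/')
# if isinstance(resp, stringbase):      # error message string
#     print(resp)
# else:
#     resp.getheader('Content-Type')    # e.g. 'text/html; charset=UTF-8'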

def goturl(url):
    output = []
    for _, group in other_regexes:
        for regex in group:
            if regex.match(url):
                return None

    response = _do_request(url)
    if isinstance(response, stringbase):
        return response

    # Try to add type and length headers to reply
    c_type = response.getheader('Content-Type', '').split(';', 1)[0]
    c_len = response.getheader('Content-Length')
    if c_type != '':
        output.append("[%s] " % (c_type))
    else:
        output.append("[no type] ")
    if c_type != "text/html":  # else length will be provided by HTML code below
        if c_len is not None:
            output.append("[%s] " % (_humanize_bytes(c_len)))
        else:
            output.append("[no length] ")

    # Try to add title if HTML
    if c_type == 'text/html':
        try:
            responsebody = response.read(1024*1024)
        except Exception as e:
            output.append('Error reading response body: %s %r' % (type(e).__name__, e.args))
        else:
            if c_len is not None and len(responsebody) != int(c_len):
                output.append("[actual %s; Content-Length %s] " % (_humanize_bytes(len(responsebody)), _humanize_bytes(c_len)))
            else:
                output.append("[%s] " % (_humanize_bytes(len(responsebody))))
            try:
                soup = BeautifulSoup(responsebody)
                if soup.title:
                    output.append('Title: ' + unescape('%s' % (soup.find('title').string.strip())))
                else:
                    output.append('No title')
            except Exception as e:
                output.append('Title error: %s %r ' % (type(e).__name__, e.args))

    return ''.join(output)

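# Illustrative (actual output depends on the page fetched):
# goturl('https://example.com/') -> '[text/html] [1.23kiB] Title: Example Domain'
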
url_regex = (
    re.compile(r'https?://(?:[^/\s.]+\.)+[^/\s.]+(?:/\S+)?'),
)
other_regexes = (
    (lambda x: '', (re.compile(r"""https?://(?:www\.)?(?:twitter|x)\.com/""", re.I),)), # skip twitter
    (lambda x: '', (re.compile(r"""https?://(?:www\.)?reddit\.com/""", re.I),)), # skip new-reddit
)
regexes = other_regexes + (
    (goturl, url_regex),
)
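
# gotspotify, gotyoutube and gottwitch are defined above but not wired into
# `regexes` in this version. A hypothetical wiring sketch (patterns are
# illustrative, shaped so findall() yields what each handler expects):
#
# spotify_regex = (re.compile(r'spotify:(\w+):(\w{22})'),)                            # two groups -> gotspotify(type, track)
# youtube_regex = (re.compile(r'https?://(?:www\.)?youtube\.com/watch\?\S+', re.I),)  # no groups -> gotyoutube(url)
# twitch_regex = (re.compile(r'https?://(?:www\.)?twitch\.tv/(\S+)', re.I),)          # one group -> gottwitch(uri)
# regexes = other_regexes + (
#     (gotspotify, spotify_regex),
#     (gotyoutube, youtube_regex),
#     (gottwitch, twitch_regex),
#     (goturl, url_regex),
# )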