]> jfr.im git - erebus.git/blob - modules/urls.py
remove a debugging print
[erebus.git] / modules / urls.py
1 # Erebus IRC bot - Author: Conny Sjoblom
2 # vim: fileencoding=utf-8
3 # URL Checker
4 # This file is released into the public domain; see http://unlicense.org/
5
# module info consumed by the erebus module loader
modinfo = {
	'author': 'Erebus Team',
	'license': 'public domain',
	'compatible': [0],  # bot core API versions this module supports
	'depends': [],      # hard dependencies on other modules
	'softdeps': [],     # optional modules used if present
}
14
15 # http://embed.ly/tools/generator
16
# preamble
import modlib
lib = modlib.modlib(__name__)  # per-module helper: hook registration, config access via lib.parent
modstart = lib.modstart  # lifecycle entry points the loader expects at module level
modstop = lib.modstop
22
23 # module code
24 import sys
25 if sys.version_info.major < 3:
26 stringbase = basestring
27 import urllib2
28 import urlparse
29 import HTMLParser
30 html = HTMLParser.HTMLParser()
31 from BeautifulSoup import BeautifulSoup
32 else:
33 stringbase = str
34 import urllib.request as urllib2
35 import urllib.parse as urlparse
36 import html
37 from bs4 import BeautifulSoup
38
39 import re, json, datetime
40
# Matches a full IRC hostmask "nick!user@host" into three groups.
hostmask_regex = re.compile(r'^(.*)!(.*)@(.*)$')

def parser_hostmask(hostmask):
	"""Split an IRC hostmask string into a dict of nick/user/host.

	A dict argument is assumed to be already parsed and passes through
	unchanged.  A string that does not look like nick!user@host is
	treated as a bare nick.  None yields a dict of three Nones.
	"""
	if isinstance(hostmask, dict):
		return hostmask

	nick = user = host = None

	if hostmask is not None:
		match = hostmask_regex.match(hostmask)
		if match:
			nick, user, host = match.groups()
		else:
			# no '!'/'@' separators: the whole string is the nick
			nick = hostmask

	return {'nick': nick, 'user': user, 'host': host}
66
class SmartRedirectHandler(urllib2.HTTPRedirectHandler):
	"""Redirect handler that remembers the redirect status code.

	The stdlib handler follows 301/302 transparently; this subclass
	additionally stamps the redirect code onto the returned response
	object as ``result.status``.
	"""

	def _stamp(self, result, code):
		# Record which redirect produced this response.
		result.status = code
		return result

	def http_error_301(self, req, fp, code, msg, headers):
		base = urllib2.HTTPRedirectHandler.http_error_301(
			self, req, fp, code, msg, headers)
		return self._stamp(base, code)

	def http_error_302(self, req, fp, code, msg, headers):
		base = urllib2.HTTPRedirectHandler.http_error_302(
			self, req, fp, code, msg, headers)
		return self._stamp(base, code)
79
def _get_blocked_chans():
	"""Channels where URL announcing is disabled (config 'urls.blocked', comma-separated)."""
	blocked = lib.parent.cfg.get('urls', 'blocked', '')
	return blocked.split(',')
82
def process_line(line):
	"""Run every registered URL matcher over *line* and collect replies.

	Stops after more than cfg 'urls.limit' (default 2) matches so a
	message stuffed with links cannot flood the channel.
	"""
	responses = []
	matches_seen = 0
	limit = lib.parent.cfg.getint('urls', 'limit', 2)
	for action, group in regexes:
		for regex in group:
			for match in regex.findall(line):
				if not match:
					continue
				matches_seen += 1
				if matches_seen > limit:
					return responses
				# findall yields plain strings for 0/1-group patterns
				# and tuples when a pattern has several groups.
				if isinstance(match, stringbase):
					resp = action(match)
				else:
					resp = action(*match)
				if resp is not None and resp != "":
					responses.append(resp)
	return responses
101
@lib.hooknum("PRIVMSG")
def privmsg_hook(bot, textline):
	"""PRIVMSG hook: scan channel messages for URLs and announce summaries."""
	user = parser_hostmask(textline[1:textline.find(' ')])
	chan = textline.split()[2]

	if chan in _get_blocked_chans():
		return

	# Message body is everything after the third space, minus the leading ':'.
	try:
		line = textline.split(None, 3)[3][1:]
	except IndexError:
		line = ''

	replies = process_line(line)
	if not replies:
		return
	if lib.parent.cfg.getboolean('urls', 'multiline'):
		for reply in replies:
			bot.msg(chan, reply, True)
	else:
		bot.msg(chan, ' | '.join(replies), True)
121
def unescape(line):
	"""Decode HTML entities in *line* and collapse whitespace runs to single spaces."""
	# r'' prefix: '\s' in a plain string is an invalid escape sequence
	# (DeprecationWarning today, SyntaxError in future Pythons).
	return re.sub(r'\s+', ' ', html.unescape(line))
124
def gotspotify(type, track):
	"""Look up a Spotify URI via the ws.spotify.com lookup API and return a summary line.

	*type* is the URI kind ('track' or 'album'), *track* the Spotify ID.
	NOTE(review): the convertEntities kwarg is BeautifulSoup 3 only — the
	py3/bs4 branch will raise TypeError here; confirm which branch is live.
	"""
	url = 'http://ws.spotify.com/lookup/1/?uri=spotify:%s:%s' % (type, track)
	xml = urllib2.urlopen(url).read()
	soup = BeautifulSoup(xml, convertEntities=BeautifulSoup.HTML_ENTITIES)
	lookup_type = soup.contents[2].name

	if lookup_type == 'track':
		name = soup.find('name').string
		album_name = soup.find('album').find('name').string
		artist_name = soup.find('artist').find('name').string
		popularity = soup.find('popularity')
		if popularity:
			popularity = float(popularity.string)*100
		else:
			# element missing: default to 0 instead of crashing the
			# '%2d' format below with None
			popularity = 0
		length = float(soup.find('length').string)
		# floor division: plain '/' is float division under Python 3 and
		# would render minutes as e.g. '3.0'
		minutes = int(length)//60
		seconds = int(length)%60

		return unescape('Track: %s - %s / %s %s:%.2d %2d%%' % (artist_name, name, album_name, minutes, seconds, popularity))

	elif lookup_type == 'album':
		album_name = soup.find('album').find('name').string
		artist_name = soup.find('artist').find('name').string
		released = soup.find('released').string
		return unescape('Album: %s - %s - %s' % (artist_name, album_name, released))

	else:
		return 'Unsupported type.'
152
153 def _yt_duration(s):
154 mo = re.match(r'P(\d+D)?T(\d+H)?(\d+M)?(\d+S)?', s)
155 pcs = [x for x in mo.groups() if x]
156 return ''.join(pcs).lower()
157 def _yt_date(s, f):
158 mo = re.match(r'(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})\.(\d+)Z', s)
159 return datetime.datetime(*(int(x) for x in mo.groups())).strftime(f)
160 def _yt_round(n):
161 n = float(n)
162 if n >= 10**12:
163 return '%.1ft' % (n/10**12)
164 elif n >= 10**9:
165 return '%.1fb' % (n/10**9)
166 elif n >= 10**6:
167 return '%.1fm' % (n/10**6)
168 elif n >= 10**3:
169 return '%.1fk' % (n/10**3)
170 else:
171 return int(n)
172
def gotyoutube(url):
	"""Fetch metadata for a YouTube watch URL via the Data API v3 and format a summary."""
	url_data = urlparse.urlparse(url)
	query = urlparse.parse_qs(url_data.query)
	video = query["v"][0]
	api_url = 'https://www.googleapis.com/youtube/v3/videos?part=snippet,contentDetails,statistics&id=%s&key=%s' % (video, lib.parent.cfg.get('urls', 'api_key'))
	try:
		respdata = urllib2.urlopen(api_url).read()
		v = json.loads(respdata)
		v = v['items'][0]

		snippet = v['snippet']
		stats = v['statistics']
		# likeCount can be hidden by the uploader and dislikeCount was
		# removed from the public API (Dec 2021).  Default them to 0 so a
		# missing key doesn't fall through to the broad except below and
		# make the bot reply with the raw KeyError text.
		return unescape(lib.parent.cfg.get('urls', 'yt_format', "\002%(author)s\002: \037%(title)s\037 [%(duration)s, uploaded %(uploaded)s, %(views)s v/%(likes)s l/%(dislikes)s d]") % {
			'title': snippet['title'],
			'author': snippet['channelTitle'],
			'duration': _yt_duration(v['contentDetails']['duration']),
			'uploaded': _yt_date(snippet['publishedAt'], lib.parent.cfg.get('urls', 'yt_date_format', '%b %d %Y')),
			'views': _yt_round(stats['viewCount']),
			'likes': _yt_round(stats.get('likeCount', 0)),
			'dislikes': _yt_round(stats.get('dislikeCount', 0)),
		})
	except urllib2.HTTPError as e:
		if e.getcode() == 403:
			return 'API limit exceeded'
		else:
			return str(e)
	except IndexError:
		# empty 'items': no such video
		return 'no results'
	except Exception as e:
		# bot should answer something rather than die on odd API replies
		return str(e)
201
def gottwitch(uri):
	"""Query the Twitch Helix streams API for a channel and report its live status."""
	url = 'https://api.twitch.tv/helix/streams?user_login=%s' % uri.split('/')[0]
	opener = urllib2.build_opener()
	opener.addheaders = [('Client-ID', lib.parent.cfg.get('urls', 'twitch_api_key'))]
	respdata = opener.open(url).read()
	twitch = json.loads(respdata)['data']
	try:
		# TODO: add current game.
		return unescape('\037%s\037 is %s (%s)' % (twitch[0]['user_name'], twitch[0]['type'], twitch[0]['title']))
	except (IndexError, KeyError):
		# 'data' is an empty list when the channel is not live; the old
		# bare except also swallowed KeyboardInterrupt/SystemExit and any
		# real bug in the line above.
		return 'Channel offline.'
213
214 def _humanize_bytes(b):
215 b = int(b)
216 i = 0
217 table = " kMGTPEZYRQ"
218 while b > 1024:
219 i += 1
220 b /= 1024.0
221 if i == 0:
222 return "%dB" % (b)
223 else:
224 return "%.2f%siB" % (b, table[i])
225
def goturl(url):
	# Fetch an arbitrary URL and build a short summary: content type, size,
	# and the page <title> for HTML.  Returns None when a site-specific
	# matcher owns this URL, or an error string on failure.
	output = []
	for _, group in other_regexes:
		for regex in group:
			if regex.match(url):
				return None
	# Browser-like User-Agent: some sites refuse obvious bot requests.
	request = urllib2.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36'})
	opener = urllib2.build_opener(SmartRedirectHandler())

	# Send request and handle errors
	try:
		response = opener.open(request, timeout=2)
	except urllib2.HTTPError as e:
		return 'Request error: %s %s' % (e.code, e.reason)
	except urllib2.URLError as e:
		return 'Request error: %s' % (e.reason)
	except TimeoutError as e:
		return 'Request error: request timed out'
	except Exception as e:
		return 'Unknown error: %s %r' % (type(e).__name__, e.args)

	# Try to add type and length headers to reply
	# (Content-Type is stripped of its ';charset=...' parameters first)
	c_type = response.getheader('Content-Type', '').split(';', 1)[0]
	c_len = response.getheader('Content-Length')
	if c_type != '':
		output.append("[%s] " % (c_type))
	else:
		output.append("[no type] ")
	if c_type != "text/html": # else length will be provided by HTML code below
		if c_len is not None:
			output.append("[%s] " % (_humanize_bytes(c_len)))
		else:
			output.append("[no length] ")

	# Try to add title if HTML
	if c_type == 'text/html':
		try:
			# cap the body read at 1 MiB so huge pages can't stall the bot
			responsebody = response.read(1024*1024)
		except Exception as e:
			output.append('Error reading response body: %s %r' % (type(e).__name__, e.args))
		else:
			# flag a mismatch between declared and actual body size
			if c_len is not None and len(responsebody) != int(c_len):
				output.append("[actual %s; Content-Length %s] " % (_humanize_bytes(len(responsebody)), _humanize_bytes(c_len)))
			else:
				output.append("[%s] " % (_humanize_bytes(len(responsebody))))
			try:
				soup = BeautifulSoup(responsebody)
				if soup.title:
					output.append('Title: ' + unescape('%s' % (soup.find('title').string.strip())))
				else:
					output.append('No title')
			except Exception as e:
				output.append('Title error: %s %r ' % (type(e).__name__, e.args))

	return ''.join(output)
281
# Generic catch-all URL matcher: scheme, dotted hostname, optional path.
url_regex = (
	re.compile(r'https?://(?:[^/\s.]+\.)+[^/\s.]+(?:/\S+)?'),
)
# Site-specific matchers whose URLs should be skipped by goturl();
# currently empty.
other_regexes = (
)
# (handler, patterns) pairs tried in order by process_line().
regexes = other_regexes + (
	(goturl, url_regex),
)