1 # Erebus IRC bot - Author: Conny Sjoblom
2 # vim: fileencoding=utf-8
4 # This file is released into the public domain; see http://unlicense.org/
8 'author': 'Erebus Team',
9 'license': 'public domain',
15 # http://embed.ly/tools/generator
19 lib
= modlib
.modlib(__name__
)
20 modstart
= lib
.modstart
25 if sys
.version_info
.major
< 3:
26 stringbase
= basestring
30 html
= HTMLParser
.HTMLParser()
31 from BeautifulSoup
import BeautifulSoup
34 import urllib
.request
as urllib2
35 import urllib
.parse
as urlparse
37 from bs4
import BeautifulSoup
39 import re
, json
, datetime
43 aia_session
= aia
.AIASession()
44 # aia is broken on capath systems, needs cafile to work
45 aia_session
._context
.load_verify_locations(cafile
='/etc/ssl/certs/ca-certificates.crt')
46 aia_session
._trusted
= {
47 aia
.openssl_get_cert_info(ca_der
)["subject"]: ca_der
48 for ca_der
in aia_session
._context
.get_ca_certs(True)
51 except ImportError as e
:
55 hostmask_regex
= re
.compile(r
'^(.*)!(.*)@(.*)$')
57 def parser_hostmask(hostmask
):
58 if isinstance(hostmask
, dict):
65 if hostmask
is not None:
66 match
= hostmask_regex
.match(hostmask
)
81 class SmartRedirectHandler(urllib2
.HTTPRedirectHandler
):
82 def http_error_301(self
, req
, fp
, code
, msg
, headers
):
83 result
= urllib2
.HTTPRedirectHandler
.http_error_301(
84 self
, req
, fp
, code
, msg
, headers
)
88 def http_error_302(self
, req
, fp
, code
, msg
, headers
):
89 result
= urllib2
.HTTPRedirectHandler
.http_error_302(
90 self
, req
, fp
, code
, msg
, headers
)
def _get_blocked_chans():
    """Return the list of channels in which URL handling is disabled.

    Reads the comma-separated ``blocked`` option of the ``urls`` config
    section (defaulting to an empty string). Whitespace around each entry
    is ignored and empty entries are dropped, so an unset option yields
    ``[]`` instead of ``['']`` and ``"#a, #b"`` blocks both channels.
    """
    raw = lib.parent.cfg.get('urls', 'blocked', '') or ''
    stripped = (entry.strip() for entry in raw.split(','))
    return [entry for entry in stripped if entry]
97 def process_line(line
):
100 limit
= lib
.parent
.cfg
.getint('urls', 'limit', 2)
101 for action
, group
in regexes
:
103 for match
in regex
.findall(line
):
106 if num_found
> limit
:
108 if isinstance(match
, stringbase
):
111 resp
= action(*match
)
112 if resp
is not None and resp
!= "":
113 responses
.append(resp
)
116 @lib.hooknum("PRIVMSG")
117 def privmsg_hook(bot
, textline
):
118 user
= parser_hostmask(textline
[1:textline
.find(' ')])
119 chan
= textline
.split()[2]
121 if chan
in _get_blocked_chans(): return
124 line
= textline
.split(None, 3)[3][1:]
128 responses
= process_line(line
)
129 send_response(bot
, chan
, responses
)
def send_response(bot, chan, responses):
    """Deliver the collected URL responses to ``chan``.

    Nothing is sent when ``responses`` is empty. When the ``multiline``
    option of the ``urls`` config section is true, every response is sent
    as its own message; otherwise all responses are joined with ' | ' and
    sent as a single message.
    """
    if len(responses) == 0:
        return

    if not lib.parent.cfg.getboolean('urls', 'multiline'):
        combined = ' | '.join(responses)
        bot.msg(chan, combined, True)
        return

    for reply in responses:
        bot.msg(chan, reply, True)
140 return re
.sub('\s+', ' ', html
.unescape(line
))
def gotspotify(type, track):
    """Look up a Spotify URI and return a one-line description of it.

    Fetches the lookup XML for ``spotify:<type>:<track>``, parses it with
    BeautifulSoup and formats the result.

    Returns a 'Track: ...' or 'Album: ...' string for those lookup types,
    or 'Unsupported type.' for anything else. Errors raised by the HTTP
    request or by missing XML elements propagate to the caller.
    """
    url = 'http://ws.spotify.com/lookup/1/?uri=spotify:%s:%s' % (type, track)
    xml = urllib2.urlopen(url).read()
    soup = BeautifulSoup(xml, convertEntities=BeautifulSoup.HTML_ENTITIES)
    # The name of the third top-level node identifies what was looked up.
    lookup_type = soup.contents[2].name

    if lookup_type == 'track':
        name = soup.find('name').string
        album_name = soup.find('album').find('name').string
        artist_name = soup.find('artist').find('name').string

        # The popularity element may be absent; fall back to 0 so the
        # '%2d' conversion below never receives None.
        popularity_tag = soup.find('popularity')
        if popularity_tag is not None and popularity_tag.string:
            popularity = float(popularity_tag.string) * 100
        else:
            popularity = 0.0

        length = float(soup.find('length').string)
        # divmod keeps the minutes an integer on Python 3, where '/' is
        # true division and would print e.g. '3.5833' under '%s'.
        minutes, seconds = divmod(int(length), 60)

        return unescape('Track: %s - %s / %s %s:%.2d %2d%%' % (artist_name, name, album_name, minutes, seconds, popularity))

    if lookup_type == 'album':
        album_name = soup.find('album').find('name').string
        artist_name = soup.find('artist').find('name').string
        released = soup.find('released').string
        return unescape('Album: %s - %s - %s' % (artist_name, album_name, released))

    return 'Unsupported type.'
171 mo
= re
.match(r
'P(\d+D)?T(\d+H)?(\d+M)?(\d+S)?', s
)
172 pcs
= [x
for x
in mo
.groups() if x
]
173 return ''.join(pcs
).lower()
175 mo
= re
.match(r
'(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})\.(\d+)Z', s
)
176 return datetime
.datetime(*(int(x
) for x
in mo
.groups())).strftime(f
)
180 return '%.1ft' % (n
/10**12)
182 return '%.1fb' % (n
/10**9)
184 return '%.1fm' % (n
/10**6)
186 return '%.1fk' % (n
/10**3)
191 url_data
= urlparse
.urlparse(url
)
192 query
= urlparse
.parse_qs(url_data
.query
)
193 video
= query
["v"][0]
194 api_url
= 'https://www.googleapis.com/youtube/v3/videos?part=snippet,contentDetails,statistics&id=%s&key=%s' % (video
, lib
.parent
.cfg
.get('urls', 'api_key'))
196 respdata
= urllib2
.urlopen(api_url
).read()
197 v
= json
.loads(respdata
)
200 return unescape(lib
.parent
.cfg
.get('urls', 'yt_format', "\002%(author)s\002: \037%(title)s\037 [%(duration)s, uploaded %(uploaded)s, %(views)s v/%(likes)s l/%(dislikes)s d]") % {
201 'title': v
['snippet']['title'],
202 'author': v
['snippet']['channelTitle'],
203 'duration': _yt_duration(v
['contentDetails']['duration']),
204 'uploaded': _yt_date(v
['snippet']['publishedAt'], lib
.parent
.cfg
.get('urls', 'yt_date_format', '%b %d %Y')),
205 'views': _yt_round(v
['statistics']['viewCount']),
206 'likes': _yt_round(v
['statistics']['likeCount']),
207 'dislikes': _yt_round(v
['statistics']['dislikeCount']),
209 except urllib2
.HTTPError
as e
:
210 if e
.getcode() == 403:
211 return 'API limit exceeded'
216 except Exception as e
:
220 url
= 'https://api.twitch.tv/helix/streams?user_login=%s' % uri
.split('/')[0]
221 opener
= urllib2
.build_opener()
222 opener
.addheaders
= [('Client-ID', lib
.parent
.cfg
.get('urls', 'twitch_api_key'))]
223 respdata
= opener
.open(url
).read()
224 twitch
= json
.loads(respdata
)['data']
226 # TODO: add current game.
227 return unescape('\037%s\037 is %s (%s)' % (twitch
[0]['user_name'], twitch
[0]['type'], twitch
[0]['title']))
229 return 'Channel offline.'
231 def _humanize_bytes(b
):
234 table
= " kMGTPEZYRQ"
241 return "%.2f%siB" % (b
, table
[i
])
def _do_request(url, try_aia=False):
    """Returns the HTTPResponse object, or a string on error

    url     -- the URL to fetch.
    try_aia -- when true, build the HTTPS handler from an SSL context
               obtained via ``aia_session.ssl_context_from_url(url)``.
               This is set by the internal retry after a "unable to get
               local issuer certificate" failure.

    The retry happens at most once: if the AIA-enabled attempt fails with
    the same certificate error, an error string is returned instead of
    retrying again.
    """
    request = urllib2.Request(url, headers={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
        'Sec-Ch-Ua': '"Chromium";v="116", "Not)A;Brand";v="24", "Google Chrome";v="116"',
        'Sec-Ch-Ua-Mobile': '?0',
        'Sec-Ch-Ua-Platform': '"Linux"',
        'Sec-Fetch-Dest': 'document',
        'Sec-Fetch-Mode': 'navigate',
        'Sec-Fetch-Site': 'same-origin',
        'Sec-Fetch-User': '?1',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'en-US,en;q=0.9',
        'Cache-Control': 'no-cache',
        'Pragma': 'no-cache',
        'Upgrade-Insecure-Requests': '1',
    })
    if try_aia:
        opener = urllib2.build_opener(urllib2.HTTPSHandler(context=aia_session.ssl_context_from_url(url)), SmartRedirectHandler())
    else:
        opener = urllib2.build_opener(SmartRedirectHandler())

    # Send request and handle errors
    try:
        response = opener.open(request, timeout=2)
    except urllib2.HTTPError as e:
        return 'Request error: %s %s' % (e.code, e.reason)
    except urllib2.URLError as e:
        if "certificate verify failed: unable to get local issuer certificate" in str(e.reason):
            if aia and not try_aia:  # Retry once with AIA enabled
                return _do_request(url, True)
            if not aia:
                # Only suggest installing aia when it is actually missing.
                lib.parent.log('urls', '?', 'If the site is not serving the certificate chain, installing the aia library might make this request work: pip install aia')
            return 'Request error: site may have broken TLS configuration (%s)' % (e.reason)
        return 'Request error: %s' % (e.reason)
    except TimeoutError as e:
        return 'Request error: request timed out'
    except Exception as e:
        return 'Unknown error: %s %r' % (type(e).__name__, e.args)

    return response
275 for _
, group
in other_regexes
:
280 response
= _do_request(url
)
281 if isinstance(response
, stringbase
):
284 # Try to add type and length headers to reply
285 c_type
= response
.getheader('Content-Type', '').split(';', 1)[0]
286 c_len
= response
.getheader('Content-Length')
288 output
.append("[%s] " % (c_type
))
290 output
.append("[no type] ")
291 if c_type
!= "text/html": # else length will be provided by HTML code below
292 if c_len
is not None:
293 output
.append("[%s] " % (_humanize_bytes(c_len
)))
295 output
.append("[no length] ")
297 # Try to add title if HTML
298 if c_type
== 'text/html':
300 responsebody
= response
.read(1024*1024)
301 except Exception as e
:
302 output
.append('Error reading response body: %s %r' % (type(e
).__name
__, e
.args
))
304 if c_len
is not None and len(responsebody
) != int(c_len
):
305 output
.append("[actual %s; Content-Length %s] " % (_humanize_bytes(len(responsebody
)), _humanize_bytes(c_len
)))
307 output
.append("[%s] " % (_humanize_bytes(len(responsebody
))))
309 soup
= BeautifulSoup(responsebody
)
311 output
.append('Title: ' + unescape('%s' % (soup
.find('title').string
.strip())))
313 output
.append('No title')
314 except Exception as e
:
315 output
.append('Title error: %s %r ' % (type(e
).__name
__, e
.args
))
317 return ''.join(output
)
320 re
.compile(r
'https?://(?:[^/\s.]+\.)+[^/\s.]+(?:/\S+)?'),
323 (lambda x
: '', (re
.compile(r
"""https?://(?:www\.)?(?:twitter|x)\.com/""", re
.I
),)), # skip twitter
324 (lambda x
: '', (re
.compile(r
"""https?://(?:www\.)?reddit\.com/""", re
.I
),)), # skip new-reddit
326 regexes
= other_regexes
+ (