1 # Erebus IRC bot - Author: Conny Sjoblom
2 # vim: fileencoding=utf-8
4 # This file is released into the public domain; see http://unlicense.org/
8 'author': 'Erebus Team',
9 'license': 'public domain',
15 # http://embed.ly/tools/generator
# Module boilerplate: instantiate the shared modlib helper for this module
# and re-export its start hook so the bot core can initialise it.
lib = modlib.modlib(__name__)
modstart = lib.modstart
# Py2/Py3 compatibility: `stringbase` is the type used for isinstance()
# string checks (see process_line). NOTE(review): the Python 3 branch
# (presumably `stringbase = str`) is not visible in this chunk — confirm
# it exists upstream.
if sys.version_info.major < 3:
    stringbase = basestring
30 html
= HTMLParser
.HTMLParser()
31 from BeautifulSoup
import BeautifulSoup
34 import urllib
.request
as urllib2
35 import urllib
.parse
as urlparse
37 from bs4
import BeautifulSoup
40 import re
, json
, datetime
# Shared AIA (Authority Information Access) session, used by _do_request()
# to build SSL contexts that can fetch missing intermediate certificates.
# NOTE(review): the enclosing `try:`/`import aia`, the closing `}` of the
# _trusted dict comprehension, and the except body are not visible in this
# chunk; it also pokes aia's private attributes (_context, _trusted), which
# may break on library upgrades.
aia_session = aia.AIASession()
# aia is broken on capath systems, needs cafile to work
aia_session._context.load_verify_locations(cafile='/etc/ssl/certs/ca-certificates.crt')
# Seed the trusted-root map from the session's own CA store, keyed by
# certificate subject.
aia_session._trusted = {
    aia.openssl_get_cert_info(ca_der)["subject"]: ca_der
    for ca_der in aia_session._context.get_ca_certs(True)
except ImportError as e:
# Matches an IRC hostmask "nick!user@host"; groups: (nick, user, host).
hostmask_regex = re.compile(r'^(.*)!(.*)@(.*)$')
def parser_hostmask(hostmask):
    # Split an IRC hostmask string into its components via hostmask_regex.
    # NOTE(review): most of this function is missing from this chunk — the
    # body of the dict early-return, the None handling, and the construction
    # of the result are not visible; indentation below is reconstructed.
    if isinstance(hostmask, dict):
    if hostmask is not None:
        match = hostmask_regex.match(hostmask)
class SmartRedirectHandler(urllib2.HTTPRedirectHandler):
    """Redirect handler used by _do_request()'s openers.

    NOTE(review): both method bodies are truncated in this chunk — the lines
    that presumably annotate and return `result` are not visible here.
    """

    def http_error_301(self, req, fp, code, msg, headers):
        # Delegate permanent-redirect handling to the stock implementation.
        result = urllib2.HTTPRedirectHandler.http_error_301(
            self, req, fp, code, msg, headers)

    def http_error_302(self, req, fp, code, msg, headers):
        # Delegate temporary-redirect handling to the stock implementation.
        result = urllib2.HTTPRedirectHandler.http_error_302(
            self, req, fp, code, msg, headers)
def _get_blocked_chans():
    """Return the channel names where URL handling is disabled.

    Reads the comma-separated 'blocked' option from the [urls] config
    section. Entries are stripped of surrounding whitespace and empty
    entries are dropped: previously an unset option returned [''] (since
    ''.split(',') == ['']) and names padded with spaces in the config
    never matched a real channel.
    """
    raw = lib.parent.cfg.get('urls', 'blocked', '')
    return [chan.strip() for chan in raw.split(',') if chan.strip()]
def process_line(line):
    # Run the registered URL handlers over one message line and collect the
    # non-empty reply strings, stopping after the configured limit.
    # NOTE(review): several lines are missing from this chunk — the
    # `responses`/`num_found` initialisation, the per-group `for regex in
    # group:` loop, the string-vs-tuple dispatch else-branch, and the final
    # return are not visible; indentation below is reconstructed.
    limit = lib.parent.cfg.getint('urls', 'limit', 2)
    for action, group in regexes:
        for match in regex.findall(line):
            if num_found > limit:
            if isinstance(match, stringbase):
            resp = action(*match)
            if resp is not None and resp != "":
                responses.append(resp)
@lib.hooknum("PRIVMSG")
def privmsg_hook(bot, textline):
    # PRIVMSG hook: pull the channel and message text out of a raw IRC line,
    # scan it for URLs, and send back any generated replies.
    # NOTE(review): several lines are missing from this chunk (e.g. any
    # non-channel-target guard or try/except around the parsing).
    # Sender, parsed from the leading ":nick!user@host" prefix.
    user = parser_hostmask(textline[1:textline.find(' ')])
    # Target is the third space-separated token of the raw line.
    chan = textline.split()[2]
    if chan in _get_blocked_chans(): return
    # Message body: everything after the fourth token's leading ':'.
    line = textline.split(None, 3)[3][1:]
    responses = process_line(line)
    send_response(bot, chan, responses)
def send_response(bot, chan, responses):
    # Deliver collected replies to the channel: one bot.msg() per response
    # when 'urls.multiline' is set, otherwise a single ' | '-joined message.
    # NOTE(review): the `for r in responses:` line and the `else:` separating
    # the two modes are not visible in this chunk; indentation below is
    # reconstructed.
    if len(responses) > 0:
        if lib.parent.cfg.getboolean('urls', 'multiline'):
            bot.msg(chan, r, True)
        bot.msg(chan, ' | '.join(responses), True)
# Body of unescape(line) — the def line is not visible in this chunk.
# Collapses whitespace runs to single spaces and decodes HTML entities.
    return re.sub('\s+', ' ', html.unescape(line))
def gotspotify(type, track):
    # Resolve a spotify:<type>:<id> URI via the legacy public lookup API and
    # return a one-line description for IRC.
    # NOTE(review): several lines are missing from this chunk (e.g. guards
    # around popularity and a final else) — indentation is reconstructed and
    # only visible behavior is described. Also note `type` shadows the
    # builtin; left as-is since the parameter name is part of the interface.
    url = 'http://ws.spotify.com/lookup/1/?uri=spotify:%s:%s' % (type, track)
    xml = urllib2.urlopen(url).read()
    # BeautifulSoup 3 API (convertEntities) — the py2-era module import.
    soup = BeautifulSoup(xml, convertEntities=BeautifulSoup.HTML_ENTITIES)
    # The root element's tag name says what the URI resolved to.
    lookup_type = soup.contents[2].name
    if lookup_type == 'track':
        name = soup.find('name').string
        album_name = soup.find('album').find('name').string
        artist_name = soup.find('artist').find('name').string
        popularity = soup.find('popularity')
        popularity = float(popularity.string)*100
        # Length arrives in seconds; split into minutes:seconds for display.
        length = float(soup.find('length').string)
        minutes = int(length)/60
        seconds = int(length)%60
        return unescape('Track: %s - %s / %s %s:%.2d %2d%%' % (artist_name, name, album_name, minutes, seconds, popularity))
    elif lookup_type == 'album':
        album_name = soup.find('album').find('name').string
        artist_name = soup.find('artist').find('name').string
        released = soup.find('released').string
        return unescape('Album: %s - %s - %s' % (artist_name, album_name, released))
    return 'Unsupported type.'
# Body of _yt_duration(s) — the def line is not visible in this chunk.
# Converts an ISO-8601 YouTube duration (e.g. "PT4M13S") into a compact
# lowercase form (e.g. "4m13s") by keeping only the matched components.
    mo = re.match(r'P(\d+D)?T(\d+H)?(\d+M)?(\d+S)?', s)
    pcs = [x for x in mo.groups() if x]
    return ''.join(pcs).lower()
# Body of _yt_date(s, f) — the def line is not visible in this chunk.
# Parses YouTube's ISO timestamp ("YYYY-MM-DDTHH:MM:SS.mmmZ") and formats
# it with strftime format `f`. NOTE(review): the fractional-seconds group is
# passed to datetime() as microseconds without scaling — verify upstream.
    mo = re.match(r'(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})\.(\d+)Z', s)
    return datetime.datetime(*(int(x) for x in mo.groups())).strftime(f)
# Fragments of _yt_round(n) — the def line and the magnitude conditions
# (presumably `if n >= 10**12:` etc.) are not visible in this chunk.
# Renders a count with a trillions/billions/millions/thousands suffix.
        return '%.1ft' % (n/10**12)
        return '%.1fb' % (n/10**9)
        return '%.1fm' % (n/10**6)
        return '%.1fk' % (n/10**3)
# Body of gotyoutube(url) — the def line, the enclosing try:, and the
# closing brace of the format dict are not visible in this chunk;
# indentation below is reconstructed.
    url_data = urlparse.urlparse(url)
    query = urlparse.parse_qs(url_data.query)
    # Watch URLs carry the video id in the "v" query parameter.
    video = query["v"][0]
    api_url = 'https://www.googleapis.com/youtube/v3/videos?part=snippet,contentDetails,statistics&id=%s&key=%s' % (video, lib.parent.cfg.get('urls', 'api_key'))
    respdata = urllib2.urlopen(api_url).read()
    v = json.loads(respdata)
    # Reply format comes from 'urls.yt_format'; \002/\037 are IRC
    # bold/underline control codes.
    return unescape(lib.parent.cfg.get('urls', 'yt_format', "\002%(author)s\002: \037%(title)s\037 [%(duration)s, uploaded %(uploaded)s, %(views)s v/%(likes)s l/%(dislikes)s d]") % {
        'title': v['snippet']['title'],
        'author': v['snippet']['channelTitle'],
        'duration': _yt_duration(v['contentDetails']['duration']),
        'uploaded': _yt_date(v['snippet']['publishedAt'], lib.parent.cfg.get('urls', 'yt_date_format', '%b %d %Y')),
        'views': _yt_round(v['statistics']['viewCount']),
        'likes': _yt_round(v['statistics']['likeCount']),
        'dislikes': _yt_round(v['statistics']['dislikeCount']),
    except urllib2.HTTPError as e:
        # Quota errors come back as HTTP 403 from the Data API.
        if e.getcode() == 403:
            return 'API limit exceeded'
    except Exception as e:
# Body of gottwitch(uri) — the def line and the guard that distinguishes a
# live stream from an empty 'data' array are not visible in this chunk;
# indentation below is reconstructed.
    url = 'https://api.twitch.tv/helix/streams?user_login=%s' % uri.split('/')[0]
    opener = urllib2.build_opener()
    # The Helix API requires a Client-ID header (config 'urls.twitch_api_key').
    opener.addheaders = [('Client-ID', lib.parent.cfg.get('urls', 'twitch_api_key'))]
    respdata = opener.open(url).read()
    twitch = json.loads(respdata)['data']
    # TODO: add current game.
    return unescape('\037%s\037 is %s (%s)' % (twitch[0]['user_name'], twitch[0]['type'], twitch[0]['title']))
    return 'Channel offline.'
def _humanize_bytes(b):
    # Render a byte count with a binary-prefix suffix ("KiB", "MiB", ...).
    # NOTE(review): the loop that scales `b` down and selects index `i` into
    # `table` is not visible in this chunk — only the suffix table and the
    # final formatting line are shown.
    table = " kMGTPEZYRQ"
    return "%.2f%siB" % (b, table[i])
def _do_request(url, try_aia=False):
    # NOTE(review): the docstring delimiters around the three lines below,
    # the `if try_aia:`/`else:` around the two opener assignments, the
    # `try:` before opener.open, and the InvalidURL except body are not
    # visible in this chunk; indentation is reconstructed.
    Return value is a tuple consisting of:
    - the HTTPResponse object, or a string on error. Empty string -> no response.
    - and a flag indicating whether AIA was used
    # Browser-like request headers, to avoid naive bot blocking.
    request = urllib2.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36', 'Sec-Ch-Ua': '"Chromium";v="116", "Not)A;Brand";v="24", "Google Chrome";v="116"', 'Sec-Ch-Ua-Mobile': '?0', 'Sec-Ch-Ua-Platform': '"Linux"', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'same-origin', 'Sec-Fetch-User': '?1', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', 'Accept-Language': 'en-US,en;q=0.9', 'Cache-Control': 'no-cache', 'Pragma': 'no-cache', 'Upgrade-Insecure-Requests': '1'})
    # AIA path: SSL context that can fetch missing intermediate certs.
    opener = urllib2.build_opener(urllib2.HTTPSHandler(context=aia_session.ssl_context_from_url(url)), SmartRedirectHandler())
    # Plain path: default TLS, still using the smart redirect handler.
    opener = urllib2.build_opener(SmartRedirectHandler())
    # Send request and handle errors
    response = opener.open(request, timeout=2)
    except http.client.InvalidURL as e: # why does a method under urllib.request raise an exception under http.client???
    except urllib2.HTTPError as e:
        return 'Request error: %s %s' % (e.code, e.reason), False
    except urllib2.URLError as e:
        if "certificate verify failed: unable to get local issuer certificate" in str(e.reason):
            if aia: # Retry with AIA enabled, if module is present
                return _do_request(url, True)
            lib.parent.log('urls', '?', 'If the site is not serving the certificate chain, installing the aia library might make this request work: pip install aia')
            return 'Request error: site may have broken TLS configuration (%s)' % (e.reason), False
        return 'Request error: %s' % (e.reason), False
    except TimeoutError as e:
        return 'Request error: request timed out', False
    except Exception as e:
        # Last-resort catch so one bad URL never kills the hook.
        return 'Unknown error: %s %r' % (type(e).__name__, e.args), False
    return response, try_aia
# Body of goturl(url) — the def line, the `output` initialisation, the body
# of the skip-handler loop, and several if/else/try lines are not visible in
# this chunk; indentation below is reconstructed.
    for _, group in other_regexes:
    response, used_aia = _do_request(url)
    # A string response means _do_request already produced an error message.
    if isinstance(response, stringbase):
    # Try to add type and length headers to reply
    c_type = response.getheader('Content-Type', '').split(';', 1)[0]
    c_len = response.getheader('Content-Length')
    output.append("[%s] " % (c_type))
    output.append("[no type] ")
    if c_type != "text/html": # else length will be provided by HTML code below
        if c_len is not None:
            output.append("[%s] " % (_humanize_bytes(c_len)))
        output.append("[no length] ")
    output.append("[AIA] ")
    # Try to add title if HTML
    if c_type == 'text/html':
        # Cap the body read at 1 MiB to bound memory per request.
        responsebody = response.read(1024*1024)
        except Exception as e:
            output.append('Error reading response body: %s %r' % (type(e).__name__, e.args))
        # Report a mismatch between actual size and the declared length.
        if c_len is not None and len(responsebody) != int(c_len):
            output.append("[actual %s; Content-Length %s] " % (_humanize_bytes(len(responsebody)), _humanize_bytes(c_len)))
        output.append("[%s] " % (_humanize_bytes(len(responsebody))))
        soup = BeautifulSoup(responsebody)
        output.append('Title: ' + unescape('%s' % (soup.find('title').string.strip())))
        output.append('No title')
        except Exception as e:
            output.append('Title error: %s %r ' % (type(e).__name__, e.args))
    return ''.join(output)
# Handler registration tables. NOTE(review): the tuple openers/closers
# (url_regex = (...), other_regexes = (...)) around these entries are not
# visible in this chunk.
# Generic URL matcher: scheme + dotted host + optional path.
    re.compile(r'https?://(?:[^/\s.]+\.)+[^/\s.]+(?:/\S+)?'),
# Handlers returning '' suppress a reply for matching URLs.
    (lambda x: '', (re.compile(r"""https?://(?:www\.)?(?:twitter|x)\.com/""", re.I),)), # skip twitter
    (lambda x: '', (re.compile(r"""https?://(?:www\.)?reddit\.com/""", re.I),)), # skip new-reddit
    (lambda x: '', (re.compile(r"""https?://jfr\.im/git/""", re.I),)), # skip my gitweb
regexes = other_regexes + (