]> jfr.im git - irc/quakenet/qwebirc.git/blame - qwebirc/engines/ajaxengine.py
add dynamic configuration support
[irc/quakenet/qwebirc.git] / qwebirc / engines / ajaxengine.py
CommitLineData
99844c15 1from twisted.web import resource, server, static, error as http_error
9e769c12 2from twisted.names import client
265f5ce3 3from twisted.internet import reactor, error
ace37679 4from authgateengine import login_optional, getSessionData
becfa850 5import md5, sys, os, time, config, qwebirc.config_options as config_options, traceback, socket
85f01e3f
CP
6import qwebirc.ircclient as ircclient
7from adminengine import AdminEngineAction
8from qwebirc.util import HitCounter
28c4ad01 9import qwebirc.dns as qdns
becfa850 10import qwebirc.util.qjson as json
c60795d6 11import urlparse
48f0fc43 12import qwebirc.util.autobahn_check as autobahn_check
c60795d6
CP
13
# Transport names advertised to clients by AJAXEngine.newConnection.
# "longpoll" is always present; "websocket" is added below only when
# autobahn_check.check() reports a usable autobahn installation.
TRANSPORTS = ["longpoll"]

has_websocket = False
autobahn_status = autobahn_check.check()
if autobahn_status == True:
    import autobahn
    import autobahn.twisted.websocket
    import autobahn.twisted.resource
    TRANSPORTS.append("websocket")
    has_websocket = True
elif autobahn_status != False:
    # Any status other than True/False is printed as the reason websocket
    # support could not be enabled.
    print >>sys.stderr, "WARNING:"
    print >>sys.stderr, " %s" % autobahn_status
    print >>sys.stderr, " as a result websocket support is disabled."
    print >>sys.stderr, " upgrade your version of autobahn from http://autobahn.ws/python/getstarted/"
# (autobahn_status == False needs no output: they've been warned already)
c60795d6
CP
32
# Registry of live sessions, keyed by session id.
Sessions = {}

# Largest sequence number accepted from a client.
MAX_SEQNO = 9223372036854775807 # 2**63 - 1... yeah it doesn't wrap

# Reason sent to a client whose session id cannot be found in Sessions.
BAD_SESSION_MESSAGE = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
36
def get_session_id():
    """Return a new random session id.

    The id is the hexadecimal MD5 digest (32 characters) of 16 bytes read
    from os.urandom().
    """
    # hashlib supersedes the stand-alone md5 module (deprecated since
    # Python 2.5 and absent from Python 3); the digest is identical.
    import hashlib

    return hashlib.md5(os.urandom(16)).hexdigest()
8dc46dfa
CP
39
class BufferOverflowException(Exception):
    """Exception type for buffer overflows.

    NOTE(review): nothing in the visible code raises this; IRCSession.event
    reports an overflow through client.error() instead.
    """
42
f59585a7
CP
class AJAXException(Exception):
    """Error whose first argument is a message meant for the client.

    AJAXEngine.render_POST catches it and returns the message to the
    browser as a JSON failure response.
    """
45
4094890f
CP
class IDGenerationException(Exception):
    """Raised when no unused session id could be generated.

    AJAXEngine.newConnection raises it after ten generated ids were all
    already present in Sessions.
    """
48
class LineTooLongException(Exception):
    """Raised by IRCSession.push when a line exceeds config.MAXLINELEN."""
bdd008f9 51
# Pre-encoded JSON empty list; IRCSession.timeout writes it to a request
# that timed out without any events to deliver.
EMPTY_JSON_LIST = json.dumps([])
9e769c12 53
8dc46dfa
CP
def cleanupSession(id):
    """Remove the session stored under ``id`` from Sessions.

    An id that is not (or no longer) registered is silently ignored.
    """
    Sessions.pop(id, None)
59
9e769c12
CP
class IRCSession:
    """Per-client session: buffers outgoing events and relays incoming lines.

    Events are collected in ``buffer`` and delivered in JSON-encoded batches
    to the channels in ``subscriptions`` (objects providing
    ``write(data, seqNo)`` and ``close()``).  Lines pushed by the client are
    forwarded to ``self.client``, which is not set here; it is assigned by
    AJAXEngine.newConnection once the IRC client has been created.
    """

    def __init__(self, id):
        self.id = id
        self.subscriptions = []      # channels waiting for the next batch
        self.buffer = []             # events not yet delivered
        self.old_buffer = None       # (seqNo, encoded batch) of the last flush
        self.buflen = 0              # summed len() of the events in buffer
        self.throttle = 0            # time.time() before which flush() won't send
        self.schedule = None         # pending reactor call to flush(), if any
        self.closed = False
        self.cleanupschedule = None
        self.pubSeqNo = -1           # highest sequence number accepted by push()
        self.subSeqNo = 0            # number of batches flushed so far

    def subscribe(self, channel, seqNo=None):
        """Register ``channel`` to receive the next batch of events.

        ``seqNo`` is the last batch number the client has seen.  If it is
        behind ``subSeqNo``, the previous batch is replayed when it is exactly
        the one missing; otherwise an error is written and the channel is not
        subscribed.  When config.MAXSUBSCRIPTIONS channels are already
        waiting, the oldest one is closed first.
        """
        if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
            self.subscriptions.pop(0).close()

        if seqNo is not None and seqNo < self.subSeqNo:
            if self.old_buffer is None or seqNo != self.old_buffer[0]:
                channel.write(json.dumps([False, "Unable to reconnect -- sequence number too old."]), seqNo + 1)
                return

            # A false return from write() means the channel is used up.
            if not channel.write(self.old_buffer[1], self.old_buffer[0] + 1):
                return

        self.subscriptions.append(channel)
        # flush() takes a boolean "scheduled" flag.  The sequence number used
        # to be passed here, which made any non-zero seqNo look like a
        # scheduled call: the pending DelayedCall reference was dropped and
        # the usual deferral was skipped.
        self.flush()

    def unsubscribe(self, channel):
        """Remove ``channel`` from the subscribers; unknown channels are ignored."""
        try:
            self.subscriptions.remove(channel)
        except ValueError:
            pass

    def timeout(self, channel):
        """Answer a waiting ``channel`` with an empty list and unsubscribe it.

        Does nothing while a flush is scheduled.
        """
        if self.schedule:
            return

        self.unsubscribe(channel)
        channel.write(EMPTY_JSON_LIST, self.subSeqNo)

    def flush(self, scheduled=False):
        """Deliver the buffered events to every subscribed channel.

        ``scheduled`` is True only when invoked through the reactor call
        stored in ``self.schedule``.  A direct call never sends: it arranges
        a reactor call instead, delayed until ``self.throttle`` if batches
        are being sent faster than config.UPDATE_FREQ allows.
        """
        if scheduled:
            self.schedule = None

        if not self.buffer or not self.subscriptions:
            return

        t = time.time()

        if t < self.throttle:
            if not self.schedule:
                self.schedule = reactor.callLater(self.throttle - t, self.flush, True)
            return
        else:
            # process the rest of the packet
            if not scheduled:
                if not self.schedule:
                    self.schedule = reactor.callLater(0, self.flush, True)
                return

        self.throttle = t + config.UPDATE_FREQ

        encdata = json.dumps(self.buffer)
        # Keep the batch so subscribe() can replay it to a client that
        # reconnects having missed exactly this one.
        self.old_buffer = (self.subSeqNo, encdata)
        self.subSeqNo += 1
        self.buffer = []
        self.buflen = 0

        subs = self.subscriptions
        self.subscriptions = newsubs = []

        # Only channels whose write() returns true stay subscribed.
        for x in subs:
            if x.write(encdata, self.subSeqNo):
                newsubs.append(x)

        if self.closed and not newsubs:
            cleanupSession(self.id)

    def event(self, data):
        """Queue ``data`` for delivery and request a flush.

        If adding ``data`` would push the buffered length past
        config.MAXBUFLEN, the buffer is discarded and the overflow is
        reported via ``self.client.error``.
        """
        newbuflen = self.buflen + len(data)
        if newbuflen > config.MAXBUFLEN:
            self.buffer = []
            # Keep the length counter in step with the emptied buffer.
            self.buflen = 0
            self.client.error("Buffer overflow.")
            return

        self.buffer.append(data)
        self.buflen = newbuflen
        self.flush()

    def push(self, data, seq_no=None):
        """Forward a line from the browser to ``self.client``.

        Ignored when the session is closed, or when ``seq_no`` is given and
        is not greater than the last accepted one.

        Raises LineTooLongException if ``data`` is longer than
        config.MAXLINELEN.
        """
        if self.closed:
            return

        if len(data) > config.MAXLINELEN:
            raise LineTooLongException

        if seq_no is not None:
            if seq_no <= self.pubSeqNo:
                return
            self.pubSeqNo = seq_no
        self.client.write(data)

    def disconnect(self):
        """Mark the session closed and schedule its removal from Sessions."""
        # keep the session hanging around for a few seconds so the
        # client has a chance to see what the issue was
        self.closed = True

        reactor.callLater(5, cleanupSession, self.id)
170
28c4ad01
CP
# DANGER! Breach of encapsulation!
def connect_notice(line):
    """Return an event tuple carrying ``line`` as a NOTICE addressed to AUTH.

    The text is prefixed with "*** (qwebirc) ".
    """
    text = "*** (qwebirc) %s" % line
    payload = ("AUTH", text)
    return ("c", "NOTICE", "", payload)
174
class RequestChannel(object):
    """Subscription channel that answers a single HTTP request."""

    def __init__(self, request):
        self.request = request

    def write(self, data, seqNo):
        """Send ``data`` as the response body, with ``seqNo`` in header "n".

        The request is finished afterwards, so the method always returns
        False to tell the session this channel cannot be written to again.
        """
        req = self.request
        req.setHeader("n", str(seqNo))
        req.write(data)
        req.finish()
        return False

    def close(self):
        """Finish the request without writing a body."""
        self.request.finish()
187
9e769c12
CP
class AJAXEngine(resource.Resource):
    """HTTP endpoint implementing the long-poll transport.

    POST requests to ``<prefix>/<command>`` are dispatched through COMMANDS:
    "n" creates a connection, "s" subscribes for events and "p" pushes a
    line to IRC.  A handler that raises AJAXException gets its message sent
    back to the client as the JSON pair ``[false, message]``.
    """
    isLeaf = True

    def __init__(self, prefix):
        self.prefix = prefix
        self.__connect_hit = HitCounter()
        self.__total_hit = HitCounter()

    def render_POST(self, request):
        """Dispatch the request to the handler named by the path after the prefix."""
        path = request.path[len(self.prefix):]
        # startswith() instead of path[0]: the remainder is empty when the
        # request path is exactly the prefix, and indexing would raise.
        if path.startswith("/"):
            handler = self.COMMANDS.get(path[1:])
            if handler is not None:
                try:
                    return handler(self, request)
                except AJAXException as e:
                    return json.dumps((False, e.args[0]))

        return "404" ## TODO: tidy up

    def newConnection(self, request):
        """Create a session plus IRC client and return its id as JSON.

        Reads the "nick" (required) and "password" (optional) request
        arguments.  Returns the JSON encoding of (True, session id,
        TRANSPORTS).

        Raises AJAXException if no nickname was supplied and
        IDGenerationException if ten generated ids were all already in use.
        """
        # The return value is not used here; the call is kept as in the
        # original.  NOTE(review): presumably it validates the login state.
        login_optional(request)

        ip = request.getClientIP()

        nick = request.args.get("nick")
        if not nick:
            raise AJAXException("Nickname not supplied.")
        nick = ircclient.irc_decode(nick[0])

        password = request.args.get("password")
        if password is not None:
            password = ircclient.irc_decode(password[0])

        for _ in range(10):
            session_id = get_session_id()
            if not Sessions.get(session_id):
                break
        else:
            raise IDGenerationException()

        session = IRCSession(session_id)

        qticket = getSessionData(request).get("qticket")
        if qticket is None:
            perform = None
        else:
            # Address the auth service as nick@server, built from the
            # nick!user@server mask in config.AUTH_SERVICE.
            service_mask = config.AUTH_SERVICE
            msg_mask = service_mask.split("!")[0] + "@" + service_mask.split("@", 1)[1]
            perform = ["PRIVMSG %s :TICKETAUTH %s" % (msg_mask, qticket)]

        ident, realname = config.IDENT, config.REALNAME
        if ident is config_options.IDENT_HEX or ident is None: # latter is legacy
            ident = socket.inet_aton(ip).encode("hex")
        elif ident is config_options.IDENT_NICKNAME:
            ident = nick

        self.__connect_hit()

        def proceed(hostname):
            kwargs = dict(nick=nick, ident=ident, ip=ip, realname=realname, perform=perform, hostname=hostname)
            if password is not None:
                kwargs["password"] = password

            client = ircclient.createIRC(session, **kwargs)
            session.client = client

        # A missing WEBIRC_MODE is treated like "hmac": connect straight away
        # without a hostname.  Every other mode looks the hostname up first.
        if getattr(config, "WEBIRC_MODE", "hmac") == "hmac":
            proceed(None)
        else:
            notice = lambda x: session.event(connect_notice(x))
            notice("Looking up your hostname...")
            def callback(hostname):
                notice("Found your hostname.")
                proceed(hostname)
            def errback(failure):
                notice("Couldn't look up your hostname!")
                proceed(ip)
            qdns.lookupAndVerifyPTR(ip, timeout=[config.DNS_TIMEOUT]).addCallbacks(callback, errback)

        Sessions[session_id] = session

        return json.dumps((True, session_id, TRANSPORTS))

    def getSession(self, request):
        """Return the session named by request argument "s".

        Raises AJAXException if the argument is missing or names no live
        session.
        """
        sessionid = request.args.get("s")
        if sessionid is None:
            raise AJAXException(BAD_SESSION_MESSAGE)

        session = Sessions.get(sessionid[0])
        if not session:
            raise AJAXException(BAD_SESSION_MESSAGE)
        return session

    def subscribe(self, request):
        """Hold the request open until the session has events for it.

        The optional "n" argument is the last batch sequence number the
        client received.  The request is answered with an empty list after
        config.HTTP_AJAX_REQUEST_TIMEOUT seconds if nothing arrives.

        Raises AJAXException for an unknown session or bad sequence number.
        """
        request.channel.setTimeout(None)

        channel = RequestChannel(request)
        session = self.getSession(request)
        notifier = request.notifyFinish()

        seq_no = request.args.get("n")
        try:
            if seq_no is not None:
                seq_no = int(seq_no[0])
                if seq_no < 0 or seq_no > MAX_SEQNO:
                    raise ValueError
        except ValueError:
            # Must be AJAXException (this used to raise the AJAXEngine class
            # itself), otherwise render_POST cannot turn it into a response.
            raise AJAXException("Bad sequence number")

        session.subscribe(channel, seq_no)

        timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)
        def cancel_timeout(result):
            try:
                timeout_entry.cancel()
            except error.AlreadyCalled:
                pass
            session.unsubscribe(channel)
        # Runs whether the request finished normally or the client went away.
        notifier.addCallbacks(cancel_timeout, cancel_timeout)
        return server.NOT_DONE_YET

    def push(self, request):
        """Send the line in request argument "c" to the session's IRC client.

        The optional "n" argument is the line's sequence number.  Returns the
        JSON encoding of (True, True) on success.

        Raises AJAXException when the command is missing, the sequence number
        is invalid, the session is unknown, or forwarding fails (in which
        case the session is also disconnected).
        """
        command = request.args.get("c")
        if command is None:
            raise AJAXException("No command specified.")
        self.__total_hit()

        seq_no = request.args.get("n")
        try:
            if seq_no is not None:
                seq_no = int(seq_no[0])
                if seq_no < 0 or seq_no > MAX_SEQNO:
                    raise ValueError
        except ValueError:
            # Must be AJAXException (this used to raise AJAXEngine), so that
            # render_POST reports it to the client.
            raise AJAXException("Bad sequence number %r" % seq_no)

        session = self.getSession(request)
        try:
            session.push(ircclient.irc_decode(command[0]), seq_no)
        except AttributeError: # occurs when we haven't noticed an error
            session.disconnect()
            raise AJAXException("Connection closed by server; try reconnecting by reloading the page.")
        except Exception: # catch all
            session.disconnect()
            traceback.print_exc(file=sys.stderr)
            raise AJAXException("Unknown error.")

        return json.dumps((True, True))

    def closeById(self, k):
        """Close the IRC connection of session ``k``; unknown ids are ignored."""
        s = Sessions.get(k)
        if s is None:
            return
        s.client.client.error("Closed by admin interface")

    @property
    def adminEngine(self):
        """Data for the admin interface: open sessions and the hit counters."""
        return {
            "Sessions": [(str(v.client.client), AdminEngineAction("close", self.closeById, k)) for k, v in Sessions.iteritems() if not v.closed],
            "Connections": [(self.__connect_hit,)],
            "Total hits": [(self.__total_hit,)],
        }

    # Path component -> handler, used by render_POST.
    COMMANDS = dict(p=push, n=newConnection, s=subscribe)
b5c84380 355
c60795d6
CP
if has_websocket:
    class WebSocketChannel(object):
        """Subscription channel that delivers batches over a websocket."""

        def __init__(self, channel):
            self.channel = channel

        def write(self, data, seqNo):
            """Send ``data`` as a "c" message prefixed with ``seqNo`` and a comma.

            Always returns True, so the session keeps the channel subscribed.
            """
            self.channel.send("c", "%d,%s" % (seqNo, data))
            return True

        def close(self):
            self.channel.close()

    class WebSocketEngineProtocol(autobahn.twisted.websocket.WebSocketServerProtocol):
        """Websocket transport for an existing IRCSession.

        The first character of each message is its type.  A connection
        starts in AWAITING_AUTH and must send "s<seqNo>,<session id>" within
        five seconds; once AUTHED it may send "p<seqNo>,<line>" messages.
        Anything else closes the connection.
        """
        AWAITING_AUTH, AUTHED = 0, 1

        def __init__(self, *args, **kwargs):
            super(WebSocketEngineProtocol, self).__init__(*args, **kwargs)
            self.__state = self.AWAITING_AUTH
            self.__session = None
            self.__channel = None
            self.__timeout = None

        def onOpen(self):
            # Drop connections that do not authenticate promptly.
            self.__timeout = reactor.callLater(5, self.close, "Authentication timeout")

        def onClose(self, wasClean, code, reason):
            self.__cancelTimeout()
            if self.__session:
                self.__session.unsubscribe(self.__channel)
                self.__session = None

        def onMessage(self, msg, isBinary):
            """Handle one incoming message according to the connection state."""
            # we don't bother checking the Origin header, as if you can auth then you've been able to pass the browser's
            # normal origin handling (POSTed the new connection request and managed to get the session id)
            state = self.__state
            message_type, message = msg[:1], msg[1:]
            if state == self.AWAITING_AUTH:
                if message_type == "s": # subscribe
                    tokens = message.split(",", 1)
                    if len(tokens) != 2:
                        self.close("Bad tokens")
                        return

                    seq_no, message = tokens[0], tokens[1]
                    try:
                        seq_no = int(seq_no)
                        if seq_no < 0 or seq_no > MAX_SEQNO:
                            raise ValueError
                    except ValueError:
                        self.close("Bad value")
                        # Stop here: without this return the connection was
                        # subscribed anyway, with an unparsed sequence number.
                        return

                    session = Sessions.get(message)
                    if not session:
                        self.close(BAD_SESSION_MESSAGE)
                        return

                    self.__cancelTimeout()
                    self.__session = session
                    self.send("s", "True")
                    self.__state = self.AUTHED
                    self.__channel = WebSocketChannel(self)
                    session.subscribe(self.__channel, seq_no)
                    return
            elif state == self.AUTHED:
                if message_type == "p": # push
                    tokens = message.split(",", 1)
                    if len(tokens) != 2:
                        self.close("Bad tokens")
                        return

                    seq_no, message = tokens[0], tokens[1]
                    try:
                        seq_no = int(seq_no)
                        if seq_no < 0 or seq_no > MAX_SEQNO:
                            raise ValueError
                    except ValueError:
                        self.close("Bad value")
                        # Stop here: without this return the line was still
                        # pushed after the close had been sent.
                        return

                    # NOTE(review): seq_no is validated but not forwarded to
                    # push(); confirm whether that is intentional.
                    self.__session.push(ircclient.irc_decode(message))
                    return

            self.close("Bad message type")

        def __cancelTimeout(self):
            if self.__timeout is not None:
                try:
                    self.__timeout.cancel()
                except error.AlreadyCalled:
                    pass
                self.__timeout = None

        def close(self, reason=None):
            """Send a close frame (4999 with ``reason``, else 4998) and unsubscribe."""
            self.__cancelTimeout()
            if reason:
                self.sendClose(4999, unicode(reason))
            else:
                self.sendClose(4998)

            if self.__session:
                self.__session.unsubscribe(self.__channel)
                self.__session = None

        def send(self, message_type, message):
            """Send ``message`` prefixed with its one-character type."""
            self.sendMessage(message_type + message)

    class WebSocketResource(autobahn.twisted.resource.WebSocketResource):
        """WebSocketResource that disables the HTTP channel timeout first."""

        def render(self, request):
            request.channel.setTimeout(None)
            return autobahn.twisted.resource.WebSocketResource.render(self, request)

    def WebSocketEngine(path=None):
        """Build the web resource that serves the websocket transport.

        ``path`` is accepted but not used.
        """
        factory = autobahn.twisted.websocket.WebSocketServerFactory("ws://localhost")
        factory.externalPort = None
        factory.protocol = WebSocketEngineProtocol
        factory.setProtocolOptions(maxMessagePayloadSize=512, maxFramePayloadSize=512, tcpNoDelay=False)
        # Named ws_resource so the imported twisted "resource" module is not
        # shadowed inside this function.
        ws_resource = WebSocketResource(factory)
        return ws_resource
48f0fc43 472