]> jfr.im git - irc/quakenet/qwebirc.git/blame - qwebirc/engines/ajaxengine.py
npe fix
[irc/quakenet/qwebirc.git] / qwebirc / engines / ajaxengine.py
CommitLineData
99844c15 1from twisted.web import resource, server, static, error as http_error
9e769c12 2from twisted.names import client
265f5ce3 3from twisted.internet import reactor, error
ace37679 4from authgateengine import login_optional, getSessionData
becfa850 5import md5, sys, os, time, config, qwebirc.config_options as config_options, traceback, socket
85f01e3f
CP
6import qwebirc.ircclient as ircclient
7from adminengine import AdminEngineAction
8from qwebirc.util import HitCounter
28c4ad01 9import qwebirc.dns as qdns
becfa850 10import qwebirc.util.qjson as json
c60795d6
CP
11import urlparse
12
# Transports advertised to clients in the reply to a new-connection request.
# Long polling is always available; websocket is added only when the
# optional autobahn package can be imported.
TRANSPORTS = ["longpoll"]

try:
    import autobahn.websocket
    import autobahn.resource
except ImportError:
    has_websocket = False
else:
    has_websocket = True
    TRANSPORTS.append("websocket")
22
# Error text sent to a client whose session id is not present in Sessions.
BAD_SESSION_MESSAGE = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
# Largest sequence number accepted from clients.
MAX_SEQNO = 9223372036854775807 # 2**63 - 1... yeah it doesn't wrap

# Live sessions, keyed by session id (see get_session_id / cleanupSession).
Sessions = {}
26
def get_session_id():
    """Return a fresh session id: the hex MD5 digest of 16 random bytes.

    The result is a 32-character lowercase hexadecimal string.
    """
    # hashlib supersedes the deprecated standalone ``md5`` module and
    # produces the same digest; imported locally so this function does not
    # depend on a change to the file's import block.
    import hashlib

    return hashlib.md5(os.urandom(16)).hexdigest()
8dc46dfa
CP
29
class BufferOverflowException(Exception):
    """Buffer-overflow error type (not raised by the code visible in this file)."""
32
f59585a7
CP
class AJAXException(Exception):
    """Error whose first argument is a message meant to be shown to the web client."""
35
4094890f
CP
class IDGenerationException(Exception):
    """Raised when no unused session id could be generated."""
38
class LineTooLongException(Exception):
    """Raised when a line pushed by the client exceeds the configured maximum length."""
bdd008f9 41
# Pre-encoded empty JSON list, written to a subscriber when its request times out.
EMPTY_JSON_LIST = json.dumps([])
9e769c12 43
8dc46dfa
CP
def cleanupSession(id):
    """Forget the session stored under *id*; an unknown id is silently ignored."""
    Sessions.pop(id, None)
49
9e769c12
CP
class IRCSession:
    """Per-session state linking one IRC client to its web subscribers.

    Events destined for the browser are queued with ``event`` and delivered
    by ``flush`` to every subscribed channel as one JSON-encoded batch.
    Each batch is tagged with ``subSeqNo``; the most recent batch is kept in
    ``old_buffer`` so a subscriber that missed it can have it replayed.
    Lines travelling the other way go through ``push``.

    The ``client`` attribute is not set here; it is assigned from outside
    once the IRC connection has been created.
    """

    def __init__(self, id):
        self.id = id
        self.subscriptions = []      # channels currently waiting for data
        self.buffer = []             # events not yet delivered
        self.old_buffer = None       # (seqNo, encoded batch) of the last flush
        self.buflen = 0              # running sum of len() of buffered events
        self.throttle = 0            # earliest time.time() of the next flush
        self.schedule = None         # pending delayed flush call, if any
        self.closed = False
        self.cleanupschedule = None
        self.pubSeqNo = -1           # highest sequence number accepted by push
        self.subSeqNo = 0            # sequence number of the next batch

    def subscribe(self, channel, seqNo=None):
        """Register *channel* to receive the next batch of events.

        If *seqNo* is behind the current ``subSeqNo`` the last batch is
        replayed when it is exactly the one the subscriber is missing;
        otherwise an error reply is written and the channel is not
        registered.  A channel whose ``write`` returns a false value is
        treated as consumed by that write and is not registered either.
        """
        if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
            # Too many waiters: drop the oldest one.
            self.subscriptions.pop(0).close()

        if seqNo is not None and seqNo < self.subSeqNo:
            if self.old_buffer is None or seqNo != self.old_buffer[0]:
                channel.write(json.dumps([False, "Unable to reconnect -- sequence number too old."]), seqNo + 1)
                return

            if not channel.write(self.old_buffer[1], self.old_buffer[0] + 1):
                return

        self.subscriptions.append(channel)
        # The original passed seqNo here, where flush() expects its boolean
        # ``scheduled`` flag; a non-zero sequence number then cleared
        # self.schedule without cancelling the pending call.
        self.flush()

    def unsubscribe(self, channel):
        """Remove *channel* from the subscriber list if it is present."""
        try:
            self.subscriptions.remove(channel)
        except ValueError:
            pass

    def timeout(self, channel):
        """Answer a waiting *channel* with an empty list.

        Does nothing while a flush is scheduled, since that flush will
        answer the channel itself.
        """
        if self.schedule:
            return

        self.unsubscribe(channel)
        channel.write(EMPTY_JSON_LIST, self.subSeqNo)

    def flush(self, scheduled=False):
        """Deliver buffered events to all subscribers, honouring the throttle.

        *scheduled* is true only when invoked by the reactor via a delayed
        call created here.  Direct calls never write synchronously; they
        arrange a delayed call instead.
        """
        if scheduled:
            self.schedule = None

        if not self.buffer or not self.subscriptions:
            return

        t = time.time()

        if t < self.throttle:
            # Too soon after the previous batch: wait out the remainder.
            if not self.schedule:
                self.schedule = reactor.callLater(self.throttle - t, self.flush, True)
            return

        if not scheduled:
            # process the rest of the packet
            if not self.schedule:
                self.schedule = reactor.callLater(0, self.flush, True)
            return

        self.throttle = t + config.UPDATE_FREQ

        encdata = json.dumps(self.buffer)
        self.old_buffer = (self.subSeqNo, encdata)
        self.subSeqNo += 1
        self.buffer = []
        self.buflen = 0

        # Swap in a fresh list first; channels that stay open after the
        # write (truthy return value) are re-registered into it.
        subs = self.subscriptions
        self.subscriptions = newsubs = []

        for x in subs:
            if x.write(encdata, self.subSeqNo):
                newsubs.append(x)

        if self.closed and not newsubs:
            cleanupSession(self.id)

    def event(self, data):
        """Queue *data* for delivery to the browser and request a flush.

        If the buffer would grow past ``config.MAXBUFLEN`` it is discarded
        and the IRC client is told about the overflow instead.
        """
        newbuflen = self.buflen + len(data)
        if newbuflen > config.MAXBUFLEN:
            self.buffer = []
            # Keep the length counter in step with the emptied buffer.
            self.buflen = 0
            self.client.error("Buffer overflow.")
            return

        self.buffer.append(data)
        self.buflen = newbuflen
        self.flush()

    def push(self, data, seq_no=None):
        """Send a line from the browser to the IRC client.

        Ignored once the session is closed.  When *seq_no* is given, lines
        whose number is not greater than the last accepted one are dropped.

        Raises LineTooLongException if *data* is longer than
        ``config.MAXLINELEN``.
        """
        if self.closed:
            return

        if len(data) > config.MAXLINELEN:
            raise LineTooLongException

        if seq_no is not None:
            if seq_no <= self.pubSeqNo:
                return
            self.pubSeqNo = seq_no
        self.client.write(data)

    def disconnect(self):
        """Mark the session closed and schedule its removal."""
        # keep the session hanging around for a few seconds so the
        # client has a chance to see what the issue was
        self.closed = True

        reactor.callLater(5, cleanupSession, self.id)
160
28c4ad01
CP
# DANGER! Breach of encapsulation!
def connect_notice(line):
    """Wrap *line* in an event tuple shaped like an AUTH NOTICE from qwebirc."""
    text = "*** (qwebirc) %s" % line
    return ("c", "NOTICE", "", ("AUTH", text))
164
class RequestChannel(object):
    """Single-use subscriber channel backed by one pending HTTP request."""

    def __init__(self, request):
        self.request = request

    def write(self, data, seqNo):
        """Send *data* with the sequence number in the ``n`` header and finish.

        Always returns False: the request is finished by this call, so the
        channel cannot be written to again.
        """
        req = self.request
        req.setHeader("n", str(seqNo))
        req.write(data)
        req.finish()
        return False

    def close(self):
        """Finish the underlying request without sending a body."""
        self.request.finish()
177
9e769c12
CP
class AJAXEngine(resource.Resource):
    """Leaf resource implementing the long-poll (AJAX) transport.

    POST requests to ``<prefix>/<command>`` are dispatched through
    ``COMMANDS``: ``n`` creates a session, ``s`` subscribes for events and
    ``p`` pushes a line to IRC.  Handlers report user-visible failures by
    raising AJAXException, which is returned as the JSON pair
    ``(False, message)``.
    """

    isLeaf = True

    def __init__(self, prefix):
        self.prefix = prefix
        self.__connect_hit = HitCounter()
        self.__total_hit = HitCounter()

    def render_POST(self, request):
        """Dispatch a POST to its command handler, or answer "404"."""
        path = request.path[len(self.prefix):]
        # startswith() rather than path[0]: the path is empty when the
        # request targets the prefix itself, and indexing would raise.
        if path.startswith("/"):
            handler = self.COMMANDS.get(path[1:])
            if handler is not None:
                try:
                    return handler(self, request)
                except AJAXException as e:
                    return json.dumps((False, e.args[0]))

        return "404" ## TODO: tidy up

    def newConnection(self, request):
        """Create a session plus IRC connection; return the JSON reply.

        Raises AJAXException when no nickname is supplied and
        IDGenerationException when ten generated ids in a row are taken.
        """
        # The return value is unused; the call is kept as in the original.
        login_optional(request)

        ip = request.getClientIP()

        nick = request.args.get("nick")
        if not nick:
            raise AJAXException("Nickname not supplied.")
        nick = ircclient.irc_decode(nick[0])

        password = request.args.get("password")
        if password is not None:
            password = ircclient.irc_decode(password[0])

        for _ in range(10):
            session_id = get_session_id()
            if not Sessions.get(session_id):
                break
        else:
            raise IDGenerationException()

        session = IRCSession(session_id)

        qticket = getSessionData(request).get("qticket")
        if qticket is None:
            perform = None
        else:
            service_mask = config.AUTH_SERVICE
            msg_mask = service_mask.split("!")[0] + "@" + service_mask.split("@", 1)[1]
            perform = ["PRIVMSG %s :TICKETAUTH %s" % (msg_mask, qticket)]

        ident, realname = config.IDENT, config.REALNAME
        if ident is config_options.IDENT_HEX or ident is None: # latter is legacy
            ident = socket.inet_aton(ip).encode("hex")
        elif ident is config_options.IDENT_NICKNAME:
            ident = nick

        self.__connect_hit()

        def proceed(hostname):
            kwargs = dict(nick=nick, ident=ident, ip=ip, realname=realname, perform=perform, hostname=hostname)
            if password is not None:
                kwargs["password"] = password

            session.client = ircclient.createIRC(session, **kwargs)

        if not hasattr(config, "WEBIRC_MODE") or config.WEBIRC_MODE == "hmac":
            proceed(None)
        else:
            # Every other mode resolves the client's hostname first and
            # falls back to the bare IP if the lookup fails.
            notice = lambda x: session.event(connect_notice(x))
            notice("Looking up your hostname...")

            def callback(hostname):
                notice("Found your hostname.")
                proceed(hostname)

            def errback(failure):
                notice("Couldn't look up your hostname!")
                proceed(ip)

            qdns.lookupAndVerifyPTR(ip, timeout=[config.DNS_TIMEOUT]).addCallbacks(callback, errback)

        Sessions[session_id] = session

        return json.dumps((True, session_id, TRANSPORTS))

    def getSession(self, request):
        """Return the session named by the ``s`` argument.

        Raises AJAXException when the argument is missing or unknown.
        """
        sessionid = request.args.get("s")
        if sessionid is None:
            raise AJAXException(BAD_SESSION_MESSAGE)

        session = Sessions.get(sessionid[0])
        if not session:
            raise AJAXException(BAD_SESSION_MESSAGE)
        return session

    def _parseSeqNo(self, request):
        """Return the ``n`` argument as an int, or None when it is absent.

        Raises AJAXException when the value is not an integer within
        0..MAX_SEQNO.
        """
        raw = request.args.get("n")
        if raw is None:
            return None
        try:
            seq_no = int(raw[0])
            if seq_no < 0 or seq_no > MAX_SEQNO:
                raise ValueError
        except ValueError:
            # The original raised AJAXEngine (this resource class) here,
            # which render_POST does not catch.
            raise AJAXException("Bad sequence number")
        return seq_no

    def subscribe(self, request):
        """Park *request* until the session has events or the poll times out."""
        request.channel.setTimeout(None)

        channel = RequestChannel(request)
        session = self.getSession(request)
        notifier = request.notifyFinish()
        seq_no = self._parseSeqNo(request)

        session.subscribe(channel, seq_no)

        timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)

        def cancel_timeout(result):
            # Runs whether the request finished normally or was aborted.
            try:
                timeout_entry.cancel()
            except error.AlreadyCalled:
                pass
            session.unsubscribe(channel)

        notifier.addCallbacks(cancel_timeout, cancel_timeout)
        return server.NOT_DONE_YET

    def push(self, request):
        """Forward the ``c`` argument of *request* to the session's IRC client."""
        command = request.args.get("c")
        if command is None:
            raise AJAXException("No command specified.")
        self.__total_hit()

        seq_no = self._parseSeqNo(request)

        session = self.getSession(request)
        try:
            session.push(ircclient.irc_decode(command[0]), seq_no)
        except AttributeError: # occurs when we haven't noticed an error
            session.disconnect()
            raise AJAXException("Connection closed by server; try reconnecting by reloading the page.")
        except Exception: # catch all
            session.disconnect()
            traceback.print_exc(file=sys.stderr)
            raise AJAXException("Unknown error.")

        return json.dumps((True, True))

    def closeById(self, k):
        """Close the session with id *k* via the admin interface; no-op if unknown."""
        s = Sessions.get(k)
        if s is None:
            return
        s.client.client.error("Closed by admin interface")

    @property
    def adminEngine(self):
        """Data for the admin page: open sessions and the two hit counters."""
        return {
            "Sessions": [(str(v.client.client), AdminEngineAction("close", self.closeById, k)) for k, v in Sessions.iteritems() if not v.closed],
            "Connections": [(self.__connect_hit,)],
            "Total hits": [(self.__total_hit,)],
        }

    # Command letter (the path component after the prefix) -> handler.
    COMMANDS = dict(p=push, n=newConnection, s=subscribe)
b5c84380 345
c60795d6
CP
if has_websocket:
    class WebSocketChannel(object):
        """Subscriber channel that delivers batches over an open websocket."""

        def __init__(self, channel):
            self.channel = channel

        def write(self, data, seqNo):
            """Send *data* as a "c" message prefixed with *seqNo*.

            Returns True so the session keeps this channel subscribed.
            """
            self.channel.send("c", "%d,%s" % (seqNo, data))
            return True

        def close(self):
            self.channel.close()

    class WebSocketEngineProtocol(autobahn.websocket.WebSocketServerProtocol):
        """Websocket transport: one connection bound to one session.

        Messages are a one-character type followed by a payload.  The peer
        must first send ``s<seqNo>,<sessionId>`` to authenticate; after
        that it may send ``p<seqNo>,<line>`` to push lines to IRC.  Any
        other message closes the connection.
        """

        AWAITING_AUTH, AUTHED = 0, 1

        def __init__(self, *args, **kwargs):
            self.__state = self.AWAITING_AUTH
            self.__session = None
            self.__channel = None
            self.__timeout = None

        def onOpen(self):
            # Drop peers that do not authenticate within five seconds.
            self.__timeout = reactor.callLater(5, self.close, "Authentication timeout")

        def onClose(self, wasClean, code, reason):
            self.__cancelTimeout()
            if self.__session:
                self.__session.unsubscribe(self.__channel)
                self.__session = None

        def __splitSeqNo(self, message):
            """Parse "<seqNo>,<rest>" into ``(seq_no, rest)``.

            On malformed input the connection is closed and None is
            returned.
            """
            # Split on the first comma only: the rest may be an IRC line,
            # which can itself contain commas.
            tokens = message.split(",", 1)
            if len(tokens) != 2:
                self.close("Bad tokens")
                return None

            try:
                seq_no = int(tokens[0])
                if seq_no < 0 or seq_no > MAX_SEQNO:
                    raise ValueError
            except ValueError:
                self.close("Bad value")
                return None

            return seq_no, tokens[1]

        def onMessage(self, msg, binary):
            # we don't bother checking the Origin header, as if you can auth then you've been able to pass the browser's
            # normal origin handling (POSTed the new connection request and managed to get the session id)
            state = self.__state
            message_type, message = msg[:1], msg[1:]
            if state == self.AWAITING_AUTH:
                if message_type == "s": # subscribe
                    parsed = self.__splitSeqNo(message)
                    if parsed is None:
                        # Already closed; the original carried on here
                        # with an unparsed sequence number.
                        return
                    seq_no, session_id = parsed

                    session = Sessions.get(session_id)
                    if not session:
                        self.close(BAD_SESSION_MESSAGE)
                        return

                    self.__cancelTimeout()
                    self.__session = session
                    self.send("s", "True")
                    self.__state = self.AUTHED
                    self.__channel = WebSocketChannel(self)
                    session.subscribe(self.__channel, seq_no)
                    return
            elif state == self.AUTHED:
                if message_type == "p": # push
                    parsed = self.__splitSeqNo(message)
                    if parsed is None:
                        return
                    seq_no, line = parsed
                    # Hand the sequence number on so the session can drop
                    # duplicates, as the long-poll push path does.
                    self.__session.push(ircclient.irc_decode(line), seq_no)
                    return

            self.close("Bad message type")

        def __cancelTimeout(self):
            if self.__timeout is not None:
                try:
                    self.__timeout.cancel()
                except error.AlreadyCalled:
                    pass
                self.__timeout = None

        def close(self, reason=None):
            """Send a close frame (4999 with *reason*, else 4998) and unsubscribe."""
            self.__cancelTimeout()
            if reason:
                self.sendClose(4999, reason)
            else:
                self.sendClose(4998)

            if self.__session:
                self.__session.unsubscribe(self.__channel)
                self.__session = None

        def send(self, message_type, message):
            """Send *message* prefixed with its one-character *message_type*."""
            self.sendMessage(message_type + message)

    class WebSocketResource(autobahn.resource.WebSocketResource):
        """Websocket resource that disables the HTTP channel timeout."""

        def render(self, request):
            request.channel.setTimeout(None)
            return autobahn.resource.WebSocketResource.render(self, request)

    def WebSocketEngine(path=None):
        """Build the websocket resource, taking the port from config.BASE_URL.

        Raises Exception when BASE_URL has no explicit port and its scheme
        is neither http nor https.  *path* is accepted but unused.
        """
        parsed = urlparse.urlparse(config.BASE_URL)
        port = parsed.port
        if port is None:
            if parsed.scheme == "http":
                port = 80
            elif parsed.scheme == "https":
                port = 443
            else:
                raise Exception("Unable to determine port from BASE_URL: " + config.BASE_URL)

        factory = autobahn.websocket.WebSocketServerFactory("ws://localhost:%d" % port)
        factory.protocol = WebSocketEngineProtocol
        factory.setProtocolOptions(maxMessagePayloadSize=512, maxFramePayloadSize=512, tcpNoDelay=False)
        resource = WebSocketResource(factory)
        return resource