]> jfr.im git - irc/quakenet/qwebirc.git/blame - qwebirc/engines/ajaxengine.py
Merge.
[irc/quakenet/qwebirc.git] / qwebirc / engines / ajaxengine.py
CommitLineData
99844c15 1from twisted.web import resource, server, static, error as http_error
9e769c12 2from twisted.names import client
265f5ce3 3from twisted.internet import reactor, error
ace37679 4from authgateengine import login_optional, getSessionData
becfa850 5import md5, sys, os, time, config, qwebirc.config_options as config_options, traceback, socket
85f01e3f
CP
6import qwebirc.ircclient as ircclient
7from adminengine import AdminEngineAction
8from qwebirc.util import HitCounter
28c4ad01 9import qwebirc.dns as qdns
becfa850 10import qwebirc.util.qjson as json
c60795d6
CP
11import urlparse
12
# Transports offered to clients; "websocket" is appended below when a usable
# autobahn installation is found.
TRANSPORTS = ["longpoll"]

# Oldest autobahn release accepted for the websocket transport.
_MIN_AUTOBAHN_VERSION = (0, 8, 14)


def _check_autobahn_version(version):
    """Raise ImportError unless *version* is an acceptable autobahn version.

    *version* must be a "major.minor.patch" string of integers that is at
    least _MIN_AUTOBAHN_VERSION.  ImportError is used for every rejection so
    that the caller's ``except ImportError`` falls back to long polling.
    """
    parts = version.split(".")
    if len(parts) != 3:
        raise ImportError("Unknown version: %s" % version)
    try:
        parsed = tuple(int(part) for part in parts)
    except ValueError:
        # e.g. "0.8.14b1": treat as unknown rather than crashing at import time
        raise ImportError("Unknown version: %s" % version)
    # Compare the whole tuple so the major component is taken into account.
    if parsed < _MIN_AUTOBAHN_VERSION:
        raise ImportError("autobahn %s is too old" % version)


try:
    import autobahn
    _check_autobahn_version(autobahn.version)

    import autobahn.twisted.websocket
    import autobahn.twisted.resource
    has_websocket = True
    TRANSPORTS.append("websocket")
except ImportError:
    has_websocket = False
29
# Error text given to a client whose session id is not found in Sessions.
BAD_SESSION_MESSAGE = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
# Upper bound for sequence numbers accepted from clients.
MAX_SEQNO = 9223372036854775807 # 2**63 - 1... yeah it doesn't wrap
# Registry of sessions, keyed by session id (see cleanupSession for removal).
Sessions = {}
33
def get_session_id():
    """Return a fresh random session id as 32 lowercase hex characters.

    The id is the MD5 hex digest of 16 bytes from os.urandom(); the
    randomness comes from os.urandom, MD5 only shapes it into hex.
    """
    # hashlib replaces the long-deprecated standalone ``md5`` module; it is
    # imported here so this function does not depend on the file-level imports.
    import hashlib
    return hashlib.md5(os.urandom(16)).hexdigest()
8dc46dfa
CP
36
class BufferOverflowException(Exception):
    """Buffer-overflow error type (not raised anywhere in the code visible here)."""
39
f59585a7
CP
class AJAXException(Exception):
    """Error whose first argument is reported back to the client as JSON (False, message)."""
42
4094890f
CP
class IDGenerationException(Exception):
    """Raised when no unused session id could be generated for a new connection."""
45
class LineTooLongException(Exception):
    """Raised when a line pushed by the client is longer than config.MAXLINELEN."""
bdd008f9 48
# Pre-encoded empty JSON list; IRCSession.timeout writes it to a channel
# whose request timed out.
EMPTY_JSON_LIST = json.dumps([])
9e769c12 50
8dc46dfa
CP
def cleanupSession(id):
    """Drop the session registered under *id*; an unknown id is silently ignored."""
    Sessions.pop(id, None)
56
9e769c12
CP
class IRCSession:
    """State shared between one IRC client and the channels subscribed to it.

    Events from the IRC side are queued with event() and delivered in batches
    by flush() to every subscribed channel (an object providing
    ``write(data, seqNo)`` and ``close()``).  Lines from the browser go the
    other way through push().

    The ``client`` attribute is not set here; it is assigned by whoever
    creates the IRC connection for this session.
    """

    def __init__(self, id):
        self.id = id
        self.subscriptions = []      # channels waiting for the next batch
        self.buffer = []             # events queued since the last flush
        self.old_buffer = None       # (sequence number, encoded batch) of the last flush
        self.buflen = 0              # running sum of len(event) for everything in buffer
        self.throttle = 0            # time before which no new batch is written
        self.schedule = None         # pending reactor call to flush(), if any
        self.closed = False
        self.cleanupschedule = None
        self.pubSeqNo = -1           # highest sequence number accepted by push()
        self.subSeqNo = 0            # sequence number of the next batch to send

    def subscribe(self, channel, seqNo=None):
        """Register *channel* for outgoing batches.

        When the subscription limit is reached the oldest channel is closed
        and dropped first.  If *seqNo* is behind the current batch number the
        previous batch is replayed when it matches, otherwise the channel is
        told that reconnection is impossible and is not subscribed.
        """
        if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
            self.subscriptions.pop(0).close()

        if seqNo is not None and seqNo < self.subSeqNo:
            if self.old_buffer is None or seqNo != self.old_buffer[0]:
                channel.write(json.dumps([False, "Unable to reconnect -- sequence number too old."]), seqNo + 1)
                return

            # a falsy result means the channel cannot take further writes
            if not channel.write(self.old_buffer[1], self.old_buffer[0] + 1):
                return

        self.subscriptions.append(channel)
        # NOTE(review): seqNo lands in flush()'s ``scheduled`` flag, so any
        # non-zero sequence number is treated as a scheduled flush -- confirm
        # whether that is intended before changing it.
        self.flush(seqNo)

    def unsubscribe(self, channel):
        """Remove *channel* from the subscribers; unknown channels are ignored."""
        try:
            self.subscriptions.remove(channel)
        except ValueError:
            pass

    def timeout(self, channel):
        """Answer *channel* with an empty list, unless a flush is already scheduled."""
        if self.schedule:
            return

        self.unsubscribe(channel)
        channel.write(EMPTY_JSON_LIST, self.subSeqNo)

    def flush(self, scheduled=False):
        """Write the buffered events to every subscriber, at most once per config.UPDATE_FREQ.

        A direct call (``scheduled`` false) never writes: it arranges for a
        scheduled call, either when the throttle expires or on the next
        reactor iteration.  Subscribers whose write() returns a falsy value
        are dropped afterwards.
        """
        if scheduled:
            self.schedule = None

        if not self.buffer or not self.subscriptions:
            return

        t = time.time()

        if t < self.throttle:
            if not self.schedule:
                self.schedule = reactor.callLater(self.throttle - t, self.flush, True)
            return
        else:
            # process the rest of the packet
            if not scheduled:
                if not self.schedule:
                    self.schedule = reactor.callLater(0, self.flush, True)
                return

        self.throttle = t + config.UPDATE_FREQ

        encdata = json.dumps(self.buffer)
        # keep the batch so a client that missed it can ask for it once more
        self.old_buffer = (self.subSeqNo, encdata)
        self.subSeqNo += 1
        self.buffer = []
        self.buflen = 0

        subs = self.subscriptions
        self.subscriptions = newsubs = []

        for x in subs:
            if x.write(encdata, self.subSeqNo):
                newsubs.append(x)

        if self.closed and not newsubs:
            cleanupSession(self.id)

    def event(self, data):
        """Queue *data* for delivery, or report an overflow to the IRC client.

        The buffer is limited to config.MAXBUFLEN, measured as the sum of
        len() of the queued events.
        """
        newbuflen = self.buflen + len(data)
        if newbuflen > config.MAXBUFLEN:
            self.buffer = []
            # Reset the counter together with the buffer; otherwise it stays
            # above the limit forever (flush() never runs on an empty buffer)
            # and every later event would be rejected as well.
            self.buflen = 0
            self.client.error("Buffer overflow.")
            return

        self.buffer.append(data)
        self.buflen = newbuflen
        self.flush()

    def push(self, data, seq_no=None):
        """Forward a line from the browser to the IRC client.

        Ignored once the session is closed.  When *seq_no* is given, lines
        whose number is not above the last accepted one are dropped.

        Raises LineTooLongException if *data* exceeds config.MAXLINELEN.
        """
        if self.closed:
            return

        if len(data) > config.MAXLINELEN:
            raise LineTooLongException

        if seq_no is not None:
            if seq_no <= self.pubSeqNo:
                return
            self.pubSeqNo = seq_no
        self.client.write(data)

    def disconnect(self):
        """Mark the session closed and remove it from the registry after 5 seconds."""
        # keep the session hanging around for a few seconds so the
        # client has a chance to see what the issue was
        self.closed = True

        reactor.callLater(5, cleanupSession, self.id)
167
28c4ad01
CP
# DANGER! Breach of encapsulation!
def connect_notice(line):
    """Build the event tuple for an AUTH notice carrying *line*, prefixed with "*** (qwebirc) "."""
    text = "*** (qwebirc) %s" % line
    payload = ("AUTH", text)
    return ("c", "NOTICE", "", payload)
171
class RequestChannel(object):
    """One-shot channel that answers a single HTTP request."""

    def __init__(self, request):
        self.request = request

    def write(self, data, seqNo):
        """Send *data* with the sequence number in the "n" header and finish the request.

        Always returns False: once the request is finished this channel
        cannot carry another write.
        """
        req = self.request
        req.setHeader("n", str(seqNo))
        req.write(data)
        req.finish()
        return False

    def close(self):
        """Finish the request without writing a body."""
        req = self.request
        req.finish()
184
9e769c12
CP
class AJAXEngine(resource.Resource):
    """Leaf resource handling the POST commands of the long-poll transport.

    The part of the request path after ``prefix`` selects the command via
    COMMANDS: "n" (new connection), "s" (subscribe) and "p" (push).  A handler
    reports a client-visible failure by raising AJAXException, which is
    returned to the browser as the JSON pair (False, message).
    """
    isLeaf = True

    def __init__(self, prefix):
        self.prefix = prefix
        self.__connect_hit = HitCounter()
        self.__total_hit = HitCounter()

    def render_POST(self, request):
        """Dispatch the request to its command handler; unknown paths get "404"."""
        path = request.path[len(self.prefix):]
        # startswith() instead of path[0]: a request for exactly the prefix
        # leaves an empty path, which must yield "404" rather than IndexError.
        if path.startswith("/"):
            handler = self.COMMANDS.get(path[1:])
            if handler is not None:
                try:
                    return handler(self, request)
                except AJAXException as e:
                    return json.dumps((False, e.args[0]))

        return "404" ## TODO: tidy up

    def newConnection(self, request):
        """Create a session plus its IRC connection; returns JSON (True, session id, TRANSPORTS).

        Raises AJAXException if no nickname was supplied and
        IDGenerationException if no unused session id was found in 10 tries.
        """
        # return value is not used here; the call itself is kept as in the original
        login_optional(request)

        ip = request.getClientIP()

        nick = request.args.get("nick")
        if not nick:
            raise AJAXException("Nickname not supplied.")
        nick = ircclient.irc_decode(nick[0])

        password = request.args.get("password")
        if password is not None:
            password = ircclient.irc_decode(password[0])

        for i in range(10):
            id = get_session_id()
            if not Sessions.get(id):
                break
        else:
            raise IDGenerationException()

        session = IRCSession(id)

        qticket = getSessionData(request).get("qticket")
        if qticket is None:
            perform = None
        else:
            # address the auth service as nick@server, dropping the user part of the mask
            service_mask = config.AUTH_SERVICE
            msg_mask = service_mask.split("!")[0] + "@" + service_mask.split("@", 1)[1]
            perform = ["PRIVMSG %s :TICKETAUTH %s" % (msg_mask, qticket)]

        ident, realname = config.IDENT, config.REALNAME
        if ident is config_options.IDENT_HEX or ident is None: # latter is legacy
            ident = socket.inet_aton(ip).encode("hex")
        elif ident is config_options.IDENT_NICKNAME:
            ident = nick

        self.__connect_hit()

        def proceed(hostname):
            kwargs = dict(nick=nick, ident=ident, ip=ip, realname=realname, perform=perform, hostname=hostname)
            if password is not None:
                kwargs["password"] = password

            client = ircclient.createIRC(session, **kwargs)
            session.client = client

        if not hasattr(config, "WEBIRC_MODE") or config.WEBIRC_MODE == "hmac":
            proceed(None)
        else:
            # any other mode: resolve the client's hostname first, falling back to the bare ip
            notice = lambda x: session.event(connect_notice(x))
            notice("Looking up your hostname...")
            def callback(hostname):
                notice("Found your hostname.")
                proceed(hostname)
            def errback(failure):
                notice("Couldn't look up your hostname!")
                proceed(ip)
            qdns.lookupAndVerifyPTR(ip, timeout=[config.DNS_TIMEOUT]).addCallbacks(callback, errback)

        Sessions[id] = session

        return json.dumps((True, id, TRANSPORTS))

    def getSession(self, request):
        """Return the session named by the "s" argument; raise AJAXException if absent or unknown."""
        sessionid = request.args.get("s")
        if sessionid is None:
            raise AJAXException(BAD_SESSION_MESSAGE)

        session = Sessions.get(sessionid[0])
        if not session:
            raise AJAXException(BAD_SESSION_MESSAGE)
        return session

    def subscribe(self, request):
        """Park the request on its session until data arrives or the poll times out.

        The optional "n" argument is the client's sequence number.  Raises
        AJAXException for a bad session or a malformed sequence number.
        """
        request.channel.setTimeout(None)

        channel = RequestChannel(request)
        session = self.getSession(request)
        notifier = request.notifyFinish()

        seq_no = request.args.get("n")
        try:
            if seq_no is not None:
                seq_no = int(seq_no[0])
                if seq_no < 0 or seq_no > MAX_SEQNO:
                    raise ValueError
        except ValueError:
            # must be AJAXException (not this resource class) so that
            # render_POST turns it into a JSON error reply
            raise AJAXException("Bad sequence number")

        session.subscribe(channel, seq_no)

        timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)
        def cancel_timeout(result):
            try:
                timeout_entry.cancel()
            except error.AlreadyCalled:
                pass
            session.unsubscribe(channel)
        # runs whether the request finished normally or the connection was lost
        notifier.addCallbacks(cancel_timeout, cancel_timeout)
        return server.NOT_DONE_YET

    def push(self, request):
        """Send the line in argument "c" to the session's IRC client; returns JSON (True, True).

        Raises AJAXException for a missing command, a malformed sequence
        number, a bad session, or any failure while pushing (in which case
        the session is disconnected as well).
        """
        command = request.args.get("c")
        if command is None:
            raise AJAXException("No command specified.")
        self.__total_hit()

        seq_no = request.args.get("n")
        try:
            if seq_no is not None:
                seq_no = int(seq_no[0])
                if seq_no < 0 or seq_no > MAX_SEQNO:
                    raise ValueError
        except ValueError:
            # must be AJAXException (not this resource class) so that
            # render_POST turns it into a JSON error reply
            raise AJAXException("Bad sequence number %r" % seq_no)

        session = self.getSession(request)
        try:
            session.push(ircclient.irc_decode(command[0]), seq_no)
        except AttributeError: # occurs when we haven't noticed an error
            session.disconnect()
            raise AJAXException("Connection closed by server; try reconnecting by reloading the page.")
        except Exception: # catch all
            session.disconnect()
            traceback.print_exc(file=sys.stderr)
            raise AJAXException("Unknown error.")

        return json.dumps((True, True))

    def closeById(self, k):
        """Admin action: raise an error on the IRC client of session *k*, if that session exists."""
        s = Sessions.get(k)
        if s is None:
            return
        s.client.client.error("Closed by admin interface")

    @property
    def adminEngine(self):
        """Data for the admin interface: open sessions with a close action, plus hit counters."""
        return {
            "Sessions": [(str(v.client.client), AdminEngineAction("close", self.closeById, k)) for k, v in Sessions.items() if not v.closed],
            "Connections": [(self.__connect_hit,)],
            "Total hits": [(self.__total_hit,)],
        }

    # path component -> handler; handlers are called as handler(self, request)
    COMMANDS = dict(p=push, n=newConnection, s=subscribe)
b5c84380 352
c60795d6
CP
if has_websocket:
    class WebSocketChannel(object):
        """Channel adapter that delivers session batches over a websocket protocol."""

        def __init__(self, channel):
            self.channel = channel

        def write(self, data, seqNo):
            """Send *data* as a "c" message prefixed with its sequence number.

            Always returns True, so the session keeps this channel subscribed.
            """
            self.channel.send("c", "%d,%s" % (seqNo, data))
            return True

        def close(self):
            self.channel.close()

    class WebSocketEngineProtocol(autobahn.twisted.websocket.WebSocketServerProtocol):
        """Websocket transport: one "s" (subscribe) message to attach to a session, then "p" (push) messages.

        Every message is a one-character type followed by "<seq_no>,<payload>".
        """
        AWAITING_AUTH, AUTHED = 0, 1

        def __init__(self, *args, **kwargs):
            # NOTE(review): the base class __init__ is not invoked here --
            # confirm this is still correct for the autobahn version in use.
            self.__state = self.AWAITING_AUTH
            self.__session = None
            self.__channel = None
            self.__timeout = None

        def onOpen(self):
            # the client gets 5 seconds to send its subscribe message
            self.__timeout = reactor.callLater(5, self.close, "Authentication timeout")

        def onClose(self, wasClean, code, reason):
            self.__cancelTimeout()
            if self.__session:
                self.__session.unsubscribe(self.__channel)
                self.__session = None

        def __parseSequenced(self, message):
            """Split "<seq_no>,<payload>" into (seq_no, payload).

            On malformed input the socket is closed with a reason and None is
            returned, so callers must stop processing the message.
            """
            tokens = message.split(",", 1)
            if len(tokens) != 2:
                self.close("Bad tokens")
                return None

            try:
                seq_no = int(tokens[0])
                if seq_no < 0 or seq_no > MAX_SEQNO:
                    raise ValueError
            except ValueError:
                self.close("Bad value")
                return None

            return seq_no, tokens[1]

        def onMessage(self, msg, binary):
            # we don't bother checking the Origin header, as if you can auth then you've been able to pass the browser's
            # normal origin handling (POSTed the new connection request and managed to get the session id)
            state = self.__state
            message_type, message = msg[:1], msg[1:]
            if state == self.AWAITING_AUTH:
                if message_type == "s": # subscribe
                    parsed = self.__parseSequenced(message)
                    if parsed is None:
                        # already closed; never subscribe with an unvalidated sequence number
                        return
                    seq_no, message = parsed

                    session = Sessions.get(message)
                    if not session:
                        self.close(BAD_SESSION_MESSAGE)
                        return

                    self.__cancelTimeout()
                    self.__session = session
                    self.send("s", "True")
                    self.__state = self.AUTHED
                    self.__channel = WebSocketChannel(self)
                    session.subscribe(self.__channel, seq_no)
                    return
            elif state == self.AUTHED:
                if message_type == "p": # push
                    parsed = self.__parseSequenced(message)
                    if parsed is None:
                        # already closed; do not forward a line from a malformed message
                        return
                    seq_no, message = parsed
                    # NOTE(review): seq_no is validated but not handed to
                    # session.push(), so no duplicate suppression happens on
                    # this transport -- confirm against the client before changing.
                    self.__session.push(ircclient.irc_decode(message))
                    return

            self.close("Bad message type")

        def __cancelTimeout(self):
            if self.__timeout is not None:
                try:
                    self.__timeout.cancel()
                except error.AlreadyCalled:
                    pass
                self.__timeout = None

        def close(self, reason=None):
            """Close the socket (code 4999 with a reason, 4998 without) and detach from the session."""
            self.__cancelTimeout()
            if reason:
                self.sendClose(4999, reason)
            else:
                self.sendClose(4998)

            if self.__session:
                self.__session.unsubscribe(self.__channel)
                self.__session = None

        def send(self, message_type, message):
            self.sendMessage(message_type + message)

    class WebSocketResource(autobahn.twisted.resource.WebSocketResource):
        """WebSocketResource that disables the HTTP channel timeout before handing over."""

        def render(self, request):
            request.channel.setTimeout(None)
            return autobahn.twisted.resource.WebSocketResource.render(self, request)

    def WebSocketEngine(path=None):
        """Build the web resource serving WebSocketEngineProtocol connections (*path* is unused)."""
        factory = autobahn.twisted.websocket.WebSocketServerFactory("ws://localhost")
        factory.externalPort = None
        factory.protocol = WebSocketEngineProtocol
        factory.setProtocolOptions(maxMessagePayloadSize=512, maxFramePayloadSize=512, tcpNoDelay=False)
        resource = WebSocketResource(factory)
        return resource