]> jfr.im git - irc/quakenet/qwebirc.git/blob - qwebirc/engines/ajaxengine.py
Merge default into quakenet.
[irc/quakenet/qwebirc.git] / qwebirc / engines / ajaxengine.py
1 from twisted.web import resource, server, static, error as http_error
2 from twisted.names import client
3 from twisted.internet import reactor, error
4 from authgateengine import login_optional, getSessionData
5 import md5, sys, os, time, config, qwebirc.config_options as config_options, traceback, socket
6 import qwebirc.ircclient as ircclient
7 from adminengine import AdminEngineAction
8 from qwebirc.util import HitCounter
9 import qwebirc.dns as qdns
10 import qwebirc.util.qjson as json
11 import urlparse
12
# Transport names advertised to clients by the new-connection handler.
# "websocket" is appended only when a usable autobahn can be imported.
TRANSPORTS = ["longpoll"]

try:
    import autobahn
    x = autobahn.version.split(".")
    if len(x) != 3:
        raise ImportError("Unknown version: %s" % autobahn.version)
    try:
        # Compare the full (major, minor, patch) tuple so that releases with a
        # major version above 0 are not mistaken for pre-0.8.14 ones.
        if tuple(int(part) for part in x) < (0, 8, 14):
            raise ImportError("autobahn %s is too old" % autobahn.version)
    except ValueError:
        # Non-numeric component (e.g. a pre-release suffix): treat as unusable
        # rather than letting the module import fail.
        raise ImportError("Unknown version: %s" % autobahn.version)

    import autobahn.twisted.websocket
    import autobahn.twisted.resource
    has_websocket = True
    TRANSPORTS.append("websocket")
except ImportError:
    has_websocket = False
29
# Close reason sent to a websocket client whose session id is not in Sessions.
BAD_SESSION_MESSAGE = (
    "Invalid session, this most likely means the server has restarted; "
    "close this dialog and then try refreshing the page."
)
# Largest sequence number accepted from clients (2**63 - 1); it does not wrap.
MAX_SEQNO = 9223372036854775807
# Live sessions keyed by session id.
Sessions = dict()
33
def get_session_id():
    """Return a fresh random session id as 32 lowercase hexadecimal characters.

    The id is the MD5 digest of 16 bytes from os.urandom; MD5 is used only to
    turn the random bytes into a fixed-width hex string.
    """
    # Function-scope import: hashlib replaces the deprecated `md5` module
    # without touching the file's import block.
    import hashlib
    return hashlib.md5(os.urandom(16)).hexdigest()
36
class BufferOverflowException(Exception):
    """Exception type reserved for buffer-overflow conditions."""
39
class AJAXException(Exception):
    """Error whose first argument is a message to report back to the client."""
42
class IDGenerationException(Exception):
    """Raised when no unused session id could be generated."""
45
class LineTooLongException(Exception):
    """Raised when a pushed line is longer than config.MAXLINELEN."""
48
# Pre-encoded empty event list, written to a channel when its poll times out.
EMPTY_JSON_LIST = json.dumps(list())
50
def cleanupSession(id):
    """Forget the session stored under *id*; unknown ids are ignored."""
    Sessions.pop(id, None)
56
class IRCSession:
    """State for one client session, bridging an IRC client and its channels.

    Events for the browser are queued with event() and delivered in batches
    by flush() to every subscribed channel. Each batch is numbered with
    subSeqNo, and the most recent batch is kept in old_buffer so that a
    subscriber which missed it can ask for it again. Lines coming from the
    browser go through push().

    The ``client`` attribute is not set here; it is assigned by whoever
    creates the IRC connection for this session.
    """

    def __init__(self, id):
        self.id = id
        self.subscriptions = []      # channels waiting for the next batch
        self.buffer = []             # events queued since the last flush
        self.old_buffer = None       # (seqNo, encoded batch) of the last flush
        self.buflen = 0              # summed len() of the queued events
        self.throttle = 0            # earliest time.time() of the next flush
        self.schedule = None         # pending DelayedCall for flush, if any
        self.closed = False
        self.cleanupschedule = None
        self.pubSeqNo = -1           # highest sequence number accepted by push
        self.subSeqNo = 0            # sequence number of the next batch

    def subscribe(self, channel, seqNo=None):
        """Register *channel* to receive the next batch of events.

        *seqNo* is the last batch number the subscriber has seen. If it is
        behind the current one, the previous batch is replayed when it is the
        one being asked for; otherwise an error batch is written and the
        channel is not registered.
        """
        if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
            # Too many waiters: drop the oldest one.
            self.subscriptions.pop(0).close()

        if seqNo is not None and seqNo < self.subSeqNo:
            if self.old_buffer is None or seqNo != self.old_buffer[0]:
                channel.write(json.dumps([False, "Unable to reconnect -- sequence number too old."]), seqNo + 1)
                return

            # write() returning a false value means the channel is used up.
            if not channel.write(self.old_buffer[1], self.old_buffer[0] + 1):
                return

        self.subscriptions.append(channel)
        # Plain, unscheduled flush. Passing seqNo here would be taken as the
        # `scheduled` flag, forgetting the pending DelayedCall for any
        # non-zero sequence number.
        self.flush()

    def unsubscribe(self, channel):
        """Remove *channel* from the subscriber list; no-op if not present."""
        try:
            self.subscriptions.remove(channel)
        except ValueError:
            pass

    def timeout(self, channel):
        """Answer an idle *channel* with an empty batch unless a flush is pending."""
        if self.schedule:
            return

        self.unsubscribe(channel)
        channel.write(EMPTY_JSON_LIST, self.subSeqNo)

    def flush(self, scheduled=False):
        """Deliver the queued events to all subscribers, honouring the throttle.

        *scheduled* is True only when invoked from the reactor via the
        DelayedCall stored in ``self.schedule``. A direct call never writes;
        it arranges a scheduled call instead.
        """
        if scheduled:
            self.schedule = None

        if not self.buffer or not self.subscriptions:
            return

        t = time.time()

        if t < self.throttle:
            # Too soon since the last batch: retry when the throttle expires.
            if not self.schedule:
                self.schedule = reactor.callLater(self.throttle - t, self.flush, True)
            return
        else:
            # process the rest of the packet
            if not scheduled:
                if not self.schedule:
                    self.schedule = reactor.callLater(0, self.flush, True)
                return

        self.throttle = t + config.UPDATE_FREQ

        encdata = json.dumps(self.buffer)
        self.old_buffer = (self.subSeqNo, encdata)
        self.subSeqNo += 1
        self.buffer = []
        self.buflen = 0

        subs = self.subscriptions
        self.subscriptions = newsubs = []

        # Channels whose write() returns a true value stay subscribed.
        for x in subs:
            if x.write(encdata, self.subSeqNo):
                newsubs.append(x)

        if self.closed and not newsubs:
            cleanupSession(self.id)

    def event(self, data):
        """Queue *data* for delivery, erroring the client if the buffer overflows."""
        newbuflen = self.buflen + len(data)
        if newbuflen > config.MAXBUFLEN:
            self.buffer = []
            # Keep the length counter in step with the emptied buffer.
            self.buflen = 0
            self.client.error("Buffer overflow.")
            return

        self.buffer.append(data)
        self.buflen = newbuflen
        self.flush()

    def push(self, data, seq_no=None):
        """Forward a line from the browser to the IRC client.

        Ignored once the session is closed, and when *seq_no* is given but is
        not greater than the last accepted one.

        Raises LineTooLongException if *data* exceeds config.MAXLINELEN.
        """
        if self.closed:
            return

        if len(data) > config.MAXLINELEN:
            raise LineTooLongException

        if seq_no is not None:
            if seq_no <= self.pubSeqNo:
                return
            self.pubSeqNo = seq_no
        self.client.write(data)

    def disconnect(self):
        """Mark the session closed and schedule its removal."""
        # keep the session hanging around for a few seconds so the
        # client has a chance to see what the issue was
        self.closed = True

        reactor.callLater(5, cleanupSession, self.id)
167
168 # DANGER! Breach of encapsulation!
def connect_notice(line):
    """Build a session event tuple showing *line* as an AUTH notice from qwebirc."""
    text = "*** (qwebirc) %s" % line
    params = ("AUTH", text)
    return ("c", "NOTICE", "", params)
171
class RequestChannel(object):
    """Single-use channel that answers one pending HTTP request."""

    def __init__(self, request):
        self.request = request

    def write(self, data, seqNo):
        """Send *data* as the response, with *seqNo* in the ``n`` header.

        The request is finished afterwards, so False is always returned to
        tell the caller this channel cannot be written to again.
        """
        req = self.request
        req.setHeader("n", str(seqNo))
        req.write(data)
        req.finish()
        return False

    def close(self):
        """Finish the request without sending a body."""
        self.request.finish()
184
class AJAXEngine(resource.Resource):
    """HTTP POST endpoint for the long-poll transport.

    The path component following ``prefix`` selects a handler from COMMANDS:
    ``n`` creates a session, ``s`` subscribes to its events and ``p`` pushes
    a line to IRC. Handlers report client-visible failures by raising
    AJAXException, which render_POST converts into a JSON error reply.
    """
    isLeaf = True

    def __init__(self, prefix):
        self.prefix = prefix
        self.__connect_hit = HitCounter()
        self.__total_hit = HitCounter()

    def render_POST(self, request):
        """Dispatch the request to the handler named by its path."""
        path = request.path[len(self.prefix):]
        # startswith() rather than path[0] so an empty path yields "404"
        # instead of an IndexError.
        if path.startswith("/"):
            handler = self.COMMANDS.get(path[1:])
            if handler is not None:
                try:
                    return handler(self, request)
                except AJAXException as e:
                    return json.dumps((False, e.args[0]))

        return "404"  ## TODO: tidy up

    def __parseSeqNo(self, request):
        """Return the ``n`` request argument as an int, or None if absent.

        Raises AJAXException when the value is not an integer in the range
        0..MAX_SEQNO.
        """
        seq_no = request.args.get("n")
        if seq_no is None:
            return None
        try:
            seq_no = int(seq_no[0])
            if seq_no < 0 or seq_no > MAX_SEQNO:
                raise ValueError
        except ValueError:
            raise AJAXException("Bad sequence number")
        return seq_no

    def newConnection(self, request):
        """Create a session and start its IRC connection.

        Returns a JSON-encoded ``(True, session id, TRANSPORTS)``.

        Raises AJAXException if no nickname was supplied, and
        IDGenerationException if no unused session id is found in 10 tries.
        """
        login_optional(request)

        ip = request.getClientIP()

        nick = request.args.get("nick")
        if not nick:
            raise AJAXException("Nickname not supplied.")
        nick = ircclient.irc_decode(nick[0])

        password = request.args.get("password")
        if password is not None:
            password = ircclient.irc_decode(password[0])

        for i in range(10):
            session_id = get_session_id()
            if not Sessions.get(session_id):
                break
        else:
            raise IDGenerationException()

        session = IRCSession(session_id)

        qticket = getSessionData(request).get("qticket")
        if qticket is None:
            perform = None
        else:
            # Turn nick!user@host into nick@host for the auth message target.
            service_mask = config.AUTH_SERVICE
            msg_mask = service_mask.split("!")[0] + "@" + service_mask.split("@", 1)[1]
            perform = ["PRIVMSG %s :TICKETAUTH %s" % (msg_mask, qticket)]

        ident, realname = config.IDENT, config.REALNAME
        if ident is config_options.IDENT_HEX or ident is None:  # latter is legacy
            ident = socket.inet_aton(ip).encode("hex")
        elif ident is config_options.IDENT_NICKNAME:
            ident = nick

        self.__connect_hit()

        def proceed(hostname):
            kwargs = dict(nick=nick, ident=ident, ip=ip, realname=realname, perform=perform, hostname=hostname)
            if password is not None:
                kwargs["password"] = password

            client = ircclient.createIRC(session, **kwargs)
            session.client = client

        if not hasattr(config, "WEBIRC_MODE") or config.WEBIRC_MODE == "hmac":
            proceed(None)
        else:
            # Resolve the hostname first, reporting progress to the client.
            notice = lambda x: session.event(connect_notice(x))
            notice("Looking up your hostname...")

            def callback(hostname):
                notice("Found your hostname.")
                proceed(hostname)

            def errback(failure):
                notice("Couldn't look up your hostname!")
                proceed(ip)

            qdns.lookupAndVerifyPTR(ip, timeout=[config.DNS_TIMEOUT]).addCallbacks(callback, errback)

        Sessions[session_id] = session

        return json.dumps((True, session_id, TRANSPORTS))

    def getSession(self, request):
        """Return the session named by the ``s`` request argument.

        Raises AJAXException if the argument is missing or names no session.
        """
        sessionid = request.args.get("s")
        if sessionid is None:
            raise AJAXException(BAD_SESSION_MESSAGE)

        session = Sessions.get(sessionid[0])
        if not session:
            raise AJAXException(BAD_SESSION_MESSAGE)
        return session

    def subscribe(self, request):
        """Park the request until the session has events or the poll times out.

        Raises AJAXException for an unknown session or a bad sequence number.
        """
        # presumably disables the HTTP channel's idle timeout for the long poll
        request.channel.setTimeout(None)

        channel = RequestChannel(request)
        session = self.getSession(request)
        notifier = request.notifyFinish()

        seq_no = self.__parseSeqNo(request)

        session.subscribe(channel, seq_no)

        timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)

        def cancel_timeout(result):
            # Runs however the request ends: drop the timer and the subscription.
            try:
                timeout_entry.cancel()
            except error.AlreadyCalled:
                pass
            session.unsubscribe(channel)

        notifier.addCallbacks(cancel_timeout, cancel_timeout)
        return server.NOT_DONE_YET

    def push(self, request):
        """Send the line in the ``c`` request argument to the session's IRC client.

        Returns a JSON-encoded ``(True, True)``. Raises AJAXException when no
        command is given, the sequence number or session is invalid, or
        forwarding the line fails; in the last case the session is
        disconnected first.
        """
        command = request.args.get("c")
        if command is None:
            raise AJAXException("No command specified.")
        self.__total_hit()

        seq_no = self.__parseSeqNo(request)

        session = self.getSession(request)
        try:
            session.push(ircclient.irc_decode(command[0]), seq_no)
        except AttributeError:  # occurs when we haven't noticed an error
            session.disconnect()
            raise AJAXException("Connection closed by server; try reconnecting by reloading the page.")
        except Exception:  # catch all
            session.disconnect()
            traceback.print_exc(file=sys.stderr)
            raise AJAXException("Unknown error.")

        return json.dumps((True, True))

    def closeById(self, k):
        """Error the IRC connection of session *k*; unknown ids are ignored."""
        s = Sessions.get(k)
        if s is None:
            return
        s.client.client.error("Closed by admin interface")

    @property
    def adminEngine(self):
        """Data for the admin interface: open sessions and hit counters."""
        return {
            "Sessions": [(str(v.client.client), AdminEngineAction("close", self.closeById, k)) for k, v in Sessions.items() if not v.closed],
            "Connections": [(self.__connect_hit,)],
            "Total hits": [(self.__total_hit,)],
        }

    COMMANDS = dict(p=push, n=newConnection, s=subscribe)
352
if has_websocket:
    class WebSocketChannel(object):
        """Session channel that delivers batches over a websocket connection."""

        def __init__(self, channel):
            self.channel = channel

        def write(self, data, seqNo):
            """Send a ``c`` message carrying "<seqNo>,<data>".

            Always returns True so the session keeps this channel subscribed.
            """
            self.channel.send("c", "%d,%s" % (seqNo, data))
            return True

        def close(self):
            self.channel.close()

    class WebSocketEngineProtocol(autobahn.twisted.websocket.WebSocketServerProtocol):
        """Websocket protocol for one client.

        The first character of each incoming message is its type. While
        AWAITING_AUTH only ``s`` ("<seqNo>,<session id>") is accepted; once
        AUTHED only ``p`` ("<seqNo>,<line>") is. Anything else closes the
        connection.
        """
        AWAITING_AUTH, AUTHED = 0, 1

        def __init__(self, *args, **kwargs):
            # NOTE(review): the base-class __init__ is not invoked here;
            # confirm the autobahn version in use does not need it.
            self.__state = self.AWAITING_AUTH
            self.__session = None
            self.__channel = None
            self.__timeout = None

        def onOpen(self):
            # Close the connection if no subscribe arrives within 5 seconds.
            self.__timeout = reactor.callLater(5, self.close, "Authentication timeout")

        def onClose(self, wasClean, code, reason):
            self.__cancelTimeout()
            if self.__session:
                self.__session.unsubscribe(self.__channel)
                self.__session = None

        def __splitSeqNo(self, message):
            """Parse "<seqNo>,<rest>" into ``(seq_no, rest)``.

            On malformed input the connection is closed and None is returned;
            the caller must stop processing the message in that case.
            """
            tokens = message.split(",", 1)
            if len(tokens) != 2:
                self.close("Bad tokens")
                return None

            try:
                seq_no = int(tokens[0])
                if seq_no < 0 or seq_no > MAX_SEQNO:
                    raise ValueError
            except ValueError:
                self.close("Bad value")
                return None

            return seq_no, tokens[1]

        def onMessage(self, msg, binary):
            # we don't bother checking the Origin header, as if you can auth then you've been able to pass the browser's
            # normal origin handling (POSTed the new connection request and managed to get the session id)
            state = self.__state
            message_type, message = msg[:1], msg[1:]
            if state == self.AWAITING_AUTH:
                if message_type == "s":  # subscribe
                    parsed = self.__splitSeqNo(message)
                    if parsed is None:
                        return
                    seq_no, message = parsed

                    session = Sessions.get(message)
                    if not session:
                        self.close(BAD_SESSION_MESSAGE)
                        return

                    self.__cancelTimeout()
                    self.__session = session
                    self.send("s", "True")
                    self.__state = self.AUTHED
                    self.__channel = WebSocketChannel(self)
                    session.subscribe(self.__channel, seq_no)
                    return
            elif state == self.AUTHED:
                if message_type == "p":  # push
                    parsed = self.__splitSeqNo(message)
                    if parsed is None:
                        return
                    seq_no, message = parsed
                    # The sequence number is validated but not passed on.
                    self.__session.push(ircclient.irc_decode(message))
                    return

            self.close("Bad message type")

        def __cancelTimeout(self):
            if self.__timeout is not None:
                try:
                    self.__timeout.cancel()
                except error.AlreadyCalled:
                    pass
                self.__timeout = None

        def close(self, reason=None):
            """Send a close frame (code 4999 with *reason*, else 4998) and detach from the session."""
            self.__cancelTimeout()
            if reason:
                self.sendClose(4999, reason)
            else:
                self.sendClose(4998)

            if self.__session:
                self.__session.unsubscribe(self.__channel)
                self.__session = None

        def send(self, message_type, message):
            """Send *message* prefixed with its one-character *message_type*."""
            self.sendMessage(message_type + message)

    class WebSocketResource(autobahn.twisted.resource.WebSocketResource):
        """WebSocketResource that clears the HTTP channel timeout before rendering."""

        def render(self, request):
            request.channel.setTimeout(None)
            return autobahn.twisted.resource.WebSocketResource.render(self, request)
460
def WebSocketEngine(path=None):
    """Build the websocket resource served by WebSocketEngineProtocol.

    *path* is accepted but not used. Messages and frames are limited to
    512 bytes of payload.
    """
    ws_factory = autobahn.twisted.websocket.WebSocketServerFactory("ws://localhost")
    ws_factory.externalPort = None
    ws_factory.protocol = WebSocketEngineProtocol
    ws_factory.setProtocolOptions(
        maxMessagePayloadSize=512,
        maxFramePayloadSize=512,
        tcpNoDelay=False,
    )
    return WebSocketResource(ws_factory)