# jfr.im git - irc/quakenet/qwebirc.git/blob - qwebirc/engines/ajaxengine.py
# experimental websocket support
# [irc/quakenet/qwebirc.git] / qwebirc / engines / ajaxengine.py
1 from twisted.web import resource, server, static, error as http_error
2 from twisted.names import client
3 from twisted.internet import reactor, error
4 from authgateengine import login_optional, getSessionData
5 import md5, sys, os, time, config, qwebirc.config_options as config_options, traceback, socket
6 import qwebirc.ircclient as ircclient
7 from adminengine import AdminEngineAction
8 from qwebirc.util import HitCounter
9 import qwebirc.dns as qdns
10 import qwebirc.util.qjson as json
11 import urlparse
12
# Names of the transports offered to clients; newConnection() returns this
# list alongside the new session id.
TRANSPORTS = ["longpoll"]

# Websocket support is optional: it is only advertised when autobahn imports.
try:
    import autobahn.websocket
    import autobahn.resource
except ImportError:
    has_websocket = False
else:
    has_websocket = True
    TRANSPORTS.append("websocket")

# Error text sent to clients that present an unknown session id.
BAD_SESSION_MESSAGE = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."

# Live sessions, keyed by session id (values are IRCSession instances).
Sessions = {}
25
def get_session_id():
    """Return a fresh random session id as 32 lowercase hex characters.

    The id is the MD5 hex digest of 16 bytes from os.urandom(); MD5 is used
    only to turn the random bytes into a fixed-width hex string.
    """
    # The standalone ``md5`` module is deprecated (and absent on Python 3);
    # hashlib provides the same digest. Imported locally so this function
    # does not depend on the file-level import list.
    import hashlib

    return hashlib.md5(os.urandom(16)).hexdigest()
28
class BufferOverflowException(Exception):
    """Error type for an overflowing session buffer.

    NOTE(review): nothing in this file raises it; IRCSession.event() reports
    overflow through ``client.error`` instead.
    """
31
class AJAXException(Exception):
    """Error whose message is reported back to the browser.

    AJAXEngine.render_POST catches it and answers with the JSON pair
    ``(False, message)``, where message is the first exception argument.
    """
34
class IDGenerationException(Exception):
    """Raised by AJAXEngine.newConnection when no unused session id could be
    generated within its fixed number of attempts."""
37
class LineTooLongException(Exception):
    """Raised by IRCSession.push when a line is longer than
    config.MAXLINELEN."""
40
# Pre-encoded empty event list; IRCSession.timeout() writes it to a
# subscriber whose long-poll expired, so it is encoded once here.
EMPTY_JSON_LIST = json.dumps([])
42
def cleanupSession(id):
    """Forget the session registered under ``id``.

    Removing an id that is not (or no longer) registered is a silent no-op.
    """
    Sessions.pop(id, None)
48
class IRCSession:
    """Server-side state of one browser session.

    Events destined for the browser are collected in ``buffer`` and delivered
    as one JSON list to every subscribed channel by flush(). A channel is any
    object with ``write(data)`` (returning true to stay subscribed) and
    ``close()``.

    ``client`` is not set here: AJAXEngine.newConnection attaches it once the
    IRC connection has been created. Until then push() and an overflowing
    event() raise AttributeError.
    """

    def __init__(self, id):
        self.id = id
        self.subscriptions = []    # channels waiting for events
        self.buffer = []           # events not yet delivered
        self.buflen = 0            # running sum of len(event) over buffer
        self.throttle = 0          # earliest time.time() of the next delivery
        self.schedule = None       # pending delayed flush() call, if any
        self.closed = False
        self.cleanupschedule = None

    def subscribe(self, channel):
        """Register ``channel``, closing the oldest subscriber first when
        config.MAXSUBSCRIPTIONS has been reached, then try to deliver."""
        if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
            self.subscriptions.pop(0).close()

        self.subscriptions.append(channel)
        self.flush()

    def unsubscribe(self, channel):
        """Remove ``channel`` if it is subscribed; otherwise do nothing."""
        try:
            self.subscriptions.remove(channel)
        except ValueError:
            pass

    def timeout(self, channel):
        """Answer an expired subscriber with an empty event list.

        Nothing is done while a flush is scheduled; the channel is left for
        that flush to serve.
        """
        if self.schedule:
            return

        channel.write(EMPTY_JSON_LIST)
        if channel in self.subscriptions:
            self.subscriptions.remove(channel)

    def flush(self, scheduled=False):
        """Deliver buffered events to all subscribers, rate limited.

        ``scheduled`` is True only when invoked by the reactor from a delayed
        call set up here. Direct calls never write: they only arrange a
        delayed call (after the throttle expires, or on the next reactor
        iteration) so that several events can be sent together.
        """
        if scheduled:
            self.schedule = None

        if not self.buffer or not self.subscriptions:
            return

        t = time.time()

        if t < self.throttle:
            if not self.schedule:
                self.schedule = reactor.callLater(self.throttle - t, self.flush, True)
            return
        else:
            # process the rest of the packet
            if not scheduled:
                if not self.schedule:
                    self.schedule = reactor.callLater(0, self.flush, True)
                return

        self.throttle = t + config.UPDATE_FREQ

        encdata = json.dumps(self.buffer)
        self.buffer = []
        self.buflen = 0

        # keep only the channels that ask to stay subscribed
        newsubs = []
        for x in self.subscriptions:
            if x.write(encdata):
                newsubs.append(x)

        self.subscriptions = newsubs
        if self.closed and not self.subscriptions:
            cleanupSession(self.id)

    def event(self, data):
        """Queue ``data`` for delivery to the browser.

        If accepting it would push the buffered length past config.MAXBUFLEN
        the whole buffer is dropped and the overflow is reported through
        ``self.client.error``.
        """
        newbuflen = self.buflen + len(data)
        if newbuflen > config.MAXBUFLEN:
            self.buffer = []
            # Reset the length together with the buffer; otherwise the stale
            # total makes every later event look like an overflow as well.
            self.buflen = 0
            self.client.error("Buffer overflow.")
            return

        self.buffer.append(data)
        self.buflen = newbuflen
        self.flush()

    def push(self, data):
        """Forward a line from the browser to the IRC client.

        Ignored once the session is closed. Raises LineTooLongException when
        ``data`` is longer than config.MAXLINELEN.
        """
        if self.closed:
            return

        if len(data) > config.MAXLINELEN:
            raise LineTooLongException

        self.client.write(data)

    def disconnect(self):
        """Mark the session closed and schedule its removal."""
        # keep the session hanging around for a few seconds so the
        # client has a chance to see what the issue was
        self.closed = True

        reactor.callLater(5, cleanupSession, self.id)
142
# DANGER! Breach of encapsulation!
# (this hand-builds an event tuple that is fed straight to IRCSession.event)
def connect_notice(line):
    """Wrap ``line`` in a NOTICE AUTH event tuple tagged as coming from
    qwebirc itself."""
    text = "*** (qwebirc) %s" % line
    return ("c", "NOTICE", "", ("AUTH", text))
146
class RequestChannel(object):
    """Subscriber channel backed by a single pending HTTP request."""

    def __init__(self, request):
        self.request = request

    def write(self, data):
        """Send ``data`` as the response and finish the request.

        Always returns False: a finished request cannot receive anything
        further, so the session must not keep this channel subscribed.
        """
        pending = self.request
        pending.write(data)
        pending.finish()
        return False

    def close(self):
        """Finish the request without writing anything."""
        self.request.finish()
158
class AJAXEngine(resource.Resource):
    """Twisted web resource implementing the long-poll (AJAX) transport.

    POST requests are dispatched on the path that follows ``prefix`` using
    the COMMANDS table at the end of the class: ``n`` opens a new session,
    ``s`` subscribes a long-poll request to a session and ``p`` pushes a line
    from the browser to IRC. Handlers report user-visible failures by raising
    AJAXException.
    """

    isLeaf = True

    def __init__(self, prefix):
        self.prefix = prefix
        self.__connect_hit = HitCounter()
        self.__total_hit = HitCounter()

    def render_POST(self, request):
        """Dispatch a POST to its command handler.

        Returns the handler's result, the JSON pair ``(False, message)`` when
        the handler raised AJAXException, or the string "404" when no handler
        matches.
        """
        path = request.path[len(self.prefix):]
        # startswith() instead of path[0]: an empty remainder must fall
        # through to the 404 answer rather than raise IndexError.
        if path.startswith("/"):
            handler = self.COMMANDS.get(path[1:])
            if handler is not None:
                try:
                    return handler(self, request)
                except AJAXException as e:
                    return json.dumps((False, e.args[0]))

        return "404" ## TODO: tidy up

    def newConnection(self, request):
        """Create a session and start its IRC connection.

        Reads the ``nick`` (required) and ``password`` (optional) request
        arguments. Returns the JSON triple ``(True, session id, TRANSPORTS)``.

        Raises AJAXException when no nickname is supplied and
        IDGenerationException when no unused session id could be found.
        """
        # The result is not used here; the call is kept because it may
        # validate or set up the login session (not visible from this file).
        login_optional(request)

        ip = request.getClientIP()

        nick = request.args.get("nick")
        if not nick:
            raise AJAXException("Nickname not supplied.")
        nick = ircclient.irc_decode(nick[0])

        password = request.args.get("password")
        if password is not None:
            password = ircclient.irc_decode(password[0])

        for _ in range(10):
            session_id = get_session_id()
            if not Sessions.get(session_id):
                break
        else:
            raise IDGenerationException()

        session = IRCSession(session_id)

        qticket = getSessionData(request).get("qticket")
        if qticket is None:
            perform = None
        else:
            # nick!user@host -> nick@host
            service_mask = config.AUTH_SERVICE
            msg_mask = service_mask.split("!")[0] + "@" + service_mask.split("@", 1)[1]
            perform = ["PRIVMSG %s :TICKETAUTH %s" % (msg_mask, qticket)]

        ident, realname = config.IDENT, config.REALNAME
        if ident is config_options.IDENT_HEX or ident is None:  # latter is legacy
            ident = socket.inet_aton(ip).encode("hex")
        elif ident is config_options.IDENT_NICKNAME:
            ident = nick

        self.__connect_hit()

        def proceed(hostname):
            kwargs = dict(nick=nick, ident=ident, ip=ip, realname=realname, perform=perform, hostname=hostname)
            if password is not None:
                kwargs["password"] = password

            session.client = ircclient.createIRC(session, **kwargs)

        if not hasattr(config, "WEBIRC_MODE") or config.WEBIRC_MODE == "hmac":
            proceed(None)
        else:
            # every other mode connects only after a PTR lookup, falling
            # back to the bare IP when the lookup fails
            notice = lambda x: session.event(connect_notice(x))
            notice("Looking up your hostname...")

            def callback(hostname):
                notice("Found your hostname.")
                proceed(hostname)

            def errback(failure):
                notice("Couldn't look up your hostname!")
                proceed(ip)

            qdns.lookupAndVerifyPTR(ip, timeout=[config.DNS_TIMEOUT]).addCallbacks(callback, errback)

        Sessions[session_id] = session

        return json.dumps((True, session_id, TRANSPORTS))

    def getSession(self, request):
        """Return the session named by the ``s`` request argument.

        Raises AJAXException (with BAD_SESSION_MESSAGE) when the argument is
        missing or does not name a live session.
        """
        sessionid = request.args.get("s")
        if sessionid is None:
            raise AJAXException(BAD_SESSION_MESSAGE)

        session = Sessions.get(sessionid[0])
        if not session:
            raise AJAXException(BAD_SESSION_MESSAGE)
        return session

    def subscribe(self, request):
        """Park ``request`` as a long-poll subscriber of its session.

        Returns server.NOT_DONE_YET; the response is produced later through
        the RequestChannel, at the latest after
        config.HTTP_AJAX_REQUEST_TIMEOUT seconds via IRCSession.timeout.
        """
        # presumably stops the HTTP channel's own timeout from ending the
        # long poll early
        request.channel.cancelTimeout()

        channel = RequestChannel(request)
        session = self.getSession(request)
        notifier = request.notifyFinish()
        session.subscribe(channel)

        timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)

        def cancel_timeout(result):
            try:
                timeout_entry.cancel()
            except error.AlreadyCalled:
                pass

        def connection_lost(failure):
            # The browser went away without the request being finished, so
            # the session still lists this channel: remove it so buffered
            # events are kept for the next subscriber instead of being
            # written to a dead request. (This used to pass ``self``, the
            # engine, which is never in the list.)
            #
            # This is deliberately not done on normal completion: the session
            # has already dropped the channel on every path that finishes the
            # request, and notifyFinish fires from inside finish(), i.e.
            # possibly while IRCSession.flush() is iterating the list.
            session.unsubscribe(channel)
            cancel_timeout(failure)

        notifier.addCallbacks(cancel_timeout, connection_lost)
        return server.NOT_DONE_YET

    def push(self, request):
        """Send the line in the ``c`` request argument to IRC.

        Returns the JSON pair ``(True, True)``. Any failure while pushing
        disconnects the session and is reported as an AJAXException.
        """
        command = request.args.get("c")
        if command is None:
            raise AJAXException("No command specified.")
        self.__total_hit()

        session = self.getSession(request)
        try:
            session.push(ircclient.irc_decode(command[0]))
        except AttributeError:  # occurs when we haven't noticed an error
            session.disconnect()
            raise AJAXException("Connection closed by server; try reconnecting by reloading the page.")
        except Exception:  # catch all
            session.disconnect()
            traceback.print_exc(file=sys.stderr)
            raise AJAXException("Unknown error.")

        return json.dumps((True, True))

    def closeById(self, k):
        """Close the session with id ``k`` on behalf of the admin interface;
        unknown ids are ignored."""
        s = Sessions.get(k)
        if s is None:
            return
        s.client.client.error("Closed by admin interface")

    @property
    def adminEngine(self):
        """Data for the admin interface: one entry (with a "close" action)
        per session that is not closed, plus the two hit counters."""
        return {
            "Sessions": [(str(v.client.client), AdminEngineAction("close", self.closeById, k)) for k, v in Sessions.iteritems() if not v.closed],
            "Connections": [(self.__connect_hit,)],
            "Total hits": [(self.__total_hit,)],
        }

    COMMANDS = dict(p=push, n=newConnection, s=subscribe)
307
if has_websocket:
    class WebSocketChannel(object):
        """Subscriber channel that delivers session events over a websocket
        protocol instance."""

        def __init__(self, channel):
            self.channel = channel

        def write(self, data):
            """Send ``data`` as a "c" message; returns True so the session
            keeps the channel subscribed."""
            self.channel.send("c", data)
            return True

        def close(self):
            self.channel.close()

    class WebSocketEngineProtocol(autobahn.websocket.WebSocketServerProtocol):
        """Websocket transport for an existing session.

        Every message is a one character type followed by its payload. The
        peer must first send "s" + session id (within 5 seconds of opening);
        after that "p" + line pushes a line to IRC. Anything else closes the
        connection.
        """

        AWAITING_AUTH, AUTHED = 0, 1

        def __init__(self, *args, **kwargs):
            self.__state = self.AWAITING_AUTH
            self.__session = None
            self.__channel = None
            self.__timeout = None

        def onOpen(self):
            self.__timeout = reactor.callLater(5, self.close, "Authentication timeout")

        def onClose(self, wasClean, code, reason):
            self.__cancelTimeout()
            if self.__session:
                self.__session.unsubscribe(self.__channel)
                self.__session = None

        def onMessage(self, msg, binary):
            # we don't bother checking the Origin header, as if you can auth then you've been able to pass the browser's
            # normal origin handling (POSTed the new connection request and managed to get the session id)
            state = self.__state
            message_type, message = msg[:1], msg[1:]
            if state == self.AWAITING_AUTH:
                if message_type == "s":  # subscribe
                    session = Sessions.get(message)
                    if not session:
                        self.close(BAD_SESSION_MESSAGE)
                        return

                    self.__cancelTimeout()
                    self.__session = session
                    self.send("s", "True")
                    self.__state = self.AUTHED
                    self.__channel = WebSocketChannel(self)
                    session.subscribe(self.__channel)
                    return
            elif state == self.AUTHED:
                if message_type == "p":  # push
                    self.__session.push(ircclient.irc_decode(message))
                    return

            self.close("Bad message type")

        def __cancelTimeout(self):
            """Cancel the authentication timer if it is still pending."""
            timeout, self.__timeout = self.__timeout, None
            # close() is also what the timer itself invokes, so the call may
            # already be running here; cancelling it then raises
            # AlreadyCalled and the close frame would never be sent.
            if timeout is not None and timeout.active():
                timeout.cancel()

        def close(self, reason=None):
            """Send a close frame (code 4999 with ``reason``, else 4998) and
            detach from the session."""
            self.__cancelTimeout()
            if reason:
                self.sendClose(4999, reason)
            else:
                self.sendClose(4998)

            if self.__session:
                self.__session.unsubscribe(self.__channel)
                self.__session = None

        def send(self, message_type, message):
            self.sendMessage(message_type + message)

    class WebSocketResource(autobahn.resource.WebSocketResource):
        """autobahn's WebSocketResource with the HTTP channel timeout
        cancelled before the request is handed over."""

        def render(self, request):
            request.channel.cancelTimeout()
            return autobahn.resource.WebSocketResource.render(self, request)

    def WebSocketEngine(path=None):
        """Build the web resource serving the websocket transport.

        The port is taken from config.BASE_URL, defaulting to 80 for http and
        443 for https. ``path`` is accepted but not used.

        Raises Exception when the port cannot be determined.
        """
        parsed = urlparse.urlparse(config.BASE_URL)
        port = parsed.port
        if port is None:
            if parsed.scheme == "http":
                port = 80
            elif parsed.scheme == "https":
                port = 443
            else:
                raise Exception("Unable to determine port from BASE_URL: " + config.BASE_URL)

        factory = autobahn.websocket.WebSocketServerFactory("ws://localhost:%d" % port)
        factory.protocol = WebSocketEngineProtocol
        factory.setProtocolOptions(maxMessagePayloadSize=512, maxFramePayloadSize=512, tcpNoDelay=False)
        resource = WebSocketResource(factory)
        return resource