]> jfr.im git - irc/quakenet/qwebirc.git/blob - qwebirc/engines/ajaxengine.py
add support for flash websockets (including flash policy server) -- wss untested
[irc/quakenet/qwebirc.git] / qwebirc / engines / ajaxengine.py
1 from twisted.web import resource, server, static, error as http_error
2 from twisted.names import client
3 from twisted.internet import reactor, error
4 from authgateengine import login_optional, getSessionData
5 import md5, sys, os, time, config, qwebirc.config_options as config_options, traceback, socket
6 import qwebirc.ircclient as ircclient
7 from adminengine import AdminEngineAction
8 from qwebirc.util import HitCounter
9 import qwebirc.dns as qdns
10 import qwebirc.util.qjson as json
11 import urlparse
12
13 TRANSPORTS = ["longpoll"]
14
15 try:
16 import autobahn.websocket
17 import autobahn.resource
18 has_websocket = True
19 TRANSPORTS.append("websocket")
20 except ImportError:
21 has_websocket = False
22
23 BAD_SESSION_MESSAGE = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
24 Sessions = {}
25
26 def get_session_id():
27 return md5.md5(os.urandom(16)).hexdigest()
28
29 class BufferOverflowException(Exception):
30 pass
31
32 class AJAXException(Exception):
33 pass
34
35 class IDGenerationException(Exception):
36 pass
37
38 class LineTooLongException(Exception):
39 pass
40
41 EMPTY_JSON_LIST = json.dumps([])
42
43 def cleanupSession(id):
44 try:
45 del Sessions[id]
46 except KeyError:
47 pass
48
49 class IRCSession:
50 def __init__(self, id):
51 self.id = id
52 self.subscriptions = []
53 self.buffer = []
54 self.buflen = 0
55 self.throttle = 0
56 self.schedule = None
57 self.closed = False
58 self.cleanupschedule = None
59
60 def subscribe(self, channel):
61 if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
62 self.subscriptions.pop(0).close()
63
64 self.subscriptions.append(channel)
65 self.flush()
66
67 def unsubscribe(self, channel):
68 try:
69 self.subscriptions.remove(channel)
70 except ValueError:
71 pass
72
73 def timeout(self, channel):
74 if self.schedule:
75 return
76
77 self.unsubscribe(channel)
78 channel.write(EMPTY_JSON_LIST)
79
80 def flush(self, scheduled=False):
81 if scheduled:
82 self.schedule = None
83
84 if not self.buffer or not self.subscriptions:
85 return
86
87 t = time.time()
88
89 if t < self.throttle:
90 if not self.schedule:
91 self.schedule = reactor.callLater(self.throttle - t, self.flush, True)
92 return
93 else:
94 # process the rest of the packet
95 if not scheduled:
96 if not self.schedule:
97 self.schedule = reactor.callLater(0, self.flush, True)
98 return
99
100 self.throttle = t + config.UPDATE_FREQ
101
102 encdata = json.dumps(self.buffer)
103 self.buffer = []
104 self.buflen = 0
105
106 subs = self.subscriptions
107 self.subscriptions = newsubs = []
108 for x in subs:
109 if x.write(encdata):
110 newsubs.append(x)
111
112 if self.closed and not newsubs:
113 cleanupSession(self.id)
114
115 def event(self, data):
116 newbuflen = self.buflen + len(data)
117 if newbuflen > config.MAXBUFLEN:
118 self.buffer = []
119 self.client.error("Buffer overflow.")
120 return
121
122 self.buffer.append(data)
123 self.buflen = newbuflen
124 self.flush()
125
126 def push(self, data):
127 if self.closed:
128 return
129
130 if len(data) > config.MAXLINELEN:
131 raise LineTooLongException
132
133 self.client.write(data)
134
135 def disconnect(self):
136 # keep the session hanging around for a few seconds so the
137 # client has a chance to see what the issue was
138 self.closed = True
139
140 reactor.callLater(5, cleanupSession, self.id)
141
142 # DANGER! Breach of encapsulation!
143 def connect_notice(line):
144 return "c", "NOTICE", "", ("AUTH", "*** (qwebirc) %s" % line)
145
146 class RequestChannel(object):
147 def __init__(self, request):
148 self.request = request
149
150 def write(self, data):
151 self.request.write(data)
152 self.request.finish()
153 return False
154
155 def close(self):
156 self.request.finish()
157
158 class AJAXEngine(resource.Resource):
159 isLeaf = True
160
161 def __init__(self, prefix):
162 self.prefix = prefix
163 self.__connect_hit = HitCounter()
164 self.__total_hit = HitCounter()
165
166 def render_POST(self, request):
167 path = request.path[len(self.prefix):]
168 if path[0] == "/":
169 handler = self.COMMANDS.get(path[1:])
170 if handler is not None:
171 try:
172 return handler(self, request)
173 except AJAXException, e:
174 return json.dumps((False, e[0]))
175
176 return "404" ## TODO: tidy up
177
178 def newConnection(self, request):
179 ticket = login_optional(request)
180
181 ip = request.getClientIP()
182
183 nick = request.args.get("nick")
184 if not nick:
185 raise AJAXException, "Nickname not supplied."
186 nick = ircclient.irc_decode(nick[0])
187
188 password = request.args.get("password")
189 if password is not None:
190 password = ircclient.irc_decode(password[0])
191
192 for i in range(10):
193 id = get_session_id()
194 if not Sessions.get(id):
195 break
196 else:
197 raise IDGenerationException()
198
199 session = IRCSession(id)
200
201 qticket = getSessionData(request).get("qticket")
202 if qticket is None:
203 perform = None
204 else:
205 service_mask = config.AUTH_SERVICE
206 msg_mask = service_mask.split("!")[0] + "@" + service_mask.split("@", 1)[1]
207 perform = ["PRIVMSG %s :TICKETAUTH %s" % (msg_mask, qticket)]
208
209 ident, realname = config.IDENT, config.REALNAME
210 if ident is config_options.IDENT_HEX or ident is None: # latter is legacy
211 ident = socket.inet_aton(ip).encode("hex")
212 elif ident is config_options.IDENT_NICKNAME:
213 ident = nick
214
215 self.__connect_hit()
216
217 def proceed(hostname):
218 kwargs = dict(nick=nick, ident=ident, ip=ip, realname=realname, perform=perform, hostname=hostname)
219 if password is not None:
220 kwargs["password"] = password
221
222 client = ircclient.createIRC(session, **kwargs)
223 session.client = client
224
225 if not hasattr(config, "WEBIRC_MODE") or config.WEBIRC_MODE == "hmac":
226 proceed(None)
227 elif config.WEBIRC_MODE != "hmac":
228 notice = lambda x: session.event(connect_notice(x))
229 notice("Looking up your hostname...")
230 def callback(hostname):
231 notice("Found your hostname.")
232 proceed(hostname)
233 def errback(failure):
234 notice("Couldn't look up your hostname!")
235 proceed(ip)
236 qdns.lookupAndVerifyPTR(ip, timeout=[config.DNS_TIMEOUT]).addCallbacks(callback, errback)
237
238 Sessions[id] = session
239
240 return json.dumps((True, id, TRANSPORTS))
241
242 def getSession(self, request):
243 bad_session_message = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
244
245 sessionid = request.args.get("s")
246 if sessionid is None:
247 raise AJAXException, bad_session_message
248
249 session = Sessions.get(sessionid[0])
250 if not session:
251 raise AJAXException, bad_session_message
252 return session
253
254 def subscribe(self, request):
255 request.channel.setTimeout(None)
256
257 channel = RequestChannel(request)
258 session = self.getSession(request)
259 notifier = request.notifyFinish()
260 session.subscribe(channel)
261
262 timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)
263 def cancel_timeout(result):
264 try:
265 timeout_entry.cancel()
266 except error.AlreadyCalled:
267 pass
268 session.unsubscribe(channel)
269 notifier.addCallbacks(cancel_timeout, cancel_timeout)
270 return server.NOT_DONE_YET
271
272 def push(self, request):
273 command = request.args.get("c")
274 if command is None:
275 raise AJAXException, "No command specified."
276 self.__total_hit()
277
278 session = self.getSession(request)
279 try:
280 session.push(ircclient.irc_decode(command[0]))
281 except AttributeError: # occurs when we haven't noticed an error
282 session.disconnect()
283 raise AJAXException, "Connection closed by server; try reconnecting by reloading the page."
284 except Exception, e: # catch all
285 session.disconnect()
286 traceback.print_exc(file=sys.stderr)
287 raise AJAXException, "Unknown error."
288
289 return json.dumps((True, True))
290
291 def closeById(self, k):
292 s = Sessions.get(k)
293 if s is None:
294 return
295 s.client.client.error("Closed by admin interface")
296
297 @property
298 def adminEngine(self):
299 return {
300 "Sessions": [(str(v.client.client), AdminEngineAction("close", self.closeById, k)) for k, v in Sessions.iteritems() if not v.closed],
301 "Connections": [(self.__connect_hit,)],
302 "Total hits": [(self.__total_hit,)],
303 }
304
305 COMMANDS = dict(p=push, n=newConnection, s=subscribe)
306
307 if has_websocket:
308 class WebSocketChannel(object):
309 def __init__(self, channel):
310 self.channel = channel
311
312 def write(self, data):
313 self.channel.send("c", data)
314 return True
315
316 def close(self):
317 self.channel.close()
318
319 class WebSocketEngineProtocol(autobahn.websocket.WebSocketServerProtocol):
320 AWAITING_AUTH, AUTHED = 0, 1
321
322 def __init__(self, *args, **kwargs):
323 self.__state = self.AWAITING_AUTH
324 self.__session = None
325 self.__channel = None
326 self.__timeout = None
327
328 def onOpen(self):
329 self.__timeout = reactor.callLater(5, self.close, "Authentication timeout")
330
331 def onClose(self, wasClean, code, reason):
332 self.__cancelTimeout()
333 if self.__session:
334 self.__session.unsubscribe(self.__channel)
335 self.__session = None
336
337 def onMessage(self, msg, binary):
338 # we don't bother checking the Origin header, as if you can auth then you've been able to pass the browser's
339 # normal origin handling (POSTed the new connection request and managed to get the session id)
340 state = self.__state
341 message_type, message = msg[:1], msg[1:]
342 if state == self.AWAITING_AUTH:
343 if message_type == "s": # subscribe
344 session = Sessions.get(message)
345 if not session:
346 self.close(BAD_SESSION_MESSAGE)
347 return
348
349 self.__cancelTimeout()
350 self.__session = session
351 self.send("s", "True")
352 self.__state = self.AUTHED
353 self.__channel = WebSocketChannel(self)
354 session.subscribe(self.__channel)
355 return
356 elif state == self.AUTHED:
357 if message_type == "p": # push
358 self.__session.push(ircclient.irc_decode(message))
359 return
360
361 self.close("Bad message type")
362
363 def __cancelTimeout(self):
364 if self.__timeout is not None:
365 try:
366 self.__timeout.cancel()
367 except error.AlreadyCalled:
368 pass
369 self.__timeout = None
370
371 def close(self, reason=None):
372 self.__cancelTimeout()
373 if reason:
374 self.sendClose(4999, reason)
375 else:
376 self.sendClose(4998)
377
378 if self.__session:
379 self.__session.unsubscribe(self.__channel)
380 self.__session = None
381
382 def send(self, message_type, message):
383 self.sendMessage(message_type + message)
384
385 class WebSocketResource(autobahn.resource.WebSocketResource):
386 def render(self, request):
387 request.channel.setTimeout(None)
388 return autobahn.resource.WebSocketResource.render(self, request)
389
390 def WebSocketEngine(path=None):
391 parsed = urlparse.urlparse(config.BASE_URL)
392 port = parsed.port
393 if port is None:
394 if parsed.scheme == "http":
395 port = 80
396 elif parsed.scheme == "https":
397 port = 443
398 else:
399 raise Exception("Unable to determine port from BASE_URL: " + config.BASE_URL)
400
401 factory = autobahn.websocket.WebSocketServerFactory("ws://localhost:%d" % port)
402 factory.protocol = WebSocketEngineProtocol
403 factory.setProtocolOptions(maxMessagePayloadSize=512, maxFramePayloadSize=512, tcpNoDelay=False)
404 resource = WebSocketResource(factory)
405 return resource