]> jfr.im git - irc/quakenet/qwebirc.git/blame - qwebirc/engines/ajaxengine.py
add some stupid logging
[irc/quakenet/qwebirc.git] / qwebirc / engines / ajaxengine.py
CommitLineData
99844c15 1from twisted.web import resource, server, static, error as http_error
9e769c12 2from twisted.names import client
265f5ce3 3from twisted.internet import reactor, error
ace37679 4from authgateengine import login_optional, getSessionData
becfa850 5import md5, sys, os, time, config, qwebirc.config_options as config_options, traceback, socket
85f01e3f
CP
6import qwebirc.ircclient as ircclient
7from adminengine import AdminEngineAction
8from qwebirc.util import HitCounter
28c4ad01 9import qwebirc.dns as qdns
becfa850 10import qwebirc.util.qjson as json
c60795d6
CP
11import urlparse
12
# Transports advertised to clients in the reply to a new-connection request;
# long polling is always available.
TRANSPORTS = ["longpoll"]

# WebSocket support is optional and only enabled when both autobahn modules
# can be imported.
try:
    import autobahn.websocket
    import autobahn.resource
except ImportError:
    has_websocket = False
else:
    has_websocket = True
    TRANSPORTS.append("websocket")

BAD_SESSION_MESSAGE = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."

# Live sessions, keyed by session id.
Sessions = {}
25
def get_session_id():
    """Return a new random session identifier.

    The identifier is the hexadecimal MD5 digest (32 characters) of 16 bytes
    read from os.urandom().  The digest only turns the random bytes into a
    printable token; collisions with live sessions are checked by the caller.
    """
    # hashlib replaces the deprecated standalone ``md5`` module and produces
    # the same digest.  Imported locally so the function is self-contained.
    import hashlib
    return hashlib.md5(os.urandom(16)).hexdigest()
8dc46dfa
CP
28
class BufferOverflowException(Exception):
    """Exception type for a session buffer overflow.

    NOTE(review): nothing in the code visible in this file raises it;
    IRCSession.event() reports overflow through client.error() instead.
    """
31
f59585a7
CP
class AJAXException(Exception):
    """Error whose first argument is a message meant for the web client.

    AJAXEngine.render_POST() catches it and answers with the JSON encoding of
    ``(False, message)``.
    """
34
4094890f
CP
class IDGenerationException(Exception):
    """Raised when no unused session id could be generated.

    AJAXEngine.newConnection() raises it after ten generated ids all turned
    out to be in use.
    """
37
class LineTooLongException(Exception):
    """Raised by IRCSession.push() for a line longer than config.MAXLINELEN."""
bdd008f9 40
# JSON encoding of an empty list, computed once at import time; it is what
# IRCSession.timeout() writes to a channel that timed out.
EMPTY_JSON_LIST = json.dumps([])
9e769c12 42
8dc46dfa
CP
def cleanupSession(id):
    """Forget the session stored under *id*; unknown ids are ignored."""
    Sessions.pop(id, None)
48
9e769c12
CP
class IRCSession:
    """Per-connection state linking an IRC client to its subscribed channels.

    Events passed to event() are queued in ``buffer`` and delivered, JSON
    encoded, to every subscribed channel by flush().  Lines passed to push()
    are written to ``client``.  The ``client`` attribute is not created here;
    it is assigned from outside once the IRC connection has been created.
    """

    def __init__(self, id):
        self.id = id
        self.subscriptions = []  # channels waiting for the next flush
        self.buffer = []         # events queued since the last flush
        self.buflen = 0          # running sum of len(event) over ``buffer``
        self.throttle = 0        # earliest time.time() value for the next flush
        self.schedule = None     # pending delayed call to flush(), if any
        self.closed = False      # set by disconnect()
        self.cleanupschedule = None

    def subscribe(self, channel):
        """Register *channel* for delivery and try to flush queued events.

        When config.MAXSUBSCRIPTIONS channels are already registered, the
        oldest one is closed and dropped first.
        """
        if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
            self.subscriptions.pop(0).close()

        self.subscriptions.append(channel)
        self.flush()

    def unsubscribe(self, channel):
        """Remove *channel* from the subscriptions; unknown channels are ignored."""
        try:
            self.subscriptions.remove(channel)
        except ValueError:
            pass

    def timeout(self, channel):
        """Answer *channel* with an empty JSON list and drop it.

        Does nothing while a flush is scheduled.
        """
        if self.schedule:
            return

        channel.write(EMPTY_JSON_LIST)
        # Same tolerant removal as unsubscribe(): writing may already have
        # removed the channel.
        self.unsubscribe(channel)

    def flush(self, scheduled=False):
        """Deliver the queued events to all subscribed channels.

        Direct calls (``scheduled`` false) never write; they only make sure a
        delayed call to ``flush(True)`` is pending.  The delayed call does the
        writing, no earlier than ``throttle``, which is then advanced by
        config.UPDATE_FREQ.  Channels whose write() returns a false value are
        dropped afterwards.  If the session is closed and no channel remains,
        the session is removed via cleanupSession().
        """
        if scheduled:
            self.schedule = None

        if not self.buffer or not self.subscriptions:
            return

        now = time.time()

        if now < self.throttle:
            # Too early: wait until the throttle expires.
            if not self.schedule:
                self.schedule = reactor.callLater(self.throttle - now, self.flush, True)
            return

        if not scheduled:
            # process the rest of the packet
            if not self.schedule:
                self.schedule = reactor.callLater(0, self.flush, True)
            return

        self.throttle = now + config.UPDATE_FREQ

        encdata = json.dumps(self.buffer)
        self.buffer = []
        self.buflen = 0

        # Iterate over a snapshot: a channel's write() may finish its request,
        # and finish callbacks can call unsubscribe() synchronously, which
        # would otherwise mutate the list being iterated and skip channels.
        self.subscriptions = [
            channel for channel in list(self.subscriptions) if channel.write(encdata)
        ]
        if self.closed and not self.subscriptions:
            cleanupSession(self.id)

    def event(self, data):
        """Queue *data* for delivery and try to flush.

        If adding ``len(data)`` would push the running total past
        config.MAXBUFLEN, the queue is discarded and the overflow is reported
        through ``client.error()`` instead.
        """
        newbuflen = self.buflen + len(data)
        if newbuflen > config.MAXBUFLEN:
            self.buffer = []
            # Keep the counter in step with the now-empty buffer, and do so
            # before calling out in case error() leads back into event().
            self.buflen = 0
            self.client.error("Buffer overflow.")
            return

        self.buffer.append(data)
        self.buflen = newbuflen
        self.flush()

    def push(self, data):
        """Write *data* to the IRC client; ignored once the session is closed.

        Raises LineTooLongException when *data* is longer than
        config.MAXLINELEN.
        """
        if self.closed:
            return

        if len(data) > config.MAXLINELEN:
            raise LineTooLongException

        self.client.write(data)

    def disconnect(self):
        """Mark the session closed and schedule its removal."""
        # keep the session hanging around for a few seconds so the
        # client has a chance to see what the issue was
        self.closed = True

        reactor.callLater(5, cleanupSession, self.id)
142
28c4ad01
CP
143# DANGER! Breach of encapsulation!
def connect_notice(line):
    """Build the event tuple for a qwebirc connection-time AUTH notice.

    The result has the form ``("c", "NOTICE", "", ("AUTH", text))`` where
    *text* is *line* prefixed with ``"*** (qwebirc) "``.
    """
    text = "*** (qwebirc) %s" % line
    params = ("AUTH", text)
    return ("c", "NOTICE", "", params)
146
class RequestChannel(object):
    """Channel that answers a single pending HTTP request."""

    def __init__(self, request):
        self.request = request

    def write(self, data):
        """Send *data* as the response and finish the request.

        Always returns False, which makes IRCSession.flush() drop the channel
        after this one write.
        """
        pending = self.request
        pending.write(data)
        pending.finish()
        return False

    def close(self):
        """Finish the request without writing anything."""
        pending = self.request
        pending.finish()
158
9e769c12
CP
class AJAXEngine(resource.Resource):
    """Twisted web resource implementing the AJAX (long-poll) transport.

    POST requests to ``<prefix>/<command>`` are dispatched through COMMANDS:
    ``n`` creates a session, ``s`` subscribes the request to a session's
    events and ``p`` pushes a line to the session's IRC client.
    """
    isLeaf = True

    def __init__(self, prefix):
        self.prefix = prefix
        self.__connect_hit = HitCounter()
        self.__total_hit = HitCounter()

    def render_POST(self, request):
        """Dispatch a POST to the handler named by the path after the prefix.

        An AJAXException raised by the handler is turned into the JSON
        encoding of ``(False, message)``.  Anything that does not map to a
        handler gets the body "404".
        """
        path = request.path[len(self.prefix):]
        # startswith() rather than path[0]: the remainder is empty when the
        # request path is exactly the prefix, and indexing would raise.
        if path.startswith("/"):
            handler = self.COMMANDS.get(path[1:])
            if handler is not None:
                try:
                    return handler(self, request)
                except AJAXException:
                    # sys.exc_info() avoids both the "except X, e" and the
                    # "except X as e" spellings, so this parses everywhere.
                    return json.dumps((False, sys.exc_info()[1].args[0]))

        return "404" ## TODO: tidy up

    def newConnection(self, request):
        """Create a session plus IRC connection for the requesting client.

        Reads the ``nick`` (required) and ``password`` (optional) request
        arguments and returns the JSON encoding of
        ``(True, session_id, TRANSPORTS)``.

        Raises AJAXException when no nickname was supplied and
        IDGenerationException when no unused session id could be generated.
        """
        # The return value is not used here.
        # NOTE(review): the call is kept in case it has side effects on the
        # request -- confirm against authgateengine.
        login_optional(request)

        ip = request.getClientIP()

        nick = request.args.get("nick")
        if not nick:
            raise AJAXException("Nickname not supplied.")
        nick = ircclient.irc_decode(nick[0])

        password = request.args.get("password")
        if password is not None:
            password = ircclient.irc_decode(password[0])

        # Try a handful of random ids; the else branch runs only when every
        # attempt collided with a live session.
        for _ in range(10):
            sid = get_session_id()
            if not Sessions.get(sid):
                break
        else:
            raise IDGenerationException()

        session = IRCSession(sid)

        qticket = getSessionData(request).get("qticket")
        if qticket is None:
            perform = None
        else:
            # Address the auth service as nick@server, built from the
            # nick!user@server mask in config.AUTH_SERVICE.
            service_mask = config.AUTH_SERVICE
            msg_mask = service_mask.split("!")[0] + "@" + service_mask.split("@", 1)[1]
            perform = ["PRIVMSG %s :TICKETAUTH %s" % (msg_mask, qticket)]

        ident, realname = config.IDENT, config.REALNAME
        if ident is config_options.IDENT_HEX or ident is None: # latter is legacy
            ident = socket.inet_aton(ip).encode("hex")
        elif ident is config_options.IDENT_NICKNAME:
            ident = nick

        self.__connect_hit()

        def proceed(hostname):
            kwargs = dict(nick=nick, ident=ident, ip=ip, realname=realname, perform=perform, hostname=hostname)
            if password is not None:
                kwargs["password"] = password

            session.client = ircclient.createIRC(session, **kwargs)

        # A missing WEBIRC_MODE setting is treated like "hmac": connect right
        # away without a hostname.  Every other mode resolves the client's
        # hostname first and connects from the lookup callbacks.
        if getattr(config, "WEBIRC_MODE", "hmac") == "hmac":
            proceed(None)
        else:
            notice = lambda x: session.event(connect_notice(x))
            notice("Looking up your hostname...")
            def callback(hostname):
                notice("Found your hostname.")
                proceed(hostname)
            def errback(failure):
                notice("Couldn't look up your hostname!")
                proceed(ip)
            qdns.lookupAndVerifyPTR(ip, timeout=[config.DNS_TIMEOUT]).addCallbacks(callback, errback)

        Sessions[sid] = session

        return json.dumps((True, sid, TRANSPORTS))

    def getSession(self, request):
        """Return the live session named by the ``s`` request argument.

        Raises AJAXException carrying BAD_SESSION_MESSAGE when the argument is
        missing or does not name a live session.
        """
        sessionid = request.args.get("s")
        if sessionid is None:
            raise AJAXException(BAD_SESSION_MESSAGE)

        session = Sessions.get(sessionid[0])
        if not session:
            raise AJAXException(BAD_SESSION_MESSAGE)
        return session

    def subscribe(self, request):
        """Park *request* as a long-poll channel on its session.

        The request is answered by the session's next flush, or with an empty
        JSON list after config.HTTP_AJAX_REQUEST_TIMEOUT seconds.  Returns
        server.NOT_DONE_YET because the response is written later.
        """
        request.channel.cancelTimeout()

        channel = RequestChannel(request)
        session = self.getSession(request)
        notifier = request.notifyFinish()
        session.subscribe(channel)

        timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)
        def cancel_timeout(result):
            # Runs when the request finishes or the connection is lost.
            # Remove the channel (not the engine) so the session never writes
            # to a request that is already gone.
            session.unsubscribe(channel)
            # Only cancel a call that is still pending; cancelling a fired or
            # cancelled call raises.
            if timeout_entry.active():
                timeout_entry.cancel()
        notifier.addCallbacks(cancel_timeout, cancel_timeout)
        return server.NOT_DONE_YET

    def push(self, request):
        """Send the line in the ``c`` request argument to the session's IRC client.

        Returns the JSON encoding of ``(True, True)``.  Raises AJAXException
        when no command was supplied, when the session is unknown, or when
        writing the line failed (the session is then disconnected).
        """
        command = request.args.get("c")
        if command is None:
            raise AJAXException("No command specified.")
        self.__total_hit()

        session = self.getSession(request)
        try:
            session.push(ircclient.irc_decode(command[0]))
        except AttributeError: # occurs when we haven't noticed an error
            session.disconnect()
            raise AJAXException("Connection closed by server; try reconnecting by reloading the page.")
        except Exception: # catch all
            session.disconnect()
            traceback.print_exc(file=sys.stderr)
            raise AJAXException("Unknown error.")

        return json.dumps((True, True))

    def closeById(self, k):
        """Close the session stored under id *k* on behalf of the admin interface.

        Unknown ids are ignored.
        """
        s = Sessions.get(k)
        if s is None:
            return
        s.client.client.error("Closed by admin interface")

    @property
    def adminEngine(self):
        """Data for the admin interface: open sessions and the hit counters."""
        return {
            "Sessions": [(str(v.client.client), AdminEngineAction("close", self.closeById, k)) for k, v in Sessions.iteritems() if not v.closed],
            "Connections": [(self.__connect_hit,)],
            "Total hits": [(self.__total_hit,)],
        }

    # Maps the path component after the prefix to its handler.  The values
    # are plain functions, so render_POST passes ``self`` explicitly.
    COMMANDS = dict(p=push, n=newConnection, s=subscribe)
b5c84380 307
c60795d6
CP
if has_websocket:
    class WebSocketChannel(object):
        """Channel that delivers session data over a websocket connection."""

        def __init__(self, channel):
            self.channel = channel

        def write(self, data):
            """Send *data* as a "c" message.

            Always returns True, so IRCSession.flush() keeps the channel
            subscribed.
            """
            self.channel.send("c", data)
            return True

        def close(self):
            """Close the underlying connection."""
            self.channel.close()

    class WebSocketEngineProtocol(autobahn.websocket.WebSocketServerProtocol):
        """Websocket transport for an existing session.

        The first character of each incoming message is its type and the rest
        is the payload.  A connection starts in AWAITING_AUTH and must send
        "s" + session id within five seconds; once AUTHED it may send
        "p" + line to push a line to the session.  Anything else closes the
        connection.
        """
        AWAITING_AUTH, AUTHED = 0, 1

        def __init__(self, *args, **kwargs):
            self.__state = self.AWAITING_AUTH
            self.__session = None
            self.__channel = None
            self.__timeout = None

        def onOpen(self):
            """Start the authentication timeout."""
            self.__timeout = reactor.callLater(5, self.close, "Authentication timeout")

        def onClose(self, wasClean, code, reason):
            """Cancel the timeout and detach from the session, if attached."""
            self.__cancelTimeout()
            if self.__session:
                self.__session.unsubscribe(self.__channel)
                self.__session = None

        def onMessage(self, msg, binary):
            """Handle one incoming message according to the current state."""
            # we don't bother checking the Origin header, as if you can auth then you've been able to pass the browser's
            # normal origin handling (POSTed the new connection request and managed to get the session id)
            state = self.__state
            message_type, message = msg[:1], msg[1:]
            if state == self.AWAITING_AUTH:
                if message_type == "s": # subscribe
                    session = Sessions.get(message)
                    if not session:
                        self.close(BAD_SESSION_MESSAGE)
                        return

                    self.__cancelTimeout()
                    self.__session = session
                    self.send("s", "True")
                    self.__state = self.AUTHED
                    self.__channel = WebSocketChannel(self)
                    session.subscribe(self.__channel)
                    return
            elif state == self.AUTHED:
                if message_type == "p": # push
                    self.__session.push(ircclient.irc_decode(message))
                    return

            self.close("Bad message type")

        def __cancelTimeout(self):
            """Cancel the authentication timeout if it is still pending."""
            timeout, self.__timeout = self.__timeout, None
            # close() runs from the timeout call itself when authentication
            # times out.  Cancelling a call that has already fired raises
            # AlreadyCalled, which used to abort close() before the close
            # frame was sent, so only cancel a call that is still active.
            if timeout is not None and timeout.active():
                timeout.cancel()

        def close(self, reason=None):
            """Send a close frame and detach from the session.

            Close code 4999 is sent together with *reason* when one is given,
            4998 otherwise.
            """
            self.__cancelTimeout()
            if reason:
                self.sendClose(4999, reason)
            else:
                self.sendClose(4998)

            if self.__session:
                self.__session.unsubscribe(self.__channel)
                self.__session = None

        def send(self, message_type, message):
            """Send *message* prefixed with its one-character *message_type*."""
            self.sendMessage(message_type + message)

    class WebSocketResource(autobahn.resource.WebSocketResource):
        """WebSocketResource that cancels the HTTP channel timeout before rendering."""

        def render(self, request):
            request.channel.cancelTimeout()
            return autobahn.resource.WebSocketResource.render(self, request)

    def WebSocketEngine(path=None):
        """Build the web resource that serves the websocket transport.

        The port for the factory URL is taken from config.BASE_URL, falling
        back to 80 for http and 443 for https.  Raises Exception when the port
        cannot be determined.  *path* is accepted but not used.
        """
        parsed = urlparse.urlparse(config.BASE_URL)
        port = parsed.port
        if port is None:
            if parsed.scheme == "http":
                port = 80
            elif parsed.scheme == "https":
                port = 443
            else:
                raise Exception("Unable to determine port from BASE_URL: " + config.BASE_URL)

        factory = autobahn.websocket.WebSocketServerFactory("ws://localhost:%d" % port)
        factory.protocol = WebSocketEngineProtocol
        factory.setProtocolOptions(maxMessagePayloadSize=512, maxFramePayloadSize=512, tcpNoDelay=False)
        return WebSocketResource(factory)