]> jfr.im git - irc/quakenet/qwebirc.git/blob - qwebirc/engines/ajaxengine.py
missed sequence number from sync posts
[irc/quakenet/qwebirc.git] / qwebirc / engines / ajaxengine.py
1 from twisted.web import resource, server, static, error as http_error
2 from twisted.names import client
3 from twisted.internet import reactor, error
4 from authgateengine import login_optional, getSessionData
5 import md5, sys, os, time, config, qwebirc.config_options as config_options, traceback, socket
6 import qwebirc.ircclient as ircclient
7 from adminengine import AdminEngineAction
8 from qwebirc.util import HitCounter
9 import qwebirc.dns as qdns
10 import qwebirc.util.qjson as json
11 import urlparse
12
13 TRANSPORTS = ["longpoll"]
14
15 try:
16 import autobahn.websocket
17 import autobahn.resource
18 has_websocket = True
19 TRANSPORTS.append("websocket")
20 except ImportError:
21 has_websocket = False
22
23 BAD_SESSION_MESSAGE = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
24 MAX_SEQNO = 9223372036854775807 # 2**63 - 1... yeah it doesn't wrap
25 Sessions = {}
26
27 def get_session_id():
28 return md5.md5(os.urandom(16)).hexdigest()
29
30 class BufferOverflowException(Exception):
31 pass
32
33 class AJAXException(Exception):
34 pass
35
36 class IDGenerationException(Exception):
37 pass
38
39 class LineTooLongException(Exception):
40 pass
41
42 EMPTY_JSON_LIST = json.dumps([])
43
44 def cleanupSession(id):
45 try:
46 del Sessions[id]
47 except KeyError:
48 pass
49
50 class IRCSession:
51 def __init__(self, id):
52 self.id = id
53 self.subscriptions = []
54 self.buffer = []
55 self.old_buffer = None
56 self.buflen = 0
57 self.throttle = 0
58 self.schedule = None
59 self.closed = False
60 self.cleanupschedule = None
61 self.pubSeqNo = -1
62 self.subSeqNo = 0
63
64 def subscribe(self, channel, seqNo=None):
65 if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
66 self.subscriptions.pop(0).close()
67
68 if seqNo < self.subSeqNo:
69 if self.old_buffer is None or seqNo != self.old_buffer[0]:
70 channel.write(json.dumps([False, "Unable to reconnect -- sequence number too old."]), seqNo + 1)
71 return
72
73 if not channel.write(self.old_buffer[1], self.old_buffer[0] + 1):
74 return
75
76 self.subscriptions.append(channel)
77 self.flush(seqNo)
78
79 def unsubscribe(self, channel):
80 try:
81 self.subscriptions.remove(channel)
82 except ValueError:
83 pass
84
85 def timeout(self, channel):
86 if self.schedule:
87 return
88
89 self.unsubscribe(channel)
90 channel.write(EMPTY_JSON_LIST)
91
92 def flush(self, scheduled=False):
93 if scheduled:
94 self.schedule = None
95
96 if not self.buffer or not self.subscriptions:
97 return
98
99 t = time.time()
100
101 if t < self.throttle:
102 if not self.schedule:
103 self.schedule = reactor.callLater(self.throttle - t, self.flush, True)
104 return
105 else:
106 # process the rest of the packet
107 if not scheduled:
108 if not self.schedule:
109 self.schedule = reactor.callLater(0, self.flush, True)
110 return
111
112 self.throttle = t + config.UPDATE_FREQ
113
114 encdata = json.dumps(self.buffer)
115 self.old_buffer = (self.subSeqNo, encdata)
116 self.subSeqNo+=1
117 self.buffer = []
118 self.buflen = 0
119
120 subs = self.subscriptions
121 self.subscriptions = newsubs = []
122
123 for x in subs:
124 if x.write(encdata, self.subSeqNo):
125 newsubs.append(x)
126
127 if self.closed and not newsubs:
128 cleanupSession(self.id)
129
130 def event(self, data):
131 newbuflen = self.buflen + len(data)
132 if newbuflen > config.MAXBUFLEN:
133 self.buffer = []
134 self.client.error("Buffer overflow.")
135 return
136
137 self.buffer.append(data)
138 self.buflen = newbuflen
139 self.flush()
140
141 def push(self, data, seq_no=None):
142 if self.closed:
143 return
144
145 if len(data) > config.MAXLINELEN:
146 raise LineTooLongException
147
148 if seq_no is not None:
149 if seq_no <= self.pubSeqNo:
150 return
151 self.pubSeqNo = seq_no
152 self.client.write(data)
153
154 def disconnect(self):
155 # keep the session hanging around for a few seconds so the
156 # client has a chance to see what the issue was
157 self.closed = True
158
159 reactor.callLater(5, cleanupSession, self.id)
160
161 # DANGER! Breach of encapsulation!
162 def connect_notice(line):
163 return "c", "NOTICE", "", ("AUTH", "*** (qwebirc) %s" % line)
164
165 class RequestChannel(object):
166 def __init__(self, request):
167 self.request = request
168
169 def write(self, data, seqNo):
170 self.request.setHeader("n", str(seqNo))
171 self.request.write(data)
172 self.request.finish()
173 return False
174
175 def close(self):
176 self.request.finish()
177
178 class AJAXEngine(resource.Resource):
179 isLeaf = True
180
181 def __init__(self, prefix):
182 self.prefix = prefix
183 self.__connect_hit = HitCounter()
184 self.__total_hit = HitCounter()
185
186 def render_POST(self, request):
187 path = request.path[len(self.prefix):]
188 if path[0] == "/":
189 handler = self.COMMANDS.get(path[1:])
190 if handler is not None:
191 try:
192 return handler(self, request)
193 except AJAXException, e:
194 return json.dumps((False, e[0]))
195
196 return "404" ## TODO: tidy up
197
198 def newConnection(self, request):
199 ticket = login_optional(request)
200
201 ip = request.getClientIP()
202
203 nick = request.args.get("nick")
204 if not nick:
205 raise AJAXException, "Nickname not supplied."
206 nick = ircclient.irc_decode(nick[0])
207
208 password = request.args.get("password")
209 if password is not None:
210 password = ircclient.irc_decode(password[0])
211
212 for i in range(10):
213 id = get_session_id()
214 if not Sessions.get(id):
215 break
216 else:
217 raise IDGenerationException()
218
219 session = IRCSession(id)
220
221 qticket = getSessionData(request).get("qticket")
222 if qticket is None:
223 perform = None
224 else:
225 service_mask = config.AUTH_SERVICE
226 msg_mask = service_mask.split("!")[0] + "@" + service_mask.split("@", 1)[1]
227 perform = ["PRIVMSG %s :TICKETAUTH %s" % (msg_mask, qticket)]
228
229 ident, realname = config.IDENT, config.REALNAME
230 if ident is config_options.IDENT_HEX or ident is None: # latter is legacy
231 ident = socket.inet_aton(ip).encode("hex")
232 elif ident is config_options.IDENT_NICKNAME:
233 ident = nick
234
235 self.__connect_hit()
236
237 def proceed(hostname):
238 kwargs = dict(nick=nick, ident=ident, ip=ip, realname=realname, perform=perform, hostname=hostname)
239 if password is not None:
240 kwargs["password"] = password
241
242 client = ircclient.createIRC(session, **kwargs)
243 session.client = client
244
245 if not hasattr(config, "WEBIRC_MODE") or config.WEBIRC_MODE == "hmac":
246 proceed(None)
247 elif config.WEBIRC_MODE != "hmac":
248 notice = lambda x: session.event(connect_notice(x))
249 notice("Looking up your hostname...")
250 def callback(hostname):
251 notice("Found your hostname.")
252 proceed(hostname)
253 def errback(failure):
254 notice("Couldn't look up your hostname!")
255 proceed(ip)
256 qdns.lookupAndVerifyPTR(ip, timeout=[config.DNS_TIMEOUT]).addCallbacks(callback, errback)
257
258 Sessions[id] = session
259
260 return json.dumps((True, id, TRANSPORTS))
261
262 def getSession(self, request):
263 bad_session_message = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
264
265 sessionid = request.args.get("s")
266 if sessionid is None:
267 raise AJAXException, bad_session_message
268
269 session = Sessions.get(sessionid[0])
270 if not session:
271 raise AJAXException, bad_session_message
272 return session
273
274 def subscribe(self, request):
275 request.channel.setTimeout(None)
276
277 channel = RequestChannel(request)
278 session = self.getSession(request)
279 notifier = request.notifyFinish()
280
281 seq_no = request.args.get("n")
282 try:
283 if seq_no is not None:
284 seq_no = int(seq_no[0])
285 if seq_no < 0 or seq_no > MAX_SEQNO:
286 raise ValueError
287 except ValueError:
288 raise AJAXEngine, "Bad sequence number"
289
290 session.subscribe(channel, seq_no)
291
292 timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)
293 def cancel_timeout(result):
294 try:
295 timeout_entry.cancel()
296 except error.AlreadyCalled:
297 pass
298 session.unsubscribe(channel)
299 notifier.addCallbacks(cancel_timeout, cancel_timeout)
300 return server.NOT_DONE_YET
301
302 def push(self, request):
303 command = request.args.get("c")
304 if command is None:
305 raise AJAXException, "No command specified."
306 self.__total_hit()
307
308 seq_no = request.args.get("n")
309 try:
310 if seq_no is not None:
311 seq_no = int(seq_no[0])
312 if seq_no < 0 or seq_no > MAX_SEQNO:
313 raise ValueError
314 except ValueError:
315 raise AJAXEngine("Bad sequence number %r" % seq_no)
316
317 session = self.getSession(request)
318 try:
319 session.push(ircclient.irc_decode(command[0]), seq_no)
320 except AttributeError: # occurs when we haven't noticed an error
321 session.disconnect()
322 raise AJAXException, "Connection closed by server; try reconnecting by reloading the page."
323 except Exception, e: # catch all
324 session.disconnect()
325 traceback.print_exc(file=sys.stderr)
326 raise AJAXException, "Unknown error."
327
328 return json.dumps((True, True))
329
330 def closeById(self, k):
331 s = Sessions.get(k)
332 if s is None:
333 return
334 s.client.client.error("Closed by admin interface")
335
336 @property
337 def adminEngine(self):
338 return {
339 "Sessions": [(str(v.client.client), AdminEngineAction("close", self.closeById, k)) for k, v in Sessions.iteritems() if not v.closed],
340 "Connections": [(self.__connect_hit,)],
341 "Total hits": [(self.__total_hit,)],
342 }
343
344 COMMANDS = dict(p=push, n=newConnection, s=subscribe)
345
346 if has_websocket:
347 class WebSocketChannel(object):
348 def __init__(self, channel):
349 self.channel = channel
350
351 def write(self, data, seqNo):
352 self.channel.send("c", "%d,%s" % (seqNo, data))
353 return True
354
355 def close(self):
356 self.channel.close()
357
358 class WebSocketEngineProtocol(autobahn.websocket.WebSocketServerProtocol):
359 AWAITING_AUTH, AUTHED = 0, 1
360
361 def __init__(self, *args, **kwargs):
362 self.__state = self.AWAITING_AUTH
363 self.__session = None
364 self.__channel = None
365 self.__timeout = None
366
367 def onOpen(self):
368 self.__timeout = reactor.callLater(5, self.close, "Authentication timeout")
369
370 def onClose(self, wasClean, code, reason):
371 self.__cancelTimeout()
372 if self.__session:
373 self.__session.unsubscribe(self.__channel)
374 self.__session = None
375
376 def onMessage(self, msg, binary):
377 # we don't bother checking the Origin header, as if you can auth then you've been able to pass the browser's
378 # normal origin handling (POSTed the new connection request and managed to get the session id)
379 state = self.__state
380 message_type, message = msg[:1], msg[1:]
381 if state == self.AWAITING_AUTH:
382 if message_type == "s": # subscribe
383 tokens = message.split(",", 2)
384 if len(tokens) != 2:
385 self.close("Bad tokens")
386 return
387
388 seq_no, message = tokens[0], tokens[1]
389 try:
390 seq_no = int(seq_no)
391 if seq_no < 0 or seq_no > MAX_SEQNO:
392 raise ValueError
393 except ValueError:
394 self.close("Bad value")
395
396 session = Sessions.get(message)
397 if not session:
398 self.close(BAD_SESSION_MESSAGE)
399 return
400
401 self.__cancelTimeout()
402 self.__session = session
403 self.send("s", "True")
404 self.__state = self.AUTHED
405 self.__channel = WebSocketChannel(self)
406 session.subscribe(self.__channel, seq_no)
407 return
408 elif state == self.AUTHED:
409 if message_type == "p": # push
410 tokens = message.split(",", 2)
411 if len(tokens) != 2:
412 self.close("Bad tokens")
413 return
414
415 seq_no, message = tokens[0], tokens[1]
416 try:
417 seq_no = int(seq_no)
418 if seq_no < 0 or seq_no > MAX_SEQNO:
419 raise ValueError
420 except ValueError:
421 self.close("Bad value")
422 self.__session.push(ircclient.irc_decode(message))
423 return
424
425 self.close("Bad message type")
426
427 def __cancelTimeout(self):
428 if self.__timeout is not None:
429 try:
430 self.__timeout.cancel()
431 except error.AlreadyCalled:
432 pass
433 self.__timeout = None
434
435 def close(self, reason=None):
436 self.__cancelTimeout()
437 if reason:
438 self.sendClose(4999, reason)
439 else:
440 self.sendClose(4998)
441
442 if self.__session:
443 self.__session.unsubscribe(self.__channel)
444 self.__session = None
445
446 def send(self, message_type, message):
447 self.sendMessage(message_type + message)
448
449 class WebSocketResource(autobahn.resource.WebSocketResource):
450 def render(self, request):
451 request.channel.setTimeout(None)
452 return autobahn.resource.WebSocketResource.render(self, request)
453
454 def WebSocketEngine(path=None):
455 parsed = urlparse.urlparse(config.BASE_URL)
456 port = parsed.port
457 if port is None:
458 if parsed.scheme == "http":
459 port = 80
460 elif parsed.scheme == "https":
461 port = 443
462 else:
463 raise Exception("Unable to determine port from BASE_URL: " + config.BASE_URL)
464
465 factory = autobahn.websocket.WebSocketServerFactory("ws://localhost:%d" % port)
466 factory.protocol = WebSocketEngineProtocol
467 factory.setProtocolOptions(maxMessagePayloadSize=512, maxFramePayloadSize=512, tcpNoDelay=False)
468 resource = WebSocketResource(factory)
469 return resource