X-Git-Url: https://jfr.im/git/irc/quakenet/qwebirc.git/blobdiff_plain/1bccb3e65483874591013380d11c6864796a3356..48f0fc4323550fdafaa33bb99725f16ab975b3d5:/qwebirc/engines/ajaxengine.py diff --git a/qwebirc/engines/ajaxengine.py b/qwebirc/engines/ajaxengine.py index 6667ffb..33bc67b 100644 --- a/qwebirc/engines/ajaxengine.py +++ b/qwebirc/engines/ajaxengine.py @@ -9,18 +9,29 @@ from qwebirc.util import HitCounter import qwebirc.dns as qdns import qwebirc.util.qjson as json import urlparse +import qwebirc.util.autobahn_check as autobahn_check TRANSPORTS = ["longpoll"] -try: - import autobahn.websocket - import autobahn.resource +has_websocket = False +autobahn_status = autobahn_check.check() +if autobahn_status == True: + import autobahn + import autobahn.twisted.websocket + import autobahn.twisted.resource has_websocket = True TRANSPORTS.append("websocket") -except ImportError: - has_websocket = False +elif autobahn_status == False: + # they've been warned already + pass +else: + print >>sys.stderr, "WARNING:" + print >>sys.stderr, " %s" % autobahn_status + print >>sys.stderr, " as a result websocket support is disabled." + print >>sys.stderr, " upgrade your version of autobahn from http://autobahn.ws/python/getstarted/" BAD_SESSION_MESSAGE = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page." +MAX_SEQNO = 9223372036854775807 # 2**63 - 1... yeah it doesn't wrap Sessions = {} def get_session_id(): @@ -51,18 +62,29 @@ class IRCSession: self.id = id self.subscriptions = [] self.buffer = [] + self.old_buffer = None self.buflen = 0 self.throttle = 0 self.schedule = None self.closed = False self.cleanupschedule = None + self.pubSeqNo = -1 + self.subSeqNo = 0 - def subscribe(self, channel): + def subscribe(self, channel, seqNo=None): if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS: self.subscriptions.pop(0).close() + if seqNo is not None and seqNo < self.subSeqNo: + if self.old_buffer is None or seqNo != self.old_buffer[0]: + channel.write(json.dumps([False, "Unable to reconnect -- sequence number too old."]), seqNo + 1) + return + + if not channel.write(self.old_buffer[1], self.old_buffer[0] + 1): + return + self.subscriptions.append(channel) - self.flush() + self.flush(seqNo) def unsubscribe(self, channel): try: @@ -75,7 +97,7 @@ class IRCSession: return self.unsubscribe(channel) - channel.write(EMPTY_JSON_LIST) + channel.write(EMPTY_JSON_LIST, self.subSeqNo) def flush(self, scheduled=False): if scheduled: @@ -100,13 +122,16 @@ class IRCSession: self.throttle = t + config.UPDATE_FREQ encdata = json.dumps(self.buffer) + self.old_buffer = (self.subSeqNo, encdata) + self.subSeqNo+=1 self.buffer = [] self.buflen = 0 subs = self.subscriptions self.subscriptions = newsubs = [] + for x in subs: - if x.write(encdata): + if x.write(encdata, self.subSeqNo): newsubs.append(x) if self.closed and not newsubs: @@ -123,13 +148,17 @@ class IRCSession: self.buflen = newbuflen self.flush() - def push(self, data): + def push(self, data, seq_no=None): if self.closed: return if len(data) > config.MAXLINELEN: raise LineTooLongException + if seq_no is not None: + if seq_no <= self.pubSeqNo: + return + self.pubSeqNo = seq_no self.client.write(data) def disconnect(self): @@ -147,7 +176,8 @@ class RequestChannel(object): def __init__(self, request): self.request = request - def write(self, data): + def write(self, data, seqNo): + self.request.setHeader("n", str(seqNo)) self.request.write(data) self.request.finish() return False @@ -238,7 +268,7 @@ class AJAXEngine(resource.Resource): Sessions[id] = session return json.dumps((True, id, TRANSPORTS)) - + def getSession(self, request): bad_session_message = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page." @@ -257,7 +287,17 @@ class AJAXEngine(resource.Resource): channel = RequestChannel(request) session = self.getSession(request) notifier = request.notifyFinish() - session.subscribe(channel) + + seq_no = request.args.get("n") + try: + if seq_no is not None: + seq_no = int(seq_no[0]) + if seq_no < 0 or seq_no > MAX_SEQNO: + raise ValueError + except ValueError: + raise AJAXEngine, "Bad sequence number" + + session.subscribe(channel, seq_no) timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel) def cancel_timeout(result): @@ -274,10 +314,19 @@ class AJAXEngine(resource.Resource): if command is None: raise AJAXException, "No command specified." self.__total_hit() - + + seq_no = request.args.get("n") + try: + if seq_no is not None: + seq_no = int(seq_no[0]) + if seq_no < 0 or seq_no > MAX_SEQNO: + raise ValueError + except ValueError: + raise AJAXEngine("Bad sequence number %r" % seq_no) + session = self.getSession(request) try: - session.push(ircclient.irc_decode(command[0])) + session.push(ircclient.irc_decode(command[0]), seq_no) except AttributeError: # occurs when we haven't noticed an error session.disconnect() raise AJAXException, "Connection closed by server; try reconnecting by reloading the page." @@ -309,17 +358,18 @@ if has_websocket: def __init__(self, channel): self.channel = channel - def write(self, data): - self.channel.send("c", data) + def write(self, data, seqNo): + self.channel.send("c", "%d,%s" % (seqNo, data)) return True def close(self): self.channel.close() - class WebSocketEngineProtocol(autobahn.websocket.WebSocketServerProtocol): + class WebSocketEngineProtocol(autobahn.twisted.websocket.WebSocketServerProtocol): AWAITING_AUTH, AUTHED = 0, 1 def __init__(self, *args, **kwargs): + super(WebSocketEngineProtocol, self).__init__(*args, **kwargs) self.__state = self.AWAITING_AUTH self.__session = None self.__channel = None @@ -334,13 +384,26 @@ if has_websocket: self.__session.unsubscribe(self.__channel) self.__session = None - def onMessage(self, msg, binary): + def onMessage(self, msg, isBinary): # we don't bother checking the Origin header, as if you can auth then you've been able to pass the browser's # normal origin handling (POSTed the new connection request and managed to get the session id) state = self.__state message_type, message = msg[:1], msg[1:] if state == self.AWAITING_AUTH: if message_type == "s": # subscribe + tokens = message.split(",", 1) + if len(tokens) != 2: + self.close("Bad tokens") + return + + seq_no, message = tokens[0], tokens[1] + try: + seq_no = int(seq_no) + if seq_no < 0 or seq_no > MAX_SEQNO: + raise ValueError + except ValueError: + self.close("Bad value") + session = Sessions.get(message) if not session: self.close(BAD_SESSION_MESSAGE) @@ -351,10 +414,22 @@ if has_websocket: self.send("s", "True") self.__state = self.AUTHED self.__channel = WebSocketChannel(self) - session.subscribe(self.__channel) + session.subscribe(self.__channel, seq_no) return elif state == self.AUTHED: if message_type == "p": # push + tokens = message.split(",", 1) + if len(tokens) != 2: + self.close("Bad tokens") + return + + seq_no, message = tokens[0], tokens[1] + try: + seq_no = int(seq_no) + if seq_no < 0 or seq_no > MAX_SEQNO: + raise ValueError + except ValueError: + self.close("Bad value") self.__session.push(ircclient.irc_decode(message)) return @@ -371,7 +446,7 @@ if has_websocket: def close(self, reason=None): self.__cancelTimeout() if reason: - self.sendClose(4999, reason) + self.sendClose(4999, unicode(reason)) else: self.sendClose(4998) @@ -382,24 +457,16 @@ if has_websocket: def send(self, message_type, message): self.sendMessage(message_type + message) - class WebSocketResource(autobahn.resource.WebSocketResource): + class WebSocketResource(autobahn.twisted.resource.WebSocketResource): def render(self, request): request.channel.setTimeout(None) - return autobahn.resource.WebSocketResource.render(self, request) + return autobahn.twisted.resource.WebSocketResource.render(self, request) def WebSocketEngine(path=None): - parsed = urlparse.urlparse(config.BASE_URL) - port = parsed.port - if port is None: - if parsed.scheme == "http": - port = 80 - elif parsed.scheme == "https": - port = 443 - else: - raise Exception("Unable to determine port from BASE_URL: " + config.BASE_URL) - - factory = autobahn.websocket.WebSocketServerFactory("ws://localhost:%d" % port) + factory = autobahn.twisted.websocket.WebSocketServerFactory("ws://localhost") + factory.externalPort = None factory.protocol = WebSocketEngineProtocol factory.setProtocolOptions(maxMessagePayloadSize=512, maxFramePayloadSize=512, tcpNoDelay=False) resource = WebSocketResource(factory) return resource +