]> jfr.im git - irc/quakenet/qwebirc.git/blobdiff - qwebirc/engines/ajaxengine.py
tidy up autobahn support -- now requires 0.17.2
[irc/quakenet/qwebirc.git] / qwebirc / engines / ajaxengine.py
index 6667ffb027f9315846df72a32f846595e59e2fa0..33bc67bf8585601125ff3424e037535d9339de06 100644 (file)
@@ -9,18 +9,29 @@ from qwebirc.util import HitCounter
 import qwebirc.dns as qdns
 import qwebirc.util.qjson as json
 import urlparse
+import qwebirc.util.autobahn_check as autobahn_check
 
 TRANSPORTS = ["longpoll"]
 
-try:
-  import autobahn.websocket
-  import autobahn.resource
+has_websocket = False
+autobahn_status = autobahn_check.check()
+if autobahn_status == True:
+  import autobahn
+  import autobahn.twisted.websocket
+  import autobahn.twisted.resource
   has_websocket = True
   TRANSPORTS.append("websocket")
-except ImportError:
-  has_websocket = False
+elif autobahn_status == False:
+  # they've been warned already
+  pass
+else:
+  print >>sys.stderr, "WARNING:"
+  print >>sys.stderr, "  %s" % autobahn_status
+  print >>sys.stderr, "  as a result websocket support is disabled."
+  print >>sys.stderr, "  upgrade your version of autobahn from http://autobahn.ws/python/getstarted/"
 
 BAD_SESSION_MESSAGE = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
+MAX_SEQNO = 9223372036854775807  # 2**63 - 1... yeah it doesn't wrap
 Sessions = {}
 
 def get_session_id():
@@ -51,18 +62,29 @@ class IRCSession:
     self.id = id
     self.subscriptions = []
     self.buffer = []
+    self.old_buffer = None
     self.buflen = 0
     self.throttle = 0
     self.schedule = None
     self.closed = False
     self.cleanupschedule = None
+    self.pubSeqNo = -1
+    self.subSeqNo = 0
 
-  def subscribe(self, channel):
+  def subscribe(self, channel, seqNo=None):
     if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
       self.subscriptions.pop(0).close()
 
+    if seqNo is not None and seqNo < self.subSeqNo:
+      if self.old_buffer is None or seqNo != self.old_buffer[0]:
+        channel.write(json.dumps([False, "Unable to reconnect -- sequence number too old."]), seqNo + 1)
+        return
+
+      if not channel.write(self.old_buffer[1], self.old_buffer[0] + 1):
+        return
+
     self.subscriptions.append(channel)
-    self.flush()
+    self.flush(seqNo)
 
   def unsubscribe(self, channel):
     try:
@@ -75,7 +97,7 @@ class IRCSession:
       return
 
     self.unsubscribe(channel)
-    channel.write(EMPTY_JSON_LIST)
+    channel.write(EMPTY_JSON_LIST, self.subSeqNo)
 
   def flush(self, scheduled=False):
     if scheduled:
@@ -100,13 +122,16 @@ class IRCSession:
     self.throttle = t + config.UPDATE_FREQ
 
     encdata = json.dumps(self.buffer)
+    self.old_buffer = (self.subSeqNo, encdata)
+    self.subSeqNo+=1
     self.buffer = []
     self.buflen = 0
 
     subs = self.subscriptions
     self.subscriptions = newsubs = []
+
     for x in subs:
-      if x.write(encdata):
+      if x.write(encdata, self.subSeqNo):
         newsubs.append(x)
 
     if self.closed and not newsubs:
@@ -123,13 +148,17 @@ class IRCSession:
     self.buflen = newbuflen
     self.flush()
     
-  def push(self, data):
+  def push(self, data, seq_no=None):
     if self.closed:
       return
 
     if len(data) > config.MAXLINELEN:
       raise LineTooLongException
 
+    if seq_no is not None:
+      if seq_no <= self.pubSeqNo:
+        return
+      self.pubSeqNo = seq_no
     self.client.write(data)
 
   def disconnect(self):
@@ -147,7 +176,8 @@ class RequestChannel(object):
   def __init__(self, request):
     self.request = request
 
-  def write(self, data):
+  def write(self, data, seqNo):
+    self.request.setHeader("n", str(seqNo))
     self.request.write(data)
     self.request.finish()
     return False
@@ -238,7 +268,7 @@ class AJAXEngine(resource.Resource):
     Sessions[id] = session
     
     return json.dumps((True, id, TRANSPORTS))
-  
+
   def getSession(self, request):
     bad_session_message = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
     
@@ -257,7 +287,17 @@ class AJAXEngine(resource.Resource):
     channel = RequestChannel(request)
     session = self.getSession(request)
     notifier = request.notifyFinish()
-    session.subscribe(channel)
+
+    seq_no = request.args.get("n")
+    try:
+      if seq_no is not None:
+        seq_no = int(seq_no[0])
+        if seq_no < 0 or seq_no > MAX_SEQNO:
+          raise ValueError
+    except ValueError:
+      raise AJAXEngine, "Bad sequence number"
+
+    session.subscribe(channel, seq_no)
 
     timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)
     def cancel_timeout(result):
@@ -274,10 +314,19 @@ class AJAXEngine(resource.Resource):
     if command is None:
       raise AJAXException, "No command specified."
     self.__total_hit()
-    
+
+    seq_no = request.args.get("n")
+    try:
+      if seq_no is not None:
+        seq_no = int(seq_no[0])
+        if seq_no < 0 or seq_no > MAX_SEQNO:
+          raise ValueError
+    except ValueError:
+      raise AJAXEngine("Bad sequence number %r" % seq_no)
+
     session = self.getSession(request)
     try:
-      session.push(ircclient.irc_decode(command[0]))
+      session.push(ircclient.irc_decode(command[0]), seq_no)
     except AttributeError: # occurs when we haven't noticed an error
       session.disconnect()
       raise AJAXException, "Connection closed by server; try reconnecting by reloading the page."
@@ -309,17 +358,18 @@ if has_websocket:
     def __init__(self, channel):
       self.channel = channel
 
-    def write(self, data):
-      self.channel.send("c", data)
+    def write(self, data, seqNo):
+      self.channel.send("c", "%d,%s" % (seqNo, data))
       return True
 
     def close(self):
       self.channel.close()
 
-  class WebSocketEngineProtocol(autobahn.websocket.WebSocketServerProtocol):
+  class WebSocketEngineProtocol(autobahn.twisted.websocket.WebSocketServerProtocol):
     AWAITING_AUTH, AUTHED = 0, 1
 
     def __init__(self, *args, **kwargs):
+      super(WebSocketEngineProtocol, self).__init__(*args, **kwargs)
       self.__state = self.AWAITING_AUTH
       self.__session = None
       self.__channel = None
@@ -334,13 +384,26 @@ if has_websocket:
         self.__session.unsubscribe(self.__channel)
         self.__session = None
 
-    def onMessage(self, msg, binary):
+    def onMessage(self, msg, isBinary):
       # we don't bother checking the Origin header, as if you can auth then you've been able to pass the browser's
       # normal origin handling (POSTed the new connection request and managed to get the session id)
       state = self.__state
       message_type, message = msg[:1], msg[1:]
       if state == self.AWAITING_AUTH:
         if message_type == "s":  # subscribe
+          tokens = message.split(",", 1)
+          if len(tokens) != 2:
+            self.close("Bad tokens")
+            return
+
+          seq_no, message = tokens[0], tokens[1]
+          try:
+            seq_no = int(seq_no)
+            if seq_no < 0 or seq_no > MAX_SEQNO:
+              raise ValueError
+          except ValueError:
+            self.close("Bad value")
+
           session = Sessions.get(message)
           if not session:
             self.close(BAD_SESSION_MESSAGE)
@@ -351,10 +414,22 @@ if has_websocket:
           self.send("s", "True")
           self.__state = self.AUTHED
           self.__channel = WebSocketChannel(self)
-          session.subscribe(self.__channel)
+          session.subscribe(self.__channel, seq_no)
           return
       elif state == self.AUTHED:
         if message_type == "p":  # push
+          tokens = message.split(",", 1)
+          if len(tokens) != 2:
+            self.close("Bad tokens")
+            return
+
+          seq_no, message = tokens[0], tokens[1]
+          try:
+            seq_no = int(seq_no)
+            if seq_no < 0 or seq_no > MAX_SEQNO:
+              raise ValueError
+          except ValueError:
+            self.close("Bad value")
           self.__session.push(ircclient.irc_decode(message))
           return
 
@@ -371,7 +446,7 @@ if has_websocket:
     def close(self, reason=None):
       self.__cancelTimeout()
       if reason:
-        self.sendClose(4999, reason)
+        self.sendClose(4999, unicode(reason))
       else:
         self.sendClose(4998)
 
@@ -382,24 +457,16 @@ if has_websocket:
     def send(self, message_type, message):
       self.sendMessage(message_type + message)
 
-  class WebSocketResource(autobahn.resource.WebSocketResource):
+  class WebSocketResource(autobahn.twisted.resource.WebSocketResource):
     def render(self, request):
       request.channel.setTimeout(None)
-      return autobahn.resource.WebSocketResource.render(self, request)
+      return autobahn.twisted.resource.WebSocketResource.render(self, request)
 
   def WebSocketEngine(path=None):
-    parsed = urlparse.urlparse(config.BASE_URL)
-    port = parsed.port
-    if port is None:
-      if parsed.scheme == "http":
-        port = 80
-      elif parsed.scheme == "https":
-        port = 443
-      else:
-        raise Exception("Unable to determine port from BASE_URL: " + config.BASE_URL)
-
-    factory = autobahn.websocket.WebSocketServerFactory("ws://localhost:%d" % port)
+    factory = autobahn.twisted.websocket.WebSocketServerFactory("ws://localhost")
+    factory.externalPort = None
     factory.protocol = WebSocketEngineProtocol
     factory.setProtocolOptions(maxMessagePayloadSize=512, maxFramePayloadSize=512, tcpNoDelay=False)
     resource = WebSocketResource(factory)
     return resource
+