]> jfr.im git - irc/quakenet/qwebirc.git/blobdiff - qwebirc/engines/ajaxengine.py
Merge.
[irc/quakenet/qwebirc.git] / qwebirc / engines / ajaxengine.py
index 11566c5fd8ed275b36a0d4cfdf924f112637383b..26aa7ad5b0d535985d42919932638c9350d998c1 100644 (file)
@@ -21,6 +21,7 @@ except ImportError:
   has_websocket = False
 
 BAD_SESSION_MESSAGE = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
+MAX_SEQNO = 9223372036854775807  # 2**63 - 1; request sequence numbers outside 0..MAX_SEQNO are rejected, never wrapped
 Sessions = {}
 
 def get_session_id():
@@ -51,18 +52,29 @@ class IRCSession:
     self.id = id
     self.subscriptions = []
     self.buffer = []
+    self.old_buffer = None
     self.buflen = 0
     self.throttle = 0
     self.schedule = None
     self.closed = False
     self.cleanupschedule = None
+    self.pubSeqNo = -1
+    self.subSeqNo = 0
 
-  def subscribe(self, channel):
+  def subscribe(self, channel, seqNo=None):
     if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
       self.subscriptions.pop(0).close()
 
+    if seqNo is not None and seqNo < self.subSeqNo:  # client is behind: a batch was flushed that it has not acknowledged
+      if self.old_buffer is None or seqNo != self.old_buffer[0]:  # only the single most recent batch is kept for replay
+        channel.write(json.dumps([False, "Unable to reconnect -- sequence number too old."]), seqNo + 1)
+        return
+
+      if not channel.write(self.old_buffer[1], self.old_buffer[0] + 1):  # replay the missed batch; a False result means the channel must not be kept
+        return
+
     self.subscriptions.append(channel)
     self.flush()
 
   def unsubscribe(self, channel):
     try:
@@ -73,11 +85,10 @@ class IRCSession:
   def timeout(self, channel):
     if self.schedule:
       return
-      
-    channel.write(EMPTY_JSON_LIST)
-    if channel in self.subscriptions:
-      self.subscriptions.remove(channel)
-      
+
+    self.unsubscribe(channel)  # detach the channel from the session before answering it
+    channel.write(EMPTY_JSON_LIST, self.subSeqNo)  # reply with EMPTY_JSON_LIST, tagged with the current subscriber sequence number
+
   def flush(self, scheduled=False):
     if scheduled:
       self.schedule = None
@@ -97,20 +108,23 @@ class IRCSession:
         if not self.schedule:
           self.schedule = reactor.callLater(0, self.flush, True)
         return
-        
+
     self.throttle = t + config.UPDATE_FREQ
 
     encdata = json.dumps(self.buffer)
+    self.old_buffer = (self.subSeqNo, encdata)  # remember the batch just encoded, keyed by its sequence number, so subscribe() can replay it
+    self.subSeqNo+=1  # subscribers receive this incremented value together with the batch
     self.buffer = []
     self.buflen = 0
 
-    newsubs = []
-    for x in self.subscriptions:
-      if x.write(encdata):
+    subs = self.subscriptions
+    self.subscriptions = newsubs = []  # swap in a fresh list before writing; NOTE(review): presumably so unsubscribe() calls triggered by a write cannot mutate the list being iterated -- confirm
+
+    for x in subs:
+      if x.write(encdata, self.subSeqNo):  # write() returning True means the channel stays subscribed
         newsubs.append(x)
 
-    self.subscriptions = newsubs
-    if self.closed and not self.subscriptions:
+    if self.closed and not newsubs:
       cleanupSession(self.id)
 
   def event(self, data):
@@ -124,13 +138,17 @@ class IRCSession:
     self.buflen = newbuflen
     self.flush()
     
-  def push(self, data):
+  def push(self, data, seq_no=None):
     if self.closed:
       return
 
     if len(data) > config.MAXLINELEN:
       raise LineTooLongException
 
+    if seq_no is not None:  # None: caller supplied no sequence number, so no duplicate filtering is applied
+      if seq_no <= self.pubSeqNo:  # not newer than the last accepted push -- silently drop it
+        return
+      self.pubSeqNo = seq_no
     self.client.write(data)
 
   def disconnect(self):
@@ -148,7 +166,8 @@ class RequestChannel(object):
   def __init__(self, request):
     self.request = request
 
-  def write(self, data):
+  def write(self, data, seqNo):  # seqNo is sent to the client in the "n" response header
+    self.request.setHeader("n", str(seqNo))
     self.request.write(data)
     self.request.finish()
     return False
@@ -239,7 +258,7 @@ class AJAXEngine(resource.Resource):
     Sessions[id] = session
     
     return json.dumps((True, id, TRANSPORTS))
-  
+
   def getSession(self, request):
     bad_session_message = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
     
@@ -253,20 +272,30 @@ class AJAXEngine(resource.Resource):
     return session
     
   def subscribe(self, request):
-    request.channel.cancelTimeout()
+    request.channel.setTimeout(None)
 
     channel = RequestChannel(request)
     session = self.getSession(request)
     notifier = request.notifyFinish()
-    session.subscribe(channel)
+
+    seq_no = request.args.get("n")  # optional "n" argument; None skips the replay check in IRCSession.subscribe
+    try:
+      if seq_no is not None:
+        seq_no = int(seq_no[0])
+        if seq_no < 0 or seq_no > MAX_SEQNO:
+          raise ValueError
+    except ValueError:
+      raise AJAXException, "Bad sequence number"
+
+    session.subscribe(channel, seq_no)
 
     timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)
     def cancel_timeout(result):
-      session.unsubscribe(self)
       try:
         timeout_entry.cancel()
       except error.AlreadyCalled:
         pass
+      session.unsubscribe(channel)
     notifier.addCallbacks(cancel_timeout, cancel_timeout)
     return server.NOT_DONE_YET
 
@@ -275,10 +304,19 @@ class AJAXEngine(resource.Resource):
     if command is None:
       raise AJAXException, "No command specified."
     self.__total_hit()
-    
+
+    seq_no = request.args.get("n")  # optional "n" argument; None disables duplicate filtering in IRCSession.push
+    try:
+      if seq_no is not None:
+        seq_no = int(seq_no[0])
+        if seq_no < 0 or seq_no > MAX_SEQNO:
+          raise ValueError
+    except ValueError:
+      raise AJAXException, "Bad sequence number %r" % seq_no
+
     session = self.getSession(request)
     try:
-      session.push(ircclient.irc_decode(command[0]))
+      session.push(ircclient.irc_decode(command[0]), seq_no)
     except AttributeError: # occurs when we haven't noticed an error
       session.disconnect()
       raise AJAXException, "Connection closed by server; try reconnecting by reloading the page."
@@ -310,8 +348,8 @@ if has_websocket:
     def __init__(self, channel):
       self.channel = channel
 
-    def write(self, data):
-      self.channel.send("c", data)
+    def write(self, data, seqNo):
+      self.channel.send("c", "%d,%s" % (seqNo, data))  # payload is the sequence number, a comma, then the data
       return True
 
     def close(self):
@@ -342,6 +380,19 @@ if has_websocket:
       message_type, message = msg[:1], msg[1:]
       if state == self.AWAITING_AUTH:
         if message_type == "s":  # subscribe
+          tokens = message.split(",", 1)  # subscribe payload is "<seq_no>,<session id>"
+          if len(tokens) != 2:
+            self.close("Bad tokens")
+            return
+
+          seq_no, message = tokens[0], tokens[1]
+          try:
+            seq_no = int(seq_no)
+            if seq_no < 0 or seq_no > MAX_SEQNO:
+              raise ValueError
+          except ValueError:
+            self.close("Bad value")
+            return  # stop here: seq_no is not a valid integer and the connection is being closed
           session = Sessions.get(message)
           if not session:
             self.close(BAD_SESSION_MESSAGE)
@@ -352,10 +403,22 @@ if has_websocket:
           self.send("s", "True")
           self.__state = self.AUTHED
           self.__channel = WebSocketChannel(self)
-          session.subscribe(self.__channel)
+          session.subscribe(self.__channel, seq_no)
           return
       elif state == self.AUTHED:
         if message_type == "p":  # push
+          tokens = message.split(",", 1)  # push payload is "<seq_no>,<irc line>"
+          if len(tokens) != 2:
+            self.close("Bad tokens")
+            return
+          seq_no, message = tokens[0], tokens[1]
+          try:
+            seq_no = int(seq_no)
+            if seq_no < 0 or seq_no > MAX_SEQNO:
+              raise ValueError
+          except ValueError:
+            self.close("Bad value")
+            return  # stop here: do not forward a line whose sequence number failed validation
-          self.__session.push(ircclient.irc_decode(message))
+          self.__session.push(ircclient.irc_decode(message), seq_no)  # pass seq_no so IRCSession.push can drop duplicates
           return
 
@@ -363,7 +426,10 @@ if has_websocket:
 
     def __cancelTimeout(self):
       if self.__timeout is not None:
-        self.__timeout.cancel()
+        try:
+          self.__timeout.cancel()
+        except error.AlreadyCalled:  # NOTE(review): presumably the timer has already fired -- nothing left to cancel
+          pass
         self.__timeout = None
 
     def close(self, reason=None):
@@ -382,7 +448,7 @@ if has_websocket:
 
   class WebSocketResource(autobahn.resource.WebSocketResource):
     def render(self, request):
-      request.channel.cancelTimeout()
+      request.channel.setTimeout(None)  # NOTE(review): presumably disables the HTTP channel timeout for the long-lived socket -- confirm against Twisted
       return autobahn.resource.WebSocketResource.render(self, request)
 
   def WebSocketEngine(path=None):