]> jfr.im git - irc/quakenet/qwebirc.git/blobdiff - qwebirc/engines/ajaxengine.py
Merge.
[irc/quakenet/qwebirc.git] / qwebirc / engines / ajaxengine.py
index 6667ffb027f9315846df72a32f846595e59e2fa0..26aa7ad5b0d535985d42919932638c9350d998c1 100644 (file)
@@ -21,6 +21,7 @@ except ImportError:
   has_websocket = False
 
 BAD_SESSION_MESSAGE = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
+MAX_SEQNO = 9223372036854775807  # 2**63 - 1... yeah it doesn't wrap
 Sessions = {}
 
 def get_session_id():
@@ -51,18 +52,29 @@ class IRCSession:
     self.id = id
     self.subscriptions = []
     self.buffer = []
+    self.old_buffer = None
     self.buflen = 0
     self.throttle = 0
     self.schedule = None
     self.closed = False
     self.cleanupschedule = None
+    self.pubSeqNo = -1
+    self.subSeqNo = 0
 
-  def subscribe(self, channel):
+  def subscribe(self, channel, seqNo=None):
     if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
       self.subscriptions.pop(0).close()
 
+    if seqNo is not None and seqNo < self.subSeqNo:
+      if self.old_buffer is None or seqNo != self.old_buffer[0]:
+        channel.write(json.dumps([False, "Unable to reconnect -- sequence number too old."]), seqNo + 1)
+        return
+
+      if not channel.write(self.old_buffer[1], self.old_buffer[0] + 1):
+        return
+
     self.subscriptions.append(channel)
-    self.flush()
+    self.flush(seqNo)
 
   def unsubscribe(self, channel):
     try:
@@ -75,7 +87,7 @@ class IRCSession:
       return
 
     self.unsubscribe(channel)
-    channel.write(EMPTY_JSON_LIST)
+    channel.write(EMPTY_JSON_LIST, self.subSeqNo)
 
   def flush(self, scheduled=False):
     if scheduled:
@@ -100,13 +112,16 @@ class IRCSession:
     self.throttle = t + config.UPDATE_FREQ
 
     encdata = json.dumps(self.buffer)
+    self.old_buffer = (self.subSeqNo, encdata)
+    self.subSeqNo+=1
     self.buffer = []
     self.buflen = 0
 
     subs = self.subscriptions
     self.subscriptions = newsubs = []
+
     for x in subs:
-      if x.write(encdata):
+      if x.write(encdata, self.subSeqNo):
         newsubs.append(x)
 
     if self.closed and not newsubs:
@@ -123,13 +138,17 @@ class IRCSession:
     self.buflen = newbuflen
     self.flush()
     
-  def push(self, data):
+  def push(self, data, seq_no=None):
     if self.closed:
       return
 
     if len(data) > config.MAXLINELEN:
       raise LineTooLongException
 
+    if seq_no is not None:
+      if seq_no <= self.pubSeqNo:
+        return
+      self.pubSeqNo = seq_no
     self.client.write(data)
 
   def disconnect(self):
@@ -147,7 +166,8 @@ class RequestChannel(object):
   def __init__(self, request):
     self.request = request
 
-  def write(self, data):
+  def write(self, data, seqNo):
+    self.request.setHeader("n", str(seqNo))
     self.request.write(data)
     self.request.finish()
     return False
@@ -238,7 +258,7 @@ class AJAXEngine(resource.Resource):
     Sessions[id] = session
     
     return json.dumps((True, id, TRANSPORTS))
-  
+
   def getSession(self, request):
     bad_session_message = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
     
@@ -257,7 +277,17 @@ class AJAXEngine(resource.Resource):
     channel = RequestChannel(request)
     session = self.getSession(request)
     notifier = request.notifyFinish()
-    session.subscribe(channel)
+
+    seq_no = request.args.get("n")
+    try:
+      if seq_no is not None:
+        seq_no = int(seq_no[0])
+        if seq_no < 0 or seq_no > MAX_SEQNO:
+          raise ValueError
+    except ValueError:
+      raise AJAXEngine, "Bad sequence number"
+
+    session.subscribe(channel, seq_no)
 
     timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)
     def cancel_timeout(result):
@@ -274,10 +304,19 @@ class AJAXEngine(resource.Resource):
     if command is None:
       raise AJAXException, "No command specified."
     self.__total_hit()
-    
+
+    seq_no = request.args.get("n")
+    try:
+      if seq_no is not None:
+        seq_no = int(seq_no[0])
+        if seq_no < 0 or seq_no > MAX_SEQNO:
+          raise ValueError
+    except ValueError:
+      raise AJAXEngine("Bad sequence number %r" % seq_no)
+
     session = self.getSession(request)
     try:
-      session.push(ircclient.irc_decode(command[0]))
+      session.push(ircclient.irc_decode(command[0]), seq_no)
     except AttributeError: # occurs when we haven't noticed an error
       session.disconnect()
       raise AJAXException, "Connection closed by server; try reconnecting by reloading the page."
@@ -309,8 +348,8 @@ if has_websocket:
     def __init__(self, channel):
       self.channel = channel
 
-    def write(self, data):
-      self.channel.send("c", data)
+    def write(self, data, seqNo):
+      self.channel.send("c", "%d,%s" % (seqNo, data))
       return True
 
     def close(self):
@@ -341,6 +380,19 @@ if has_websocket:
       message_type, message = msg[:1], msg[1:]
       if state == self.AWAITING_AUTH:
         if message_type == "s":  # subscribe
+          tokens = message.split(",", 1)
+          if len(tokens) != 2:
+            self.close("Bad tokens")
+            return
+
+          seq_no, message = tokens[0], tokens[1]
+          try:
+            seq_no = int(seq_no)
+            if seq_no < 0 or seq_no > MAX_SEQNO:
+              raise ValueError
+          except ValueError:
+            self.close("Bad value")
+
           session = Sessions.get(message)
           if not session:
             self.close(BAD_SESSION_MESSAGE)
@@ -351,10 +403,22 @@ if has_websocket:
           self.send("s", "True")
           self.__state = self.AUTHED
           self.__channel = WebSocketChannel(self)
-          session.subscribe(self.__channel)
+          session.subscribe(self.__channel, seq_no)
           return
       elif state == self.AUTHED:
         if message_type == "p":  # push
+          tokens = message.split(",", 1)
+          if len(tokens) != 2:
+            self.close("Bad tokens")
+            return
+
+          seq_no, message = tokens[0], tokens[1]
+          try:
+            seq_no = int(seq_no)
+            if seq_no < 0 or seq_no > MAX_SEQNO:
+              raise ValueError
+          except ValueError:
+            self.close("Bad value")
           self.__session.push(ircclient.irc_decode(message))
           return