]> jfr.im git - irc/quakenet/qwebirc.git/commitdiff
add (very limited) sequence numbers to publish and subscribe... should fix lost messages
authorChris Porter <redacted>
Fri, 3 Jan 2014 13:03:01 +0000 (13:03 +0000)
committerChris Porter <redacted>
Fri, 3 Jan 2014 13:03:01 +0000 (13:03 +0000)
js/irc/ircconnection.js
qwebirc/engines/ajaxengine.py

index 188f24dfb35ce79c6c7c09556285d3c81f5cafb6..0363ef8bc5d9f020f61b4db68c414426063b19ed 100644 (file)
@@ -1,7 +1,8 @@
 /* This could do with a rewrite from scratch. */
 
-//window.WEB_SOCKET_DEBUG = QWEBIRC_DEBUG;
-//window.WEB_SOCKET_FORCE_FLASH = true;
+//WEB_SOCKET_DEBUG = QWEBIRC_DEBUG;
+//WEB_SOCKET_FORCE_FLASH = true;
+//FORCE_LONGPOLL = true;
 
 qwebirc.irc.IRCConnection = new Class({
   Implements: [Events, Options],
@@ -46,9 +47,14 @@ qwebirc.irc.IRCConnection = new Class({
     this.__wsEverConnected = false;
     this.__ws = null;
     this.__wsAuthed = false;
+
+    this.__pubSeqNo = 0;
+    this.__subSeqNo = 0;
+    this.__sendRetries = 0;
   },
   __error: function(text) {
     this.fireEvent("error", text);
+    this.log("ERROR: " + text);
     if(this.options.errorAlert)
       alert(text);
   },
@@ -113,15 +119,17 @@ qwebirc.irc.IRCConnection = new Class({
     return false;
   },
   send: function(data, synchronous) {
+    this.__pubSeqNo++;
     if(this.disconnected)
       return false;
 
     if(synchronous) {
       this.__send(data, false);
     } else if(this.__ws && this.__wsAuthed) {
-      this.__ws.send("p" + data);
+      /* seqno here is currently pointless but it's nice to enforce it in the protocol */
+      this.__ws.send("p" + this.__pubSeqNo + "," + data);
     } else {
-      this.__sendQueue.push(data);
+      this.__sendQueue.push([this.__pubSeqNo, data]);
       this.__processSendQueue();
     }
     
@@ -132,37 +140,74 @@ qwebirc.irc.IRCConnection = new Class({
       return;
 
     this.__sendQueueActive = true;
-    this.__send(this.__sendQueue.shift(), true);
+    this.__send(this.__sendQueue[0], true);
   },
   __send: function(data, queued) {
+    this.log("called send(" + data[1] + ")");
     var r = this.newRequest("p", false, !queued); /* !queued == synchronous */
     if(r === null)
       return;
-      
+
+    var handled = false;
+    var retryEvent = null;
+    if(queued) {
+      var timeout = function() {
+        this.log("timeout for " + data[1] + " fired");
+        r.cancel();
+        retry();
+      }.delay(7500, this);
+    }
     r.addEvent("complete", function(o) {
-      if(queued)
+      this.log("complete for " + data[1] + " fired");
+      if(queued) {
+        $clear(timeout);
         this.__sendQueueActive = false;
+      }
+      if(retryEvent) {
+        this.log("cleared retry event");
+        $clear(retryEvent);
+      }
+      this.__sendRetries = 0;
+      this.__sendQueue.shift();
 
       if(!o || (o[0] == false)) {
         this.__sendQueue = [];
         
         if(!this.disconnected) {
           this.disconnect();
-          this.__error("An error occurred: " + o[1]);
+          this.__error("An error occurred: " + (o ? o[1] : "(unknown error)"));
         }
         return false;
       }
       
       this.__processSendQueue();
     }.bind(this));
-    
-    r.send("s=" + this.sessionid + "&c=" + encodeURIComponent(data));
+
+    if(queued) {
+      var retry = function() {
+        this.log("retry for " + data[1] + " fired... handled is " + handled);
+        if(handled)
+          return;
+        handled = true;
+        $clear(timeout);
+        this.log("Unable to send command " + data + "... retrying (attempt " + this.__sendRetries + ")");
+        if(this.__sendRetries++ < 3) {
+          retryEvent = this.__send.delay(1500 * this.__sendRetries + Math.random() * 1000, this, [data, queued]);
+        } else {
+          this.disconnect();
+          this.__error("Unable to send command after multiple retries.");
+        }
+      }.bind(this);
+      r.addEvent("error", retry);
+      r.addEvent("failure", retry);
+    }
+    r.send("s=" + this.sessionid + "&c=" + encodeURIComponent(data[1]) + "&n=" + data[0]);
   },
   __processData: function(o) {
-    if(o[0] == false) {
+    if(!o || o[0] == false) {
       if(!this.disconnected) {
         this.disconnect();
-        this.__error("An error occurred: " + o[1]);
+        this.__error("An error occurred: " + (o ? o[1] : "(unknown error)"));
       }
       return false;
     }
@@ -195,8 +240,6 @@ qwebirc.irc.IRCConnection = new Class({
     if(this.__timeout + this.options.timeoutIncrement <= this.options.maxTimeout)
       this.__timeout+=this.options.timeoutIncrement;
 
-    qwebirc.util.log("timeout occurred... timeout value now " + this.__timeout);
-
     this.__recvLongPoll();
   },
   __checkRetries: function() {
@@ -211,8 +254,6 @@ qwebirc.irc.IRCConnection = new Class({
     if(this.__timeout - this.options.timeoutIncrement >= this.options.minTimeout)
       this.__timeout-=this.options.timeoutIncrement;
 
-    qwebirc.util.log("checkRetries: timeout value now " + this.__timeout);
-
     return true;
   },
   recv: function() {
@@ -290,6 +331,9 @@ qwebirc.irc.IRCConnection = new Class({
         }
       } else {
         if(data.charAt(0) == "c") {
+          var message = data.substr(1);
+          var tokens = message.splitMax(",", 2);
+          this.__subSeqNo = Number(tokens[0]);
           this.__processData(JSON.decode(data.substr(1)));
           return;
         }
@@ -306,7 +350,7 @@ qwebirc.irc.IRCConnection = new Class({
     ws.onopen = function() {
       $clear(connectionTimeout);
       this.log("websocket connected");
-      ws.send("s" + this.sessionid);
+      ws.send("s" + this.__subSeqNo + "," + this.sessionid);
     }.bind(this);
     this.__ws = ws;
   },
@@ -322,9 +366,11 @@ qwebirc.irc.IRCConnection = new Class({
       /* if we're a replaced request then no need to fire off another poll as it's already been done */
       if(r.__replaced) {
         this.__lastActiveRequest = null;
-        
-        if(o)          
+        if(o) {
+          this.__subSeqNo = Number(r.xhr.getResponseHeader("N"));
           this.__processData(o);
+        }
+
         return;
       }
 
@@ -340,7 +386,8 @@ qwebirc.irc.IRCConnection = new Class({
           this.__recvLongPoll();
         return;
       }
-      
+
+      this.__subSeqNo = Number(r.xhr.getResponseHeader("N"));
       if(this.__processData(o))
         this.__recvLongPoll();
     };
@@ -348,7 +395,7 @@ qwebirc.irc.IRCConnection = new Class({
     r.addEvent("complete", onComplete.bind(this));
 
     this.__timeoutId = this.__timeoutEvent.delay(this.__timeout, this);
-    r.send("s=" + this.sessionid);
+    r.send("s=" + this.sessionid + "&n=" + this.__subSeqNo);
   },
   connect: function() {
     this.cacheAvoidance = qwebirc.util.randHexString(16);
@@ -429,6 +476,13 @@ qwebirc.util.WebSocket = function(callback) {
   if(state.loading)
     return;
 
+  if(window.FORCE_LONGPOLL) {
+    log("FORCE_LONGPOLL set");
+    state.result = false;
+    callback(false);
+    return;
+  }
+
   if(!window.WEB_SOCKET_FORCE_FLASH) {
     if(window.WebSocket) {
       log("WebSocket detected");
@@ -450,6 +504,7 @@ qwebirc.util.WebSocket = function(callback) {
     log("no WebSocket support in browser and no Flash");
     state.result = false;
     callback(false);
+    return;
   }
 
   log("No WebSocket support present in client, but flash enabled... attempting to load FlashWebSocket...");
index 6667ffb027f9315846df72a32f846595e59e2fa0..3a1c0244c487aba7fcdabb91d35e5b4b518fbce8 100644 (file)
@@ -21,6 +21,7 @@ except ImportError:
   has_websocket = False
 
 BAD_SESSION_MESSAGE = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
+MAX_SEQNO = 9223372036854775807  # 2**63 - 1... yeah it doesn't wrap
 Sessions = {}
 
 def get_session_id():
@@ -51,18 +52,29 @@ class IRCSession:
     self.id = id
     self.subscriptions = []
     self.buffer = []
+    self.old_buffer = None
     self.buflen = 0
     self.throttle = 0
     self.schedule = None
     self.closed = False
     self.cleanupschedule = None
+    self.pubSeqNo = -1
+    self.subSeqNo = 0
 
-  def subscribe(self, channel):
+  def subscribe(self, channel, seqNo=None):
     if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
       self.subscriptions.pop(0).close()
 
+    if seqNo < self.subSeqNo:
+      if self.old_buffer is None or seqNo != self.old_buffer[0]:
+        channel.write(json.dumps([False, "Unable to reconnect -- sequence number too old."]), seqNo + 1)
+        return
+
+      if not channel.write(self.old_buffer[1], self.old_buffer[0] + 1):
+        return
+
     self.subscriptions.append(channel)
-    self.flush()
+    self.flush(seqNo)
 
   def unsubscribe(self, channel):
     try:
@@ -100,13 +112,16 @@ class IRCSession:
     self.throttle = t + config.UPDATE_FREQ
 
     encdata = json.dumps(self.buffer)
+    self.old_buffer = (self.subSeqNo, encdata)
+    self.subSeqNo+=1
     self.buffer = []
     self.buflen = 0
 
     subs = self.subscriptions
     self.subscriptions = newsubs = []
+
     for x in subs:
-      if x.write(encdata):
+      if x.write(encdata, self.subSeqNo):
         newsubs.append(x)
 
     if self.closed and not newsubs:
@@ -123,13 +138,17 @@ class IRCSession:
     self.buflen = newbuflen
     self.flush()
     
-  def push(self, data):
+  def push(self, data, seq_no=None):
     if self.closed:
       return
 
     if len(data) > config.MAXLINELEN:
       raise LineTooLongException
 
+    if seq_no is not None:
+      if seq_no <= self.pubSeqNo:
+        return
+      self.pubSeqNo = seq_no
     self.client.write(data)
 
   def disconnect(self):
@@ -147,7 +166,8 @@ class RequestChannel(object):
   def __init__(self, request):
     self.request = request
 
-  def write(self, data):
+  def write(self, data, seqNo):
+    self.request.setHeader("n", str(seqNo))
     self.request.write(data)
     self.request.finish()
     return False
@@ -238,7 +258,7 @@ class AJAXEngine(resource.Resource):
     Sessions[id] = session
     
     return json.dumps((True, id, TRANSPORTS))
-  
+
   def getSession(self, request):
     bad_session_message = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
     
@@ -257,7 +277,17 @@ class AJAXEngine(resource.Resource):
     channel = RequestChannel(request)
     session = self.getSession(request)
     notifier = request.notifyFinish()
-    session.subscribe(channel)
+
+    seq_no = request.args.get("n")
+    try:
+      if seq_no is not None:
+        seq_no = int(seq_no[0])
+        if seq_no < 0 or seq_no > MAX_SEQNO:
+          raise ValueError
+    except ValueError:
+      raise AJAXEngine, "Bad sequence number"
+
+    session.subscribe(channel, seq_no)
 
     timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)
     def cancel_timeout(result):
@@ -274,10 +304,19 @@ class AJAXEngine(resource.Resource):
     if command is None:
       raise AJAXException, "No command specified."
     self.__total_hit()
-    
+
+    seq_no = request.args.get("n")
+    try:
+      if seq_no is not None:
+        seq_no = int(seq_no[0])
+        if seq_no < 0 or seq_no > MAX_SEQNO:
+          raise ValueError
+    except ValueError:
+      raise AJAXEngine, "Bad sequence number"
+
     session = self.getSession(request)
     try:
-      session.push(ircclient.irc_decode(command[0]))
+      session.push(ircclient.irc_decode(command[0]), seq_no)
     except AttributeError: # occurs when we haven't noticed an error
       session.disconnect()
       raise AJAXException, "Connection closed by server; try reconnecting by reloading the page."
@@ -309,8 +348,8 @@ if has_websocket:
     def __init__(self, channel):
       self.channel = channel
 
-    def write(self, data):
-      self.channel.send("c", data)
+    def write(self, data, seqNo):
+      self.channel.send("c", "%d,%s" % (seqNo, data))
       return True
 
     def close(self):
@@ -341,6 +380,19 @@ if has_websocket:
       message_type, message = msg[:1], msg[1:]
       if state == self.AWAITING_AUTH:
         if message_type == "s":  # subscribe
+          tokens = message.split(",", 2)
+          if len(tokens) != 2:
+            self.close("Bad tokens")
+            return
+
+          seq_no, message = tokens[0], tokens[1]
+          try:
+            seq_no = int(seq_no)
+            if seq_no < 0 or seq_no > MAX_SEQNO:
+              raise ValueError
+          except ValueError:
+            self.close("Bad value")
+
           session = Sessions.get(message)
           if not session:
             self.close(BAD_SESSION_MESSAGE)
@@ -351,10 +403,22 @@ if has_websocket:
           self.send("s", "True")
           self.__state = self.AUTHED
           self.__channel = WebSocketChannel(self)
-          session.subscribe(self.__channel)
+          session.subscribe(self.__channel, seq_no)
           return
       elif state == self.AUTHED:
         if message_type == "p":  # push
+          tokens = message.split(",", 2)
+          if len(tokens) != 2:
+            self.close("Bad tokens")
+            return
+
+          seq_no, message = tokens[0], tokens[1]
+          try:
+            seq_no = int(seq_no)
+            if seq_no < 0 or seq_no > MAX_SEQNO:
+              raise ValueError
+          except ValueError:
+            self.close("Bad value")
           self.__session.push(ircclient.irc_decode(message))
           return