/* This could do with a rewrite from scratch. */
-//window.WEB_SOCKET_DEBUG = QWEBIRC_DEBUG;
-//window.WEB_SOCKET_FORCE_FLASH = true;
+//WEB_SOCKET_DEBUG = QWEBIRC_DEBUG;
+//WEB_SOCKET_FORCE_FLASH = true;
+//FORCE_LONGPOLL = true;
qwebirc.irc.IRCConnection = new Class({
Implements: [Events, Options],
this.__wsEverConnected = false;
this.__ws = null;
this.__wsAuthed = false;
+
+ this.__pubSeqNo = 0;
+ this.__subSeqNo = 0;
+ this.__sendRetries = 0;
},
__error: function(text) {
this.fireEvent("error", text);
+ this.log("ERROR: " + text);
if(this.options.errorAlert)
alert(text);
},
return false;
},
send: function(data, synchronous) {
+ /* every outgoing command gets the next publish sequence number */
+ this.__pubSeqNo++;
if(this.disconnected)
return false;
if(synchronous) {
- this.__send(data, false);
+ /* __send reads data[0] (seqno) and data[1] (command), so hand it the same pair shape as the queued path */
+ this.__send([this.__pubSeqNo, data], false);
} else if(this.__ws && this.__wsAuthed) {
- this.__ws.send("p" + data);
+ /* seqno here is currently pointless but it's nice to enforce it in the protocol */
+ this.__ws.send("p" + this.__pubSeqNo + "," + data);
} else {
- this.__sendQueue.push(data);
+ this.__sendQueue.push([this.__pubSeqNo, data]);
this.__processSendQueue();
}
return;
this.__sendQueueActive = true;
- this.__send(this.__sendQueue.shift(), true);
+ this.__send(this.__sendQueue[0], true);
},
__send: function(data, queued) {
+ /* data is a [seqno, command] pair; queued is true when the call comes from the send queue */
+ this.log("called send(" + data[1] + ")");
var r = this.newRequest("p", false, !queued); /* !queued == synchronous */
if(r === null)
return;
-
+
+ var handled = false;
+ var retryEvent = null;
+ if(queued) {
+ /* give up on a queued request that has not completed after 7.5s and go through the retry path */
+ var timeout = function() {
+ this.log("timeout for " + data[1] + " fired");
+ r.cancel();
+ retry();
+ }.delay(7500, this);
+ }
r.addEvent("complete", function(o) {
- if(queued)
+ this.log("complete for " + data[1] + " fired");
+ if(queued) {
+ $clear(timeout);
this.__sendQueueActive = false;
+ /* a queued send is the head of __sendQueue (it is only removed once completed);
+ a synchronous send never was in the queue, so it must not drop another command's entry */
+ this.__sendQueue.shift();
+ }
+ if(retryEvent) {
+ this.log("cleared retry event");
+ $clear(retryEvent);
+ }
+ this.__sendRetries = 0;
if(!o || (o[0] == false)) {
this.__sendQueue = [];
if(!this.disconnected) {
this.disconnect();
- this.__error("An error occurred: " + o[1]);
+ this.__error("An error occurred: " + (o ? o[1] : "(unknown error)"));
}
return false;
}
this.__processSendQueue();
}.bind(this));
-
- r.send("s=" + this.sessionid + "&c=" + encodeURIComponent(data));
+
+ if(queued) {
+ /* retry runs at most once per request (guarded by handled), from either the timeout or an error/failure event */
+ var retry = function() {
+ this.log("retry for " + data[1] + " fired... handled is " + handled);
+ if(handled)
+ return;
+ handled = true;
+ $clear(timeout);
+ this.log("Unable to send command " + data + "... retrying (attempt " + this.__sendRetries + ")");
+ if(this.__sendRetries++ < 3) {
+ retryEvent = this.__send.delay(1500 * this.__sendRetries + Math.random() * 1000, this, [data, queued]);
+ } else {
+ this.disconnect();
+ this.__error("Unable to send command after multiple retries.");
+ }
+ }.bind(this);
+ r.addEvent("error", retry);
+ r.addEvent("failure", retry);
+ }
+ r.send("s=" + this.sessionid + "&c=" + encodeURIComponent(data[1]) + "&n=" + data[0]);
},
__processData: function(o) {
- if(o[0] == false) {
+ if(!o || o[0] == false) {
if(!this.disconnected) {
this.disconnect();
- this.__error("An error occurred: " + o[1]);
+ this.__error("An error occurred: " + (o ? o[1] : "(unknown error)"));
}
return false;
}
if(this.__timeout + this.options.timeoutIncrement <= this.options.maxTimeout)
this.__timeout+=this.options.timeoutIncrement;
- qwebirc.util.log("timeout occurred... timeout value now " + this.__timeout);
-
this.__recvLongPoll();
},
__checkRetries: function() {
if(this.__timeout - this.options.timeoutIncrement >= this.options.minTimeout)
this.__timeout-=this.options.timeoutIncrement;
- qwebirc.util.log("checkRetries: timeout value now " + this.__timeout);
-
return true;
},
recv: function() {
}
} else {
if(data.charAt(0) == "c") {
+ /* frame body is "<seqno>,<json>": remember the seqno, then decode only the JSON part */
+ var message = data.substr(1);
+ var tokens = message.splitMax(",", 2);
+ this.__subSeqNo = Number(tokens[0]);
- this.__processData(JSON.decode(data.substr(1)));
+ this.__processData(JSON.decode(tokens[1]));
return;
}
ws.onopen = function() {
$clear(connectionTimeout);
this.log("websocket connected");
- ws.send("s" + this.sessionid);
+ ws.send("s" + this.__subSeqNo + "," + this.sessionid);
}.bind(this);
this.__ws = ws;
},
/* if we're a replaced request then no need to fire off another poll as it's already been done */
if(r.__replaced) {
this.__lastActiveRequest = null;
-
- if(o)
+ if(o) {
+ this.__subSeqNo = Number(r.xhr.getResponseHeader("N"));
this.__processData(o);
+ }
+
return;
}
this.__recvLongPoll();
return;
}
-
+
+ this.__subSeqNo = Number(r.xhr.getResponseHeader("N"));
if(this.__processData(o))
this.__recvLongPoll();
};
r.addEvent("complete", onComplete.bind(this));
this.__timeoutId = this.__timeoutEvent.delay(this.__timeout, this);
- r.send("s=" + this.sessionid);
+ r.send("s=" + this.sessionid + "&n=" + this.__subSeqNo);
},
connect: function() {
this.cacheAvoidance = qwebirc.util.randHexString(16);
if(state.loading)
return;
+ if(window.FORCE_LONGPOLL) {
+ log("FORCE_LONGPOLL set");
+ state.result = false;
+ callback(false);
+ return;
+ }
+
if(!window.WEB_SOCKET_FORCE_FLASH) {
if(window.WebSocket) {
log("WebSocket detected");
log("no WebSocket support in browser and no Flash");
state.result = false;
callback(false);
+ return;
}
log("No WebSocket support present in client, but flash enabled... attempting to load FlashWebSocket...");
has_websocket = False
BAD_SESSION_MESSAGE = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
+MAX_SEQNO = 9223372036854775807 # 2**63 - 1... yeah it doesn't wrap
Sessions = {}
def get_session_id():
self.id = id
self.subscriptions = []
self.buffer = []
+ self.old_buffer = None
self.buflen = 0
self.throttle = 0
self.schedule = None
self.closed = False
self.cleanupschedule = None
+ self.pubSeqNo = -1
+ self.subSeqNo = 0
- def subscribe(self, channel):
+ def subscribe(self, channel, seqNo=None):
if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
self.subscriptions.pop(0).close()
+ # seqNo is None when the caller supplied no sequence number; Python 2 orders
+ # None below every int, so it must be excluded explicitly or the replay branch
+ # is entered and "seqNo + 1" raises TypeError.
+ if seqNo is not None and seqNo < self.subSeqNo:
+ # the client is behind: only the most recently flushed buffer can be replayed
+ if self.old_buffer is None or seqNo != self.old_buffer[0]:
+ channel.write(json.dumps([False, "Unable to reconnect -- sequence number too old."]), seqNo + 1)
+ return
+
+ # a channel whose write() returns False is not kept as a subscription
+ if not channel.write(self.old_buffer[1], self.old_buffer[0] + 1):
+ return
+
self.subscriptions.append(channel)
- self.flush()
+ self.flush(seqNo)
def unsubscribe(self, channel):
try:
self.throttle = t + config.UPDATE_FREQ
encdata = json.dumps(self.buffer)
+ self.old_buffer = (self.subSeqNo, encdata)
+ self.subSeqNo+=1
self.buffer = []
self.buflen = 0
subs = self.subscriptions
self.subscriptions = newsubs = []
+
for x in subs:
- if x.write(encdata):
+ if x.write(encdata, self.subSeqNo):
newsubs.append(x)
if self.closed and not newsubs:
self.buflen = newbuflen
self.flush()
- def push(self, data):
+ def push(self, data, seq_no=None):
if self.closed:
return
if len(data) > config.MAXLINELEN:
raise LineTooLongException
+ if seq_no is not None:
+ if seq_no <= self.pubSeqNo:
+ return
+ self.pubSeqNo = seq_no
self.client.write(data)
def disconnect(self):
def __init__(self, request):
self.request = request
- def write(self, data):
+ def write(self, data, seqNo):
+ self.request.setHeader("n", str(seqNo))
self.request.write(data)
self.request.finish()
return False
Sessions[id] = session
return json.dumps((True, id, TRANSPORTS))
-
+
def getSession(self, request):
bad_session_message = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
channel = RequestChannel(request)
session = self.getSession(request)
notifier = request.notifyFinish()
- session.subscribe(channel)
+
+ # optional "n" argument: the client's sequence number, 0..MAX_SEQNO
+ seq_no = request.args.get("n")
+ try:
+ if seq_no is not None:
+ seq_no = int(seq_no[0])
+ if seq_no < 0 or seq_no > MAX_SEQNO:
+ raise ValueError
+ except ValueError:
+ # report through AJAXException like the other request errors in this file
+ raise AJAXException, "Bad sequence number"
+
+ session.subscribe(channel, seq_no)
timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)
def cancel_timeout(result):
if command is None:
raise AJAXException, "No command specified."
self.__total_hit()
-
+
+ # optional "n" argument: the client's sequence number, 0..MAX_SEQNO
+ seq_no = request.args.get("n")
+ try:
+ if seq_no is not None:
+ seq_no = int(seq_no[0])
+ if seq_no < 0 or seq_no > MAX_SEQNO:
+ raise ValueError
+ except ValueError:
+ # report through AJAXException like the other request errors in this file
+ raise AJAXException, "Bad sequence number"
+
session = self.getSession(request)
try:
- session.push(ircclient.irc_decode(command[0]))
+ session.push(ircclient.irc_decode(command[0]), seq_no)
except AttributeError: # occurs when we haven't noticed an error
session.disconnect()
raise AJAXException, "Connection closed by server; try reconnecting by reloading the page."
def __init__(self, channel):
self.channel = channel
- def write(self, data):
- self.channel.send("c", data)
+ def write(self, data, seqNo):
+ self.channel.send("c", "%d,%s" % (seqNo, data))
return True
def close(self):
message_type, message = msg[:1], msg[1:]
if state == self.AWAITING_AUTH:
if message_type == "s": # subscribe
+ # frame body is "<seqno>,<session id>"; maxsplit=1 yields at most two tokens
+ tokens = message.split(",", 1)
+ if len(tokens) != 2:
+ self.close("Bad tokens")
+ return
+
+ seq_no, message = tokens[0], tokens[1]
+ try:
+ seq_no = int(seq_no)
+ if seq_no < 0 or seq_no > MAX_SEQNO:
+ raise ValueError
+ except ValueError:
+ self.close("Bad value")
+ # stop here: otherwise the code below runs with seq_no still a string
+ return
+
session = Sessions.get(message)
if not session:
self.close(BAD_SESSION_MESSAGE)
self.send("s", "True")
self.__state = self.AUTHED
self.__channel = WebSocketChannel(self)
- session.subscribe(self.__channel)
+ session.subscribe(self.__channel, seq_no)
return
elif state == self.AUTHED:
if message_type == "p": # push
+ # frame body is "<seqno>,<command>"; the command itself may contain commas,
+ # so split only on the first one (maxsplit=1 yields at most two tokens)
+ tokens = message.split(",", 1)
+ if len(tokens) != 2:
+ self.close("Bad tokens")
+ return
+
+ seq_no, message = tokens[0], tokens[1]
+ try:
+ seq_no = int(seq_no)
+ if seq_no < 0 or seq_no > MAX_SEQNO:
+ raise ValueError
+ except ValueError:
+ self.close("Bad value")
+ # stop here: a frame with a bad sequence number must not be pushed
+ return
self.__session.push(ircclient.irc_decode(message))
return