# Registry of sessions, keyed by session id.
Sessions = {}

# Flag for WebSocket support; starts out False.
has_websocket = False

# Upper bound accepted for client-supplied sequence numbers: 2**63 - 1.
# Sequence numbers do not wrap around.
MAX_SEQNO = 9223372036854775807

BAD_SESSION_MESSAGE = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
def get_session_id():
self.id = id
self.subscriptions = []
self.buffer = []
+ self.old_buffer = None
self.buflen = 0
self.throttle = 0
self.schedule = None
self.closed = False
self.cleanupschedule = None
+ self.pubSeqNo = -1
+ self.subSeqNo = 0
def subscribe(self, channel, seqNo=None):
    """Attach *channel* to this session, replaying the last batch if needed.

    channel -- object with ``write(data, seqNo)`` and ``close()``; a falsy
               return from ``write`` means the channel must not be kept.
    seqNo   -- sequence number the client says it has reached, or None when
               the client sends none.

    When the subscription list is already at config.MAXSUBSCRIPTIONS the
    oldest subscription is closed and dropped first.

    When *seqNo* is behind the session's current sequence number, the only
    batch that can be replayed is the one kept in ``self.old_buffer``.  Any
    other value is answered with a ``[False, message]`` JSON reply and the
    channel is not subscribed.
    """
    if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
        self.subscriptions.pop(0).close()

    if seqNo is not None and seqNo < self.subSeqNo:
        previous = self.old_buffer
        if previous is None or seqNo != previous[0]:
            channel.write(json.dumps([False, "Unable to reconnect -- sequence number too old."]), seqNo + 1)
            return

        old_seq, old_data = previous
        if not channel.write(old_data, old_seq + 1):
            # The channel was used up by the replay; nothing to subscribe.
            return

    self.subscriptions.append(channel)
    # flush() takes a single boolean `scheduled` argument.  Passing the
    # sequence number there made any non-zero value look like a scheduled
    # call, which overwrites self.schedule without cancelling it.
    self.flush()
def unsubscribe(self, channel):
try:
def timeout(self, channel):
    """Answer a waiting *channel* with an empty list once its wait expires.

    Does nothing while a flush is scheduled (``self.schedule`` is set).
    Otherwise the channel is unsubscribed first and then sent
    EMPTY_JSON_LIST together with the current subscriber sequence number.
    """
    if not self.schedule:
        self.unsubscribe(channel)
        channel.write(EMPTY_JSON_LIST, self.subSeqNo)
def flush(self, scheduled=False):
if scheduled:
self.schedule = None
if not self.schedule:
self.schedule = reactor.callLater(0, self.flush, True)
return
-
+
self.throttle = t + config.UPDATE_FREQ
encdata = json.dumps(self.buffer)
+ self.old_buffer = (self.subSeqNo, encdata)
+ self.subSeqNo+=1
self.buffer = []
self.buflen = 0
- newsubs = []
- for x in self.subscriptions:
- if x.write(encdata):
+ subs = self.subscriptions
+ self.subscriptions = newsubs = []
+
+ for x in subs:
+ if x.write(encdata, self.subSeqNo):
newsubs.append(x)
- self.subscriptions = newsubs
- if self.closed and not self.subscriptions:
+ if self.closed and not newsubs:
cleanupSession(self.id)
def event(self, data):
self.buflen = newbuflen
self.flush()
def push(self, data, seq_no=None):
    """Forward one client line to ``self.client``.

    data   -- the line to send; longer than config.MAXLINELEN raises
              LineTooLongException.
    seq_no -- optional publish sequence number.  A value not greater than
              the last accepted one is silently ignored (nothing is sent);
              otherwise it becomes the new ``self.pubSeqNo``.

    Nothing happens on a closed session.
    """
    if self.closed:
        return

    if len(data) > config.MAXLINELEN:
        raise LineTooLongException

    sequenced = seq_no is not None
    if sequenced and seq_no <= self.pubSeqNo:
        # Already seen this (or a later) sequence number: drop the line.
        return
    if sequenced:
        self.pubSeqNo = seq_no

    self.client.write(data)
def disconnect(self):
def __init__(self, request):
    """Remember the *request* that write() will answer."""
    self.request = request
def write(self, data, seqNo):
    """Send *data* as the response body and finish the request.

    The sequence number is sent as the ``n`` response header.  Always
    returns False: the request is finished, so the caller must not keep
    this channel.
    """
    req = self.request
    req.setHeader("n", str(seqNo))
    req.write(data)
    req.finish()
    return False
Sessions[id] = session
return json.dumps((True, id, TRANSPORTS))
-
+
def getSession(self, request):
bad_session_message = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
return session
def subscribe(self, request):
    """Handle a long-poll subscribe request.

    Reads the optional ``n`` query argument as the client's sequence
    number, subscribes a RequestChannel for *request* to the session, and
    arranges for session.timeout to run after
    config.HTTP_AJAX_REQUEST_TIMEOUT seconds.

    Returns server.NOT_DONE_YET; the response is produced later through
    the channel.

    Raises AJAXException when ``n`` is not an integer in [0, MAX_SEQNO].
    """
    # Presumably turns off the connection's idle timeout for the long poll
    # -- confirm against the Twisted version in use.
    request.channel.setTimeout(None)

    channel = RequestChannel(request)
    session = self.getSession(request)
    notifier = request.notifyFinish()

    seq_no = request.args.get("n")
    try:
        if seq_no is not None:
            seq_no = int(seq_no[0])
            if seq_no < 0 or seq_no > MAX_SEQNO:
                raise ValueError
    except ValueError:
        # AJAXException is the error type the other handlers in this file
        # raise for bad input; AJAXEngine is not an exception class used
        # anywhere else here.
        raise AJAXException("Bad sequence number")

    session.subscribe(channel, seq_no)

    timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)

    def cancel_timeout(result):
        # Runs whether the request finished normally or with an error.
        try:
            timeout_entry.cancel()
        except error.AlreadyCalled:
            pass
        session.unsubscribe(channel)

    notifier.addCallbacks(cancel_timeout, cancel_timeout)
    return server.NOT_DONE_YET
if command is None:
raise AJAXException, "No command specified."
self.__total_hit()
-
+
+ seq_no = request.args.get("n")
+ try:
+ if seq_no is not None:
+ seq_no = int(seq_no[0])
+ if seq_no < 0 or seq_no > MAX_SEQNO:
+ raise ValueError
+ except ValueError:
+ raise AJAXEngine("Bad sequence number %r" % seq_no)
+
session = self.getSession(request)
try:
- session.push(ircclient.irc_decode(command[0]))
+ session.push(ircclient.irc_decode(command[0]), seq_no)
except AttributeError: # occurs when we haven't noticed an error
session.disconnect()
raise AJAXException, "Connection closed by server; try reconnecting by reloading the page."
def __init__(self, channel):
    """Remember the *channel* object that write() sends through."""
    self.channel = channel
def write(self, data, seqNo):
    """Send *data* as a ``c`` message prefixed with its sequence number.

    The payload is ``"<seqNo>,<data>"``.  Always returns True, so the
    caller keeps this channel subscribed.
    """
    payload = "%d,%s" % (seqNo, data)
    self.channel.send("c", payload)
    return True
def close(self):
message_type, message = msg[:1], msg[1:]
if state == self.AWAITING_AUTH:
if message_type == "s": # subscribe
+ tokens = message.split(",", 1)
+ if len(tokens) != 2:
+ self.close("Bad tokens")
+ return
+
+ seq_no, message = tokens[0], tokens[1]
+ try:
+ seq_no = int(seq_no)
+ if seq_no < 0 or seq_no > MAX_SEQNO:
+ raise ValueError
+ except ValueError:
+ self.close("Bad value")
+
session = Sessions.get(message)
if not session:
self.close(BAD_SESSION_MESSAGE)
self.send("s", "True")
self.__state = self.AUTHED
self.__channel = WebSocketChannel(self)
- session.subscribe(self.__channel)
+ session.subscribe(self.__channel, seq_no)
return
elif state == self.AUTHED:
if message_type == "p": # push
+ tokens = message.split(",", 1)
+ if len(tokens) != 2:
+ self.close("Bad tokens")
+ return
+
+ seq_no, message = tokens[0], tokens[1]
+ try:
+ seq_no = int(seq_no)
+ if seq_no < 0 or seq_no > MAX_SEQNO:
+ raise ValueError
+ except ValueError:
+ self.close("Bad value")
self.__session.push(ircclient.irc_decode(message))
return
def __cancelTimeout(self):
    """Cancel the pending timeout call, if any, and forget it.

    A timeout that has already fired (error.AlreadyCalled) is ignored.
    """
    if self.__timeout is None:
        return

    try:
        self.__timeout.cancel()
    except error.AlreadyCalled:
        pass
    self.__timeout = None
def close(self, reason=None):
class WebSocketResource(autobahn.resource.WebSocketResource):
    """autobahn WebSocket resource that adjusts the channel timeout first."""

    def render(self, request):
        """Call setTimeout(None) on the request's channel, then delegate.

        Presumably this disables the connection's idle timeout so the
        socket can stay open -- confirm against the Twisted version in use.
        """
        request.channel.setTimeout(None)
        parent_render = autobahn.resource.WebSocketResource.render
        return parent_render(self, request)
def WebSocketEngine(path=None):