import qwebirc.dns as qdns
import qwebirc.util.qjson as json
import urlparse
+import qwebirc.util.autobahn_check as autobahn_check
TRANSPORTS = ["longpoll"]
-try:
- import autobahn.websocket
- import autobahn.resource
+has_websocket = False
+autobahn_status = autobahn_check.check()
+if autobahn_status == True:
+ import autobahn
+ import autobahn.twisted.websocket
+ import autobahn.twisted.resource
has_websocket = True
TRANSPORTS.append("websocket")
-except ImportError:
- has_websocket = False
+elif autobahn_status == False:
+ # they've been warned already
+ pass
+else:
+ print >>sys.stderr, "WARNING:"
+ print >>sys.stderr, " %s" % autobahn_status
+ print >>sys.stderr, " as a result websocket support is disabled."
+ print >>sys.stderr, " upgrade your version of autobahn from http://autobahn.ws/python/getstarted/"
BAD_SESSION_MESSAGE = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
+MAX_SEQNO = 9223372036854775807 # 2**63 - 1... yeah it doesn't wrap
Sessions = {}
def get_session_id():
self.id = id
self.subscriptions = []
self.buffer = []
+ self.old_buffer = None
self.buflen = 0
self.throttle = 0
self.schedule = None
self.closed = False
self.cleanupschedule = None
+ self.pubSeqNo = -1
+ self.subSeqNo = 0
- def subscribe(self, channel):
+ def subscribe(self, channel, seqNo=None):
if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
self.subscriptions.pop(0).close()
+ if seqNo is not None and seqNo < self.subSeqNo:
+ if self.old_buffer is None or seqNo != self.old_buffer[0]:
+ channel.write(json.dumps([False, "Unable to reconnect -- sequence number too old."]), seqNo + 1)
+ return
+
+ if not channel.write(self.old_buffer[1], self.old_buffer[0] + 1):
+ return
+
self.subscriptions.append(channel)
- self.flush()
+ self.flush(seqNo)
def unsubscribe(self, channel):
try:
return
self.unsubscribe(channel)
- channel.write(EMPTY_JSON_LIST)
+ channel.write(EMPTY_JSON_LIST, self.subSeqNo)
def flush(self, scheduled=False):
if scheduled:
self.throttle = t + config.UPDATE_FREQ
encdata = json.dumps(self.buffer)
+ self.old_buffer = (self.subSeqNo, encdata)
+ self.subSeqNo+=1
self.buffer = []
self.buflen = 0
subs = self.subscriptions
self.subscriptions = newsubs = []
+
for x in subs:
- if x.write(encdata):
+ if x.write(encdata, self.subSeqNo):
newsubs.append(x)
if self.closed and not newsubs:
self.buflen = newbuflen
self.flush()
- def push(self, data):
+ def push(self, data, seq_no=None):
if self.closed:
return
if len(data) > config.MAXLINELEN:
raise LineTooLongException
+ if seq_no is not None:
+ if seq_no <= self.pubSeqNo:
+ return
+ self.pubSeqNo = seq_no
self.client.write(data)
def disconnect(self):
def __init__(self, request):
self.request = request
- def write(self, data):
+ def write(self, data, seqNo):
+ self.request.setHeader("n", str(seqNo))
self.request.write(data)
self.request.finish()
return False
Sessions[id] = session
return json.dumps((True, id, TRANSPORTS))
-
+
def getSession(self, request):
bad_session_message = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
channel = RequestChannel(request)
session = self.getSession(request)
notifier = request.notifyFinish()
- session.subscribe(channel)
+
+ seq_no = request.args.get("n")
+ try:
+ if seq_no is not None:
+ seq_no = int(seq_no[0])
+ if seq_no < 0 or seq_no > MAX_SEQNO:
+ raise ValueError
+ except ValueError:
+ raise AJAXEngine, "Bad sequence number"
+
+ session.subscribe(channel, seq_no)
timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)
def cancel_timeout(result):
if command is None:
raise AJAXException, "No command specified."
self.__total_hit()
-
+
+ seq_no = request.args.get("n")
+ try:
+ if seq_no is not None:
+ seq_no = int(seq_no[0])
+ if seq_no < 0 or seq_no > MAX_SEQNO:
+ raise ValueError
+ except ValueError:
+ raise AJAXEngine("Bad sequence number %r" % seq_no)
+
session = self.getSession(request)
try:
- session.push(ircclient.irc_decode(command[0]))
+ session.push(ircclient.irc_decode(command[0]), seq_no)
except AttributeError: # occurs when we haven't noticed an error
session.disconnect()
raise AJAXException, "Connection closed by server; try reconnecting by reloading the page."
def __init__(self, channel):
self.channel = channel
- def write(self, data):
- self.channel.send("c", data)
+ def write(self, data, seqNo):
+ self.channel.send("c", "%d,%s" % (seqNo, data))
return True
def close(self):
self.channel.close()
- class WebSocketEngineProtocol(autobahn.websocket.WebSocketServerProtocol):
+ class WebSocketEngineProtocol(autobahn.twisted.websocket.WebSocketServerProtocol):
AWAITING_AUTH, AUTHED = 0, 1
def __init__(self, *args, **kwargs):
+ super(WebSocketEngineProtocol, self).__init__(*args, **kwargs)
self.__state = self.AWAITING_AUTH
self.__session = None
self.__channel = None
self.__session.unsubscribe(self.__channel)
self.__session = None
- def onMessage(self, msg, binary):
+ def onMessage(self, msg, isBinary):
# we don't bother checking the Origin header, as if you can auth then you've been able to pass the browser's
# normal origin handling (POSTed the new connection request and managed to get the session id)
state = self.__state
message_type, message = msg[:1], msg[1:]
if state == self.AWAITING_AUTH:
if message_type == "s": # subscribe
+ tokens = message.split(",", 1)
+ if len(tokens) != 2:
+ self.close("Bad tokens")
+ return
+
+ seq_no, message = tokens[0], tokens[1]
+ try:
+ seq_no = int(seq_no)
+ if seq_no < 0 or seq_no > MAX_SEQNO:
+ raise ValueError
+ except ValueError:
+ self.close("Bad value")
+
session = Sessions.get(message)
if not session:
self.close(BAD_SESSION_MESSAGE)
self.send("s", "True")
self.__state = self.AUTHED
self.__channel = WebSocketChannel(self)
- session.subscribe(self.__channel)
+ session.subscribe(self.__channel, seq_no)
return
elif state == self.AUTHED:
if message_type == "p": # push
+ tokens = message.split(",", 1)
+ if len(tokens) != 2:
+ self.close("Bad tokens")
+ return
+
+ seq_no, message = tokens[0], tokens[1]
+ try:
+ seq_no = int(seq_no)
+ if seq_no < 0 or seq_no > MAX_SEQNO:
+ raise ValueError
+ except ValueError:
+ self.close("Bad value")
self.__session.push(ircclient.irc_decode(message))
return
def close(self, reason=None):
self.__cancelTimeout()
if reason:
- self.sendClose(4999, reason)
+ self.sendClose(4999, unicode(reason))
else:
self.sendClose(4998)
def send(self, message_type, message):
self.sendMessage(message_type + message)
- class WebSocketResource(autobahn.resource.WebSocketResource):
+ class WebSocketResource(autobahn.twisted.resource.WebSocketResource):
def render(self, request):
request.channel.setTimeout(None)
- return autobahn.resource.WebSocketResource.render(self, request)
+ return autobahn.twisted.resource.WebSocketResource.render(self, request)
def WebSocketEngine(path=None):
- parsed = urlparse.urlparse(config.BASE_URL)
- port = parsed.port
- if port is None:
- if parsed.scheme == "http":
- port = 80
- elif parsed.scheme == "https":
- port = 443
- else:
- raise Exception("Unable to determine port from BASE_URL: " + config.BASE_URL)
-
- factory = autobahn.websocket.WebSocketServerFactory("ws://localhost:%d" % port)
+ factory = autobahn.twisted.websocket.WebSocketServerFactory("ws://localhost")
+ factory.externalPort = None
factory.protocol = WebSocketEngineProtocol
factory.setProtocolOptions(maxMessagePayloadSize=512, maxFramePayloadSize=512, tcpNoDelay=False)
resource = WebSocketResource(factory)
return resource
+