from twisted.names import client
from twisted.internet import reactor, error
from authgateengine import login_optional, getSessionData
-import simplejson, md5, sys, os, time, config, qwebirc.config_options as config_options, traceback, socket
+import md5, sys, os, time, config, qwebirc.config_options as config_options, traceback, socket
import qwebirc.ircclient as ircclient
from adminengine import AdminEngineAction
from qwebirc.util import HitCounter
import qwebirc.dns as qdns
+import qwebirc.util.qjson as json
+import urlparse
+
+TRANSPORTS = ["longpoll"]
+
+try:
+ import autobahn.websocket
+ import autobahn.resource
+ has_websocket = True
+ TRANSPORTS.append("websocket")
+except ImportError:
+ has_websocket = False
+
+BAD_SESSION_MESSAGE = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
Sessions = {}
def get_session_id():
class IDGenerationException(Exception):
pass
-class PassthruException(Exception):
+class LineTooLongException(Exception):
pass
-
-NOT_DONE_YET = None
-def jsondump(fn):
- def decorator(*args, **kwargs):
- try:
- x = fn(*args, **kwargs)
- if x is None:
- return server.NOT_DONE_YET
- x = (True, x)
- except AJAXException, e:
- x = (False, e[0])
- except PassthruException, e:
- return str(e)
-
- return simplejson.dumps(x)
- return decorator
+EMPTY_JSON_LIST = json.dumps([])
def cleanupSession(id):
try:
self.closed = False
self.cleanupschedule = None
- def subscribe(self, channel, notifier):
- timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, self.timeout, channel)
- def cancel_timeout(result):
- if channel in self.subscriptions:
- self.subscriptions.remove(channel)
- try:
- timeout_entry.cancel()
- except error.AlreadyCalled:
- pass
- notifier.addCallbacks(cancel_timeout, cancel_timeout)
-
+ def subscribe(self, channel):
if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
self.subscriptions.pop(0).close()
self.subscriptions.append(channel)
self.flush()
-
+
+ def unsubscribe(self, channel):
+ try:
+ self.subscriptions.remove(channel)
+ except ValueError:
+ pass
+
def timeout(self, channel):
if self.schedule:
return
-
- channel.write(simplejson.dumps([]))
- if channel in self.subscriptions:
- self.subscriptions.remove(channel)
-
+
+ self.unsubscribe(channel)
+ channel.write(EMPTY_JSON_LIST)
+
def flush(self, scheduled=False):
if scheduled:
self.schedule = None
if not self.schedule:
self.schedule = reactor.callLater(0, self.flush, True)
return
-
+
self.throttle = t + config.UPDATE_FREQ
- encdata = simplejson.dumps(self.buffer)
+ encdata = json.dumps(self.buffer)
self.buffer = []
self.buflen = 0
- newsubs = []
- for x in self.subscriptions:
+ subs = self.subscriptions
+ self.subscriptions = newsubs = []
+ for x in subs:
if x.write(encdata):
newsubs.append(x)
- self.subscriptions = newsubs
- if self.closed and not self.subscriptions:
+ if self.closed and not newsubs:
cleanupSession(self.id)
def event(self, data):
self.flush()
def push(self, data):
- if not self.closed:
- self.client.write(data)
+ if self.closed:
+ return
+
+ if len(data) > config.MAXLINELEN:
+ raise LineTooLongException
+
+ self.client.write(data)
def disconnect(self):
# keep the session hanging around for a few seconds so the
def connect_notice(line):
return "c", "NOTICE", "", ("AUTH", "*** (qwebirc) %s" % line)
-class Channel:
+class RequestChannel(object):
def __init__(self, request):
self.request = request
-
-class SingleUseChannel(Channel):
+
def write(self, data):
self.request.write(data)
self.request.finish()
def close(self):
self.request.finish()
-class MultipleUseChannel(Channel):
- def write(self, data):
- self.request.write(data)
- return True
-
class AJAXEngine(resource.Resource):
isLeaf = True
self.__connect_hit = HitCounter()
self.__total_hit = HitCounter()
- @jsondump
def render_POST(self, request):
path = request.path[len(self.prefix):]
if path[0] == "/":
handler = self.COMMANDS.get(path[1:])
if handler is not None:
- return handler(self, request)
-
- raise PassthruException, http_error.NoResource().render(request)
+ try:
+ return handler(self, request)
+ except AJAXException, e:
+ return json.dumps((False, e[0]))
+
+ return "404" ## TODO: tidy up
def newConnection(self, request):
ticket = login_optional(request)
- _, ip, port = request.transport.getPeer()
+ ip = request.getClientIP()
nick = request.args.get("nick")
if not nick:
if password is not None:
password = ircclient.irc_decode(password[0])
- for i in xrange(10):
+ for i in range(10):
id = get_session_id()
if not Sessions.get(id):
break
Sessions[id] = session
- return id
+ return json.dumps((True, id, TRANSPORTS))
def getSession(self, request):
bad_session_message = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
return session
def subscribe(self, request):
- request.channel.cancelTimeout()
- self.getSession(request).subscribe(SingleUseChannel(request), request.notifyFinish())
- return NOT_DONE_YET
+ request.channel.setTimeout(None)
+
+ channel = RequestChannel(request)
+ session = self.getSession(request)
+ notifier = request.notifyFinish()
+ session.subscribe(channel)
+
+ timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)
+ def cancel_timeout(result):
+ try:
+ timeout_entry.cancel()
+ except error.AlreadyCalled:
+ pass
+ session.unsubscribe(channel)
+ notifier.addCallbacks(cancel_timeout, cancel_timeout)
+ return server.NOT_DONE_YET
def push(self, request):
command = request.args.get("c")
raise AJAXException, "No command specified."
self.__total_hit()
- decoded = ircclient.irc_decode(command[0])
-
session = self.getSession(request)
-
- if len(decoded) > config.MAXLINELEN:
- session.disconnect()
- raise AJAXException, "Line too long."
-
try:
- session.push(decoded)
+ session.push(ircclient.irc_decode(command[0]))
except AttributeError: # occurs when we haven't noticed an error
session.disconnect()
raise AJAXException, "Connection closed by server; try reconnecting by reloading the page."
traceback.print_exc(file=sys.stderr)
raise AJAXException, "Unknown error."
- return True
+ return json.dumps((True, True))
def closeById(self, k):
s = Sessions.get(k)
COMMANDS = dict(p=push, n=newConnection, s=subscribe)
+if has_websocket:
+ class WebSocketChannel(object):
+ def __init__(self, channel):
+ self.channel = channel
+
+ def write(self, data):
+ self.channel.send("c", data)
+ return True
+
+ def close(self):
+ self.channel.close()
+
+ class WebSocketEngineProtocol(autobahn.websocket.WebSocketServerProtocol):
+ AWAITING_AUTH, AUTHED = 0, 1
+
+ def __init__(self, *args, **kwargs):
+ self.__state = self.AWAITING_AUTH
+ self.__session = None
+ self.__channel = None
+ self.__timeout = None
+
+ def onOpen(self):
+ self.__timeout = reactor.callLater(5, self.close, "Authentication timeout")
+
+ def onClose(self, wasClean, code, reason):
+ self.__cancelTimeout()
+ if self.__session:
+ self.__session.unsubscribe(self.__channel)
+ self.__session = None
+
+ def onMessage(self, msg, binary):
+ # we don't bother checking the Origin header, as if you can auth then you've been able to pass the browser's
+ # normal origin handling (POSTed the new connection request and managed to get the session id)
+ state = self.__state
+ message_type, message = msg[:1], msg[1:]
+ if state == self.AWAITING_AUTH:
+ if message_type == "s": # subscribe
+ session = Sessions.get(message)
+ if not session:
+ self.close(BAD_SESSION_MESSAGE)
+ return
+
+ self.__cancelTimeout()
+ self.__session = session
+ self.send("s", "True")
+ self.__state = self.AUTHED
+ self.__channel = WebSocketChannel(self)
+ session.subscribe(self.__channel)
+ return
+ elif state == self.AUTHED:
+ if message_type == "p": # push
+ self.__session.push(ircclient.irc_decode(message))
+ return
+
+ self.close("Bad message type")
+
+ def __cancelTimeout(self):
+ if self.__timeout is not None:
+ try:
+ self.__timeout.cancel()
+ except error.AlreadyCalled:
+ pass
+ self.__timeout = None
+
+ def close(self, reason=None):
+ self.__cancelTimeout()
+ if reason:
+ self.sendClose(4999, reason)
+ else:
+ self.sendClose(4998)
+
+ if self.__session:
+ self.__session.unsubscribe(self.__channel)
+ self.__session = None
+
+ def send(self, message_type, message):
+ self.sendMessage(message_type + message)
+
+ class WebSocketResource(autobahn.resource.WebSocketResource):
+ def render(self, request):
+ request.channel.setTimeout(None)
+ return autobahn.resource.WebSocketResource.render(self, request)
+
+ def WebSocketEngine(path=None):
+ parsed = urlparse.urlparse(config.BASE_URL)
+ port = parsed.port
+ if port is None:
+ if parsed.scheme == "http":
+ port = 80
+ elif parsed.scheme == "https":
+ port = 443
+ else:
+ raise Exception("Unable to determine port from BASE_URL: " + config.BASE_URL)
+
+ factory = autobahn.websocket.WebSocketServerFactory("ws://localhost:%d" % port)
+ factory.protocol = WebSocketEngineProtocol
+ factory.setProtocolOptions(maxMessagePayloadSize=512, maxFramePayloadSize=512, tcpNoDelay=False)
+ resource = WebSocketResource(factory)
+ return resource