jfr.im git - irc/quakenet/qwebirc.git/blobdiff - qwebirc/engines/ajaxengine.py
experimental websocket support
[irc/quakenet/qwebirc.git] / qwebirc / engines / ajaxengine.py
index 5fab9cf4c4f00c95d4cce6869be941e4211c141f..11566c5fd8ed275b36a0d4cfdf924f112637383b 100644 (file)
@@ -2,11 +2,25 @@ from twisted.web import resource, server, static, error as http_error
 from twisted.names import client
 from twisted.internet import reactor, error
 from authgateengine import login_optional, getSessionData
-import simplejson, md5, sys, os, time, config, weakref, traceback
+import md5, sys, os, time, config, qwebirc.config_options as config_options, traceback, socket
 import qwebirc.ircclient as ircclient
 from adminengine import AdminEngineAction
 from qwebirc.util import HitCounter
+import qwebirc.dns as qdns
+import qwebirc.util.qjson as json
+import urlparse
 
+TRANSPORTS = ["longpoll"]  # transport names returned to the client by newConnection
+
+try:
+  import autobahn.websocket  # optional dependency; websocket support is enabled only if these imports succeed
+  import autobahn.resource
+  has_websocket = True
+  TRANSPORTS.append("websocket")
+except ImportError:
+  has_websocket = False  # the websocket classes at the end of the module are then never defined
+
+BAD_SESSION_MESSAGE = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."  # close reason used by the websocket protocol for an unknown session id
 Sessions = {}
 
 def get_session_id():
@@ -21,25 +35,10 @@ class AJAXException(Exception):
 class IDGenerationException(Exception):
   pass
 
-class PassthruException(Exception):
+class LineTooLongException(Exception):  # raised by IRCSession.push when a line exceeds config.MAXLINELEN
   pass
-  
-NOT_DONE_YET = None
 
-def jsondump(fn):
-  def decorator(*args, **kwargs):
-    try:
-      x = fn(*args, **kwargs)
-      if x is None:
-        return server.NOT_DONE_YET
-      x = (True, x)
-    except AJAXException, e:
-      x = (False, e[0])
-    except PassthruException, e:
-      return str(e)
-      
-    return simplejson.dumps(x)
-  return decorator
+EMPTY_JSON_LIST = json.dumps([])  # encoded once at import; written by IRCSession.timeout to answer an idle poll
 
 def cleanupSession(id):
   try:
@@ -52,33 +51,30 @@ class IRCSession:
     self.id = id
     self.subscriptions = []
     self.buffer = []
+    self.buflen = 0
     self.throttle = 0
     self.schedule = None
     self.closed = False
     self.cleanupschedule = None
 
-  def subscribe(self, channel, notifier):
-    timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, self.timeout, channel)
-    def cancel_timeout(result):
-      if channel in self.subscriptions:
-        self.subscriptions.remove(channel)
-      try:
-        timeout_entry.cancel()
-      except error.AlreadyCalled:
-        pass
-    notifier.addCallbacks(cancel_timeout, cancel_timeout)
-    
+  def subscribe(self, channel):  # registers channel and flushes; the request timeout is now set up by the caller (AJAXEngine.subscribe)
     if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
       self.subscriptions.pop(0).close()
 
     self.subscriptions.append(channel)
     self.flush()
-      
+
+  def unsubscribe(self, channel):  # remove channel from this session's subscriber list
+    try:
+      self.subscriptions.remove(channel)
+    except ValueError:
+      pass  # channel is not in the list (never added or already removed); nothing to do
+
   def timeout(self, channel):
     if self.schedule:
       return
       
-    channel.write(simplejson.dumps([]))
+    channel.write(EMPTY_JSON_LIST)
     if channel in self.subscriptions:
       self.subscriptions.remove(channel)
       
@@ -104,9 +100,10 @@ class IRCSession:
         
     self.throttle = t + config.UPDATE_FREQ
 
-    encdata = simplejson.dumps(self.buffer)
+    encdata = json.dumps(self.buffer)
     self.buffer = []
-    
+    self.buflen = 0
+
     newsubs = []
     for x in self.subscriptions:
       if x.write(encdata):
@@ -117,18 +114,24 @@ class IRCSession:
       cleanupSession(self.id)
 
   def event(self, data):
-    bufferlen = sum(map(len, self.buffer))
-    if bufferlen + len(data) > config.MAXBUFLEN:
+    newbuflen = self.buflen + len(data)
+    if newbuflen > config.MAXBUFLEN:
       self.buffer = []
       self.client.error("Buffer overflow.")
       return
 
     self.buffer.append(data)
+    self.buflen = newbuflen
     self.flush()
     
   def push(self, data):
-    if not self.closed:
-      self.client.write(data)
+    if self.closed:
+      return  # input for a closed session is dropped without error
+
+    if len(data) > config.MAXLINELEN:
+      raise LineTooLongException  # length check lives here so both the HTTP and websocket push paths go through it
+
+    self.client.write(data)
 
   def disconnect(self):
     # keep the session hanging around for a few seconds so the
@@ -137,11 +140,14 @@ class IRCSession:
 
     reactor.callLater(5, cleanupSession, self.id)
 
-class Channel:
+# DANGER! Breach of encapsulation!
+def connect_notice(line):  # builds the event tuple newConnection passes to session.event for hostname-lookup status notices
+  return "c", "NOTICE", "", ("AUTH", "*** (qwebirc) %s" % line)
+
+class RequestChannel(object):
   def __init__(self, request):
     self.request = request
-  
-class SingleUseChannel(Channel):
+
   def write(self, data):
     self.request.write(data)
     self.request.finish()
@@ -150,11 +156,6 @@ class SingleUseChannel(Channel):
   def close(self):
     self.request.finish()
     
-class MultipleUseChannel(Channel):
-  def write(self, data):
-    self.request.write(data)
-    return True
-
 class AJAXEngine(resource.Resource):
   isLeaf = True
   
@@ -163,32 +164,33 @@ class AJAXEngine(resource.Resource):
     self.__connect_hit = HitCounter()
     self.__total_hit = HitCounter()
     
-  @jsondump
   def render_POST(self, request):
     path = request.path[len(self.prefix):]
-    if path[0] == "/":
+    if path[:1] == "/":  # slice rather than index: path is empty when the request path is exactly the prefix
       handler = self.COMMANDS.get(path[1:])
       if handler is not None:
-        return handler(self, request)
-        
-    raise PassthruException, http_error.NoResource().render(request)
+        try:
+          return handler(self, request)
+        except AJAXException, e:
+          return json.dumps((False, e[0]))  # handlers report client-visible failures by raising AJAXException
+
+    return http_error.NoResource().render(request)  # unknown command: render a real 404 response, as the code did before this change
 
-  #def render_GET(self, request):
-    #return self.render_POST(request)
-  
   def newConnection(self, request):
     ticket = login_optional(request)
     
-    _, ip, port = request.transport.getPeer()
+    ip = request.getClientIP()
 
     nick = request.args.get("nick")
     if not nick:
       raise AJAXException, "Nickname not supplied."
     nick = ircclient.irc_decode(nick[0])
 
-    ident, realname = "webchat", config.REALNAME
-    
-    for i in xrange(10):
+    password = request.args.get("password")
+    if password is not None:
+      password = ircclient.irc_decode(password[0])
+      
+    for i in range(10):
       id = get_session_id()
       if not Sessions.get(id):
         break
@@ -205,13 +207,38 @@ class AJAXEngine(resource.Resource):
       msg_mask = service_mask.split("!")[0] + "@" + service_mask.split("@", 1)[1]
       perform = ["PRIVMSG %s :TICKETAUTH %s" % (msg_mask, qticket)]
 
+    ident, realname = config.IDENT, config.REALNAME
+    if ident is config_options.IDENT_HEX or ident is None: # latter is legacy
+      ident = socket.inet_aton(ip).encode("hex")
+    elif ident is config_options.IDENT_NICKNAME:
+      ident = nick
+
     self.__connect_hit()
-    client = ircclient.createIRC(session, nick=nick, ident=ident, ip=ip, realname=realname, perform=perform)
-    session.client = client
-    
+
+    def proceed(hostname):
+      kwargs = dict(nick=nick, ident=ident, ip=ip, realname=realname, perform=perform, hostname=hostname)
+      if password is not None:
+        kwargs["password"] = password
+        
+      client = ircclient.createIRC(session, **kwargs)
+      session.client = client
+
+    if not hasattr(config, "WEBIRC_MODE") or config.WEBIRC_MODE == "hmac":
+      proceed(None)
+    elif config.WEBIRC_MODE != "hmac":
+      notice = lambda x: session.event(connect_notice(x))
+      notice("Looking up your hostname...")
+      def callback(hostname):
+        notice("Found your hostname.")
+        proceed(hostname)
+      def errback(failure):
+        notice("Couldn't look up your hostname!")
+        proceed(ip)
+      qdns.lookupAndVerifyPTR(ip, timeout=[config.DNS_TIMEOUT]).addCallbacks(callback, errback)
+
     Sessions[id] = session
     
-    return id
+    return json.dumps((True, id, TRANSPORTS))
   
   def getSession(self, request):
     bad_session_message = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
@@ -227,8 +254,21 @@ class AJAXEngine(resource.Resource):
     
   def subscribe(self, request):
     request.channel.cancelTimeout()
-    self.getSession(request).subscribe(SingleUseChannel(request), request.notifyFinish())
-    return NOT_DONE_YET
+
+    channel = RequestChannel(request)
+    session = self.getSession(request)
+    notifier = request.notifyFinish()
+    session.subscribe(channel)
+
+    timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)
+    def cancel_timeout(result):
+      session.unsubscribe(self)
+      try:
+        timeout_entry.cancel()
+      except error.AlreadyCalled:
+        pass
+    notifier.addCallbacks(cancel_timeout, cancel_timeout)
+    return server.NOT_DONE_YET
 
   def push(self, request):
     command = request.args.get("c")
@@ -236,16 +276,9 @@ class AJAXEngine(resource.Resource):
       raise AJAXException, "No command specified."
     self.__total_hit()
     
-    decoded = ircclient.irc_decode(command[0])
-    
     session = self.getSession(request)
-
-    if len(decoded) > config.MAXLINELEN:
-      session.disconnect()
-      raise AJAXException, "Line too long."
-
     try:
-      session.push(decoded)
+      session.push(ircclient.irc_decode(command[0]))
     except AttributeError: # occurs when we haven't noticed an error
       session.disconnect()
       raise AJAXException, "Connection closed by server; try reconnecting by reloading the page."
@@ -254,7 +287,7 @@ class AJAXEngine(resource.Resource):
       traceback.print_exc(file=sys.stderr)
       raise AJAXException, "Unknown error."
   
-    return True
+    return json.dumps((True, True))
   
   def closeById(self, k):
     s = Sessions.get(k)
@@ -271,4 +304,100 @@ class AJAXEngine(resource.Resource):
     }
     
   COMMANDS = dict(p=push, n=newConnection, s=subscribe)
-  
\ No newline at end of file
+  
+if has_websocket:
+  class WebSocketChannel(object):  # wraps a websocket protocol in the write/close interface IRCSession expects of a subscriber
+    def __init__(self, channel):
+      self.channel = channel
+
+    def write(self, data):
+      self.channel.send("c", data)  # "c" message-type prefix followed by the encoded data
+      return True  # presumably tells IRCSession.flush to keep this subscriber; that branch is outside this diff, so confirm
+
+    def close(self):
+      self.channel.close()
+
+  class WebSocketEngineProtocol(autobahn.websocket.WebSocketServerProtocol):  # one instance per websocket connection; must authenticate with a session id before pushing
+    AWAITING_AUTH, AUTHED = 0, 1
+
+    def __init__(self, *args, **kwargs):  # NOTE(review): the base-class __init__ is not invoked; confirm the autobahn version in use does not need it
+      self.__state = self.AWAITING_AUTH
+      self.__session = None
+      self.__channel = None
+      self.__timeout = None
+
+    def onOpen(self):
+      self.__timeout = reactor.callLater(5, self.close, "Authentication timeout")  # closes the connection if no valid "s" message arrives within 5 seconds
+
+    def onClose(self, wasClean, code, reason):
+      self.__cancelTimeout()
+      if self.__session:
+        self.__session.unsubscribe(self.__channel)
+        self.__session = None
+
+    def onMessage(self, msg, binary):
+      # we don't bother checking the Origin header, as if you can auth then you've been able to pass the browser's
+      # normal origin handling (POSTed the new connection request and managed to get the session id)
+      state = self.__state
+      message_type, message = msg[:1], msg[1:]  # first character selects the message type, the rest is the payload
+      if state == self.AWAITING_AUTH:
+        if message_type == "s":  # subscribe
+          session = Sessions.get(message)
+          if not session:
+            self.close(BAD_SESSION_MESSAGE)
+            return
+
+          self.__cancelTimeout()
+          self.__session = session
+          self.send("s", "True")
+          self.__state = self.AUTHED
+          self.__channel = WebSocketChannel(self)
+          session.subscribe(self.__channel)
+          return
+      elif state == self.AUTHED:
+        if message_type == "p":  # push
+          self.__session.push(ircclient.irc_decode(message))  # NOTE(review): push can raise LineTooLongException, which is not caught here
+          return
+
+      self.close("Bad message type")  # any message that is not valid for the current state ends the connection
+
+    def __cancelTimeout(self):
+      timeout, self.__timeout = self.__timeout, None
+      if timeout is not None and timeout.active():  # the auth timer reaches here via close() while it is firing; cancelling it then would raise AlreadyCalled and skip sendClose
+        timeout.cancel()
+
+    def close(self, reason=None):
+      self.__cancelTimeout()
+      if reason:
+        self.sendClose(4999, reason)  # 4999: close with a reason string
+      else:
+        self.sendClose(4998)  # 4998: close without a reason
+
+      if self.__session:
+        self.__session.unsubscribe(self.__channel)
+        self.__session = None
+
+    def send(self, message_type, message):
+      self.sendMessage(message_type + message)  # same framing as incoming messages: type prefix, then payload
+
+  class WebSocketResource(autobahn.resource.WebSocketResource):
+    def render(self, request):
+      request.channel.cancelTimeout()  # same call AJAXEngine.subscribe makes before holding a request open
+      return autobahn.resource.WebSocketResource.render(self, request)
+
+  def WebSocketEngine(path=None):  # returns a WebSocketResource whose factory builds WebSocketEngineProtocol; path is not used in this body
+    parsed = urlparse.urlparse(config.BASE_URL)
+    port = parsed.port
+    if port is None:  # BASE_URL carries no explicit port: fall back to the scheme's default
+      if parsed.scheme == "http":
+        port = 80
+      elif parsed.scheme == "https":
+        port = 443
+      else:
+        raise Exception("Unable to determine port from BASE_URL: " + config.BASE_URL)
+
+    factory = autobahn.websocket.WebSocketServerFactory("ws://localhost:%d" % port)
+    factory.protocol = WebSocketEngineProtocol
+    factory.setProtocolOptions(maxMessagePayloadSize=512, maxFramePayloadSize=512, tcpNoDelay=False)  # incoming messages and frames are capped at 512 bytes
+    resource = WebSocketResource(factory)  # local name shadows the twisted.web `resource` module inside this function only
+    return resource