1 from twisted
.web
import resource
, server
, static
, error
as http_error
2 from twisted
.names
import client
3 from twisted
.internet
import reactor
, error
4 from authgateengine
import login_optional
, getSessionData
5 import md5
, sys
, os
, time
, config
, qwebirc
.config_options
as config_options
, traceback
, socket
6 import qwebirc
.ircclient
as ircclient
7 from adminengine
import AdminEngineAction
8 from qwebirc
.util
import HitCounter
9 import qwebirc
.dns
as qdns
10 import qwebirc
.util
.qjson
as json
13 TRANSPORTS
= ["longpoll"]
16 import autobahn
.websocket
17 import autobahn
.resource
19 TRANSPORTS
.append("websocket")
23 BAD_SESSION_MESSAGE
= "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
27 return md5
.md5(os
.urandom(16)).hexdigest()
29 class BufferOverflowException(Exception):
32 class AJAXException(Exception):
35 class IDGenerationException(Exception):
38 class LineTooLongException(Exception):
41 EMPTY_JSON_LIST
= json
.dumps([])
43 def cleanupSession(id):
50 def __init__(self
, id):
52 self
.subscriptions
= []
58 self
.cleanupschedule
= None
60 def subscribe(self
, channel
):
61 if len(self
.subscriptions
) >= config
.MAXSUBSCRIPTIONS
:
62 self
.subscriptions
.pop(0).close()
64 self
.subscriptions
.append(channel
)
67 def unsubscribe(self
, channel
):
69 self
.subscriptions
.remove(channel
)
73 def timeout(self
, channel
):
77 channel
.write(EMPTY_JSON_LIST
)
78 if channel
in self
.subscriptions
:
79 self
.subscriptions
.remove(channel
)
81 def flush(self
, scheduled
=False):
85 if not self
.buffer or not self
.subscriptions
:
92 self
.schedule
= reactor
.callLater(self
.throttle
- t
, self
.flush
, True)
95 # process the rest of the packet
98 self
.schedule
= reactor
.callLater(0, self
.flush
, True)
101 self
.throttle
= t
+ config
.UPDATE_FREQ
103 encdata
= json
.dumps(self
.buffer)
108 for x
in self
.subscriptions
:
112 self
.subscriptions
= newsubs
113 if self
.closed
and not self
.subscriptions
:
114 cleanupSession(self
.id)
116 def event(self
, data
):
117 newbuflen
= self
.buflen
+ len(data
)
118 if newbuflen
> config
.MAXBUFLEN
:
120 self
.client
.error("Buffer overflow.")
123 self
.buffer.append(data
)
124 self
.buflen
= newbuflen
127 def push(self
, data
):
131 if len(data
) > config
.MAXLINELEN
:
132 raise LineTooLongException
134 self
.client
.write(data
)
136 def disconnect(self
):
137 # keep the session hanging around for a few seconds so the
138 # client has a chance to see what the issue was
141 reactor
.callLater(5, cleanupSession
, self
.id)
143 # DANGER! Breach of encapsulation!
144 def connect_notice(line
):
145 return "c", "NOTICE", "", ("AUTH", "*** (qwebirc) %s" % line
)
147 class RequestChannel(object):
148 def __init__(self
, request
):
149 self
.request
= request
151 def write(self
, data
):
152 self
.request
.write(data
)
153 self
.request
.finish()
157 self
.request
.finish()
159 class AJAXEngine(resource
.Resource
):
162 def __init__(self
, prefix
):
164 self
.__connect
_hit
= HitCounter()
165 self
.__total
_hit
= HitCounter()
167 def render_POST(self
, request
):
168 path
= request
.path
[len(self
.prefix
):]
170 handler
= self
.COMMANDS
.get(path
[1:])
171 if handler
is not None:
173 return handler(self
, request
)
174 except AJAXException
, e
:
175 return json
.dumps((False, e
[0]))
177 return "404" ## TODO: tidy up
179 def newConnection(self
, request
):
180 ticket
= login_optional(request
)
182 ip
= request
.getClientIP()
184 nick
= request
.args
.get("nick")
186 raise AJAXException
, "Nickname not supplied."
187 nick
= ircclient
.irc_decode(nick
[0])
189 password
= request
.args
.get("password")
190 if password
is not None:
191 password
= ircclient
.irc_decode(password
[0])
194 id = get_session_id()
195 if not Sessions
.get(id):
198 raise IDGenerationException()
200 session
= IRCSession(id)
202 qticket
= getSessionData(request
).get("qticket")
206 service_mask
= config
.AUTH_SERVICE
207 msg_mask
= service_mask
.split("!")[0] + "@" + service_mask
.split("@", 1)[1]
208 perform
= ["PRIVMSG %s :TICKETAUTH %s" % (msg_mask
, qticket
)]
210 ident
, realname
= config
.IDENT
, config
.REALNAME
211 if ident
is config_options
.IDENT_HEX
or ident
is None: # latter is legacy
212 ident
= socket
.inet_aton(ip
).encode("hex")
213 elif ident
is config_options
.IDENT_NICKNAME
:
218 def proceed(hostname
):
219 kwargs
= dict(nick
=nick
, ident
=ident
, ip
=ip
, realname
=realname
, perform
=perform
, hostname
=hostname
)
220 if password
is not None:
221 kwargs
["password"] = password
223 client
= ircclient
.createIRC(session
, **kwargs
)
224 session
.client
= client
226 if not hasattr(config
, "WEBIRC_MODE") or config
.WEBIRC_MODE
== "hmac":
228 elif config
.WEBIRC_MODE
!= "hmac":
229 notice
= lambda x
: session
.event(connect_notice(x
))
230 notice("Looking up your hostname...")
231 def callback(hostname
):
232 notice("Found your hostname.")
234 def errback(failure
):
235 notice("Couldn't look up your hostname!")
237 qdns
.lookupAndVerifyPTR(ip
, timeout
=[config
.DNS_TIMEOUT
]).addCallbacks(callback
, errback
)
239 Sessions
[id] = session
241 return json
.dumps((True, id, TRANSPORTS
))
243 def getSession(self
, request
):
244 bad_session_message
= "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
246 sessionid
= request
.args
.get("s")
247 if sessionid
is None:
248 raise AJAXException
, bad_session_message
250 session
= Sessions
.get(sessionid
[0])
252 raise AJAXException
, bad_session_message
255 def subscribe(self
, request
):
256 request
.channel
.cancelTimeout()
258 channel
= RequestChannel(request
)
259 session
= self
.getSession(request
)
260 notifier
= request
.notifyFinish()
261 session
.subscribe(channel
)
263 timeout_entry
= reactor
.callLater(config
.HTTP_AJAX_REQUEST_TIMEOUT
, session
.timeout
, channel
)
264 def cancel_timeout(result
):
265 session
.unsubscribe(self
)
267 timeout_entry
.cancel()
268 except error
.AlreadyCalled
:
270 notifier
.addCallbacks(cancel_timeout
, cancel_timeout
)
271 return server
.NOT_DONE_YET
273 def push(self
, request
):
274 command
= request
.args
.get("c")
276 raise AJAXException
, "No command specified."
279 session
= self
.getSession(request
)
281 session
.push(ircclient
.irc_decode(command
[0]))
282 except AttributeError: # occurs when we haven't noticed an error
284 raise AJAXException
, "Connection closed by server; try reconnecting by reloading the page."
285 except Exception, e
: # catch all
287 traceback
.print_exc(file=sys
.stderr
)
288 raise AJAXException
, "Unknown error."
290 return json
.dumps((True, True))
292 def closeById(self
, k
):
296 s
.client
.client
.error("Closed by admin interface")
299 def adminEngine(self
):
301 "Sessions": [(str(v
.client
.client
), AdminEngineAction("close", self
.closeById
, k
)) for k
, v
in Sessions
.iteritems() if not v
.closed
],
302 "Connections": [(self
.__connect
_hit
,)],
303 "Total hits": [(self
.__total
_hit
,)],
306 COMMANDS
= dict(p
=push
, n
=newConnection
, s
=subscribe
)
309 class WebSocketChannel(object):
310 def __init__(self
, channel
):
311 self
.channel
= channel
313 def write(self
, data
):
314 self
.channel
.send("c", data
)
320 class WebSocketEngineProtocol(autobahn
.websocket
.WebSocketServerProtocol
):
321 AWAITING_AUTH
, AUTHED
= 0, 1
323 def __init__(self
, *args
, **kwargs
):
324 self
.__state
= self
.AWAITING_AUTH
325 self
.__session
= None
326 self
.__channel
= None
327 self
.__timeout
= None
330 self
.__timeout
= reactor
.callLater(5, self
.close
, "Authentication timeout")
332 def onClose(self
, wasClean
, code
, reason
):
333 self
.__cancelTimeout
()
335 self
.__session
.unsubscribe(self
.__channel
)
336 self
.__session
= None
338 def onMessage(self
, msg
, binary
):
339 # we don't bother checking the Origin header, as if you can auth then you've been able to pass the browser's
340 # normal origin handling (POSTed the new connection request and managed to get the session id)
342 message_type
, message
= msg
[:1], msg
[1:]
343 if state
== self
.AWAITING_AUTH
:
344 if message_type
== "s": # subscribe
345 session
= Sessions
.get(message
)
347 self
.close(BAD_SESSION_MESSAGE
)
350 self
.__cancelTimeout
()
351 self
.__session
= session
352 self
.send("s", "True")
353 self
.__state
= self
.AUTHED
354 self
.__channel
= WebSocketChannel(self
)
355 session
.subscribe(self
.__channel
)
357 elif state
== self
.AUTHED
:
358 if message_type
== "p": # push
359 self
.__session
.push(ircclient
.irc_decode(message
))
362 self
.close("Bad message type")
364 def __cancelTimeout(self
):
365 if self
.__timeout
is not None:
366 self
.__timeout
.cancel()
367 self
.__timeout
= None
369 def close(self
, reason
=None):
370 self
.__cancelTimeout
()
372 self
.sendClose(4999, reason
)
377 self
.__session
.unsubscribe(self
.__channel
)
378 self
.__session
= None
380 def send(self
, message_type
, message
):
381 self
.sendMessage(message_type
+ message
)
383 class WebSocketResource(autobahn
.resource
.WebSocketResource
):
384 def render(self
, request
):
385 request
.channel
.cancelTimeout()
386 return autobahn
.resource
.WebSocketResource
.render(self
, request
)
388 def WebSocketEngine(path
=None):
389 parsed
= urlparse
.urlparse(config
.BASE_URL
)
392 if parsed
.scheme
== "http":
394 elif parsed
.scheme
== "https":
397 raise Exception("Unable to determine port from BASE_URL: " + config
.BASE_URL
)
399 factory
= autobahn
.websocket
.WebSocketServerFactory("ws://localhost:%d" % port
)
400 factory
.protocol
= WebSocketEngineProtocol
401 factory
.setProtocolOptions(maxMessagePayloadSize
=512, maxFramePayloadSize
=512, tcpNoDelay
=False)
402 resource
= WebSocketResource(factory
)