1 from twisted
.web
import resource
, server
, static
, error
as http_error
2 from twisted
.names
import client
3 from twisted
.internet
import reactor
, error
4 from authgateengine
import login_optional
, getSessionData
5 import md5
, sys
, os
, time
, config
, qwebirc
.config_options
as config_options
, traceback
, socket
6 import qwebirc
.ircclient
as ircclient
7 from adminengine
import AdminEngineAction
8 from qwebirc
.util
import HitCounter
9 import qwebirc
.dns
as qdns
10 import qwebirc
.util
.qjson
as json
13 TRANSPORTS
= ["longpoll"]
16 import autobahn
.websocket
17 import autobahn
.resource
19 TRANSPORTS
.append("websocket")
23 BAD_SESSION_MESSAGE
= "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
24 MAX_SEQNO
= 9223372036854775807 # 2**63 - 1... yeah it doesn't wrap
28 return md5
.md5(os
.urandom(16)).hexdigest()
30 class BufferOverflowException(Exception):
33 class AJAXException(Exception):
36 class IDGenerationException(Exception):
39 class LineTooLongException(Exception):
42 EMPTY_JSON_LIST
= json
.dumps([])
44 def cleanupSession(id):
51 def __init__(self
, id):
53 self
.subscriptions
= []
55 self
.old_buffer
= None
60 self
.cleanupschedule
= None
64 def subscribe(self
, channel
, seqNo
=None):
65 if len(self
.subscriptions
) >= config
.MAXSUBSCRIPTIONS
:
66 self
.subscriptions
.pop(0).close()
68 if seqNo
< self
.subSeqNo
:
69 if self
.old_buffer
is None or seqNo
!= self
.old_buffer
[0]:
70 channel
.write(json
.dumps([False, "Unable to reconnect -- sequence number too old."]), seqNo
+ 1)
73 if not channel
.write(self
.old_buffer
[1], self
.old_buffer
[0] + 1):
76 self
.subscriptions
.append(channel
)
79 def unsubscribe(self
, channel
):
81 self
.subscriptions
.remove(channel
)
85 def timeout(self
, channel
):
89 self
.unsubscribe(channel
)
90 channel
.write(EMPTY_JSON_LIST
)
92 def flush(self
, scheduled
=False):
96 if not self
.buffer or not self
.subscriptions
:
101 if t
< self
.throttle
:
102 if not self
.schedule
:
103 self
.schedule
= reactor
.callLater(self
.throttle
- t
, self
.flush
, True)
106 # process the rest of the packet
108 if not self
.schedule
:
109 self
.schedule
= reactor
.callLater(0, self
.flush
, True)
112 self
.throttle
= t
+ config
.UPDATE_FREQ
114 encdata
= json
.dumps(self
.buffer)
115 self
.old_buffer
= (self
.subSeqNo
, encdata
)
120 subs
= self
.subscriptions
121 self
.subscriptions
= newsubs
= []
124 if x
.write(encdata
, self
.subSeqNo
):
127 if self
.closed
and not newsubs
:
128 cleanupSession(self
.id)
130 def event(self
, data
):
131 newbuflen
= self
.buflen
+ len(data
)
132 if newbuflen
> config
.MAXBUFLEN
:
134 self
.client
.error("Buffer overflow.")
137 self
.buffer.append(data
)
138 self
.buflen
= newbuflen
141 def push(self
, data
, seq_no
=None):
145 if len(data
) > config
.MAXLINELEN
:
146 raise LineTooLongException
148 if seq_no
is not None:
149 if seq_no
<= self
.pubSeqNo
:
151 self
.pubSeqNo
= seq_no
152 self
.client
.write(data
)
154 def disconnect(self
):
155 # keep the session hanging around for a few seconds so the
156 # client has a chance to see what the issue was
159 reactor
.callLater(5, cleanupSession
, self
.id)
161 # DANGER! Breach of encapsulation!
162 def connect_notice(line
):
163 return "c", "NOTICE", "", ("AUTH", "*** (qwebirc) %s" % line
)
165 class RequestChannel(object):
166 def __init__(self
, request
):
167 self
.request
= request
169 def write(self
, data
, seqNo
):
170 self
.request
.setHeader("n", str(seqNo
))
171 self
.request
.write(data
)
172 self
.request
.finish()
176 self
.request
.finish()
178 class AJAXEngine(resource
.Resource
):
181 def __init__(self
, prefix
):
183 self
.__connect
_hit
= HitCounter()
184 self
.__total
_hit
= HitCounter()
186 def render_POST(self
, request
):
187 path
= request
.path
[len(self
.prefix
):]
189 handler
= self
.COMMANDS
.get(path
[1:])
190 if handler
is not None:
192 return handler(self
, request
)
193 except AJAXException
, e
:
194 return json
.dumps((False, e
[0]))
196 return "404" ## TODO: tidy up
198 def newConnection(self
, request
):
199 ticket
= login_optional(request
)
201 ip
= request
.getClientIP()
203 nick
= request
.args
.get("nick")
205 raise AJAXException
, "Nickname not supplied."
206 nick
= ircclient
.irc_decode(nick
[0])
208 password
= request
.args
.get("password")
209 if password
is not None:
210 password
= ircclient
.irc_decode(password
[0])
213 id = get_session_id()
214 if not Sessions
.get(id):
217 raise IDGenerationException()
219 session
= IRCSession(id)
221 qticket
= getSessionData(request
).get("qticket")
225 service_mask
= config
.AUTH_SERVICE
226 msg_mask
= service_mask
.split("!")[0] + "@" + service_mask
.split("@", 1)[1]
227 perform
= ["PRIVMSG %s :TICKETAUTH %s" % (msg_mask
, qticket
)]
229 ident
, realname
= config
.IDENT
, config
.REALNAME
230 if ident
is config_options
.IDENT_HEX
or ident
is None: # latter is legacy
231 ident
= socket
.inet_aton(ip
).encode("hex")
232 elif ident
is config_options
.IDENT_NICKNAME
:
237 def proceed(hostname
):
238 kwargs
= dict(nick
=nick
, ident
=ident
, ip
=ip
, realname
=realname
, perform
=perform
, hostname
=hostname
)
239 if password
is not None:
240 kwargs
["password"] = password
242 client
= ircclient
.createIRC(session
, **kwargs
)
243 session
.client
= client
245 if not hasattr(config
, "WEBIRC_MODE") or config
.WEBIRC_MODE
== "hmac":
247 elif config
.WEBIRC_MODE
!= "hmac":
248 notice
= lambda x
: session
.event(connect_notice(x
))
249 notice("Looking up your hostname...")
250 def callback(hostname
):
251 notice("Found your hostname.")
253 def errback(failure
):
254 notice("Couldn't look up your hostname!")
256 qdns
.lookupAndVerifyPTR(ip
, timeout
=[config
.DNS_TIMEOUT
]).addCallbacks(callback
, errback
)
258 Sessions
[id] = session
260 return json
.dumps((True, id, TRANSPORTS
))
262 def getSession(self
, request
):
263 bad_session_message
= "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
265 sessionid
= request
.args
.get("s")
266 if sessionid
is None:
267 raise AJAXException
, bad_session_message
269 session
= Sessions
.get(sessionid
[0])
271 raise AJAXException
, bad_session_message
274 def subscribe(self
, request
):
275 request
.channel
.setTimeout(None)
277 channel
= RequestChannel(request
)
278 session
= self
.getSession(request
)
279 notifier
= request
.notifyFinish()
281 seq_no
= request
.args
.get("n")
283 if seq_no
is not None:
284 seq_no
= int(seq_no
[0])
285 if seq_no
< 0 or seq_no
> MAX_SEQNO
:
288 raise AJAXEngine
, "Bad sequence number"
290 session
.subscribe(channel
, seq_no
)
292 timeout_entry
= reactor
.callLater(config
.HTTP_AJAX_REQUEST_TIMEOUT
, session
.timeout
, channel
)
293 def cancel_timeout(result
):
295 timeout_entry
.cancel()
296 except error
.AlreadyCalled
:
298 session
.unsubscribe(channel
)
299 notifier
.addCallbacks(cancel_timeout
, cancel_timeout
)
300 return server
.NOT_DONE_YET
302 def push(self
, request
):
303 command
= request
.args
.get("c")
305 raise AJAXException
, "No command specified."
308 seq_no
= request
.args
.get("n")
310 if seq_no
is not None:
311 seq_no
= int(seq_no
[0])
312 if seq_no
< 0 or seq_no
> MAX_SEQNO
:
315 raise AJAXEngine("Bad sequence number %r" % seq_no
)
317 session
= self
.getSession(request
)
319 session
.push(ircclient
.irc_decode(command
[0]), seq_no
)
320 except AttributeError: # occurs when we haven't noticed an error
322 raise AJAXException
, "Connection closed by server; try reconnecting by reloading the page."
323 except Exception, e
: # catch all
325 traceback
.print_exc(file=sys
.stderr
)
326 raise AJAXException
, "Unknown error."
328 return json
.dumps((True, True))
330 def closeById(self
, k
):
334 s
.client
.client
.error("Closed by admin interface")
337 def adminEngine(self
):
339 "Sessions": [(str(v
.client
.client
), AdminEngineAction("close", self
.closeById
, k
)) for k
, v
in Sessions
.iteritems() if not v
.closed
],
340 "Connections": [(self
.__connect
_hit
,)],
341 "Total hits": [(self
.__total
_hit
,)],
344 COMMANDS
= dict(p
=push
, n
=newConnection
, s
=subscribe
)
347 class WebSocketChannel(object):
348 def __init__(self
, channel
):
349 self
.channel
= channel
351 def write(self
, data
, seqNo
):
352 self
.channel
.send("c", "%d,%s" % (seqNo
, data
))
358 class WebSocketEngineProtocol(autobahn
.websocket
.WebSocketServerProtocol
):
359 AWAITING_AUTH
, AUTHED
= 0, 1
361 def __init__(self
, *args
, **kwargs
):
362 self
.__state
= self
.AWAITING_AUTH
363 self
.__session
= None
364 self
.__channel
= None
365 self
.__timeout
= None
368 self
.__timeout
= reactor
.callLater(5, self
.close
, "Authentication timeout")
370 def onClose(self
, wasClean
, code
, reason
):
371 self
.__cancelTimeout
()
373 self
.__session
.unsubscribe(self
.__channel
)
374 self
.__session
= None
376 def onMessage(self
, msg
, binary
):
377 # we don't bother checking the Origin header, as if you can auth then you've been able to pass the browser's
378 # normal origin handling (POSTed the new connection request and managed to get the session id)
380 message_type
, message
= msg
[:1], msg
[1:]
381 if state
== self
.AWAITING_AUTH
:
382 if message_type
== "s": # subscribe
383 tokens
= message
.split(",", 2)
385 self
.close("Bad tokens")
388 seq_no
, message
= tokens
[0], tokens
[1]
391 if seq_no
< 0 or seq_no
> MAX_SEQNO
:
394 self
.close("Bad value")
396 session
= Sessions
.get(message
)
398 self
.close(BAD_SESSION_MESSAGE
)
401 self
.__cancelTimeout
()
402 self
.__session
= session
403 self
.send("s", "True")
404 self
.__state
= self
.AUTHED
405 self
.__channel
= WebSocketChannel(self
)
406 session
.subscribe(self
.__channel
, seq_no
)
408 elif state
== self
.AUTHED
:
409 if message_type
== "p": # push
410 tokens
= message
.split(",", 2)
412 self
.close("Bad tokens")
415 seq_no
, message
= tokens
[0], tokens
[1]
418 if seq_no
< 0 or seq_no
> MAX_SEQNO
:
421 self
.close("Bad value")
422 self
.__session
.push(ircclient
.irc_decode(message
))
425 self
.close("Bad message type")
427 def __cancelTimeout(self
):
428 if self
.__timeout
is not None:
430 self
.__timeout
.cancel()
431 except error
.AlreadyCalled
:
433 self
.__timeout
= None
435 def close(self
, reason
=None):
436 self
.__cancelTimeout
()
438 self
.sendClose(4999, reason
)
443 self
.__session
.unsubscribe(self
.__channel
)
444 self
.__session
= None
446 def send(self
, message_type
, message
):
447 self
.sendMessage(message_type
+ message
)
449 class WebSocketResource(autobahn
.resource
.WebSocketResource
):
450 def render(self
, request
):
451 request
.channel
.setTimeout(None)
452 return autobahn
.resource
.WebSocketResource
.render(self
, request
)
454 def WebSocketEngine(path
=None):
455 parsed
= urlparse
.urlparse(config
.BASE_URL
)
458 if parsed
.scheme
== "http":
460 elif parsed
.scheme
== "https":
463 raise Exception("Unable to determine port from BASE_URL: " + config
.BASE_URL
)
465 factory
= autobahn
.websocket
.WebSocketServerFactory("ws://localhost:%d" % port
)
466 factory
.protocol
= WebSocketEngineProtocol
467 factory
.setProtocolOptions(maxMessagePayloadSize
=512, maxFramePayloadSize
=512, tcpNoDelay
=False)
468 resource
= WebSocketResource(factory
)