]>
Commit | Line | Data |
---|---|---|
import hashlib
import md5
import os
import socket
import sys
import time
import traceback
import urlparse

from twisted.web import resource, server, static, error as http_error
from twisted.names import client
from twisted.internet import reactor, error

from authgateengine import login_optional, getSessionData
import config
import qwebirc.config_options as config_options
import qwebirc.ircclient as ircclient
from adminengine import AdminEngineAction
from qwebirc.util import HitCounter
import qwebirc.dns as qdns
import qwebirc.util.qjson as json
12 | ||
# Websocket support is optional: it is enabled only when both autobahn
# modules can be imported.
try:
    import autobahn.websocket
    import autobahn.resource
except ImportError:
    has_websocket = False
else:
    has_websocket = True

# Transport names returned to the client in the new-connection reply.
TRANSPORTS = ["longpoll"]
if has_websocket:
    TRANSPORTS.append("websocket")
22 | ||
# Live sessions, keyed by session id.
Sessions = {}

# Upper bound accepted for client-supplied sequence numbers: 2**63 - 1
# (... yeah it doesn't wrap).
MAX_SEQNO = 2 ** 63 - 1

# Error text sent to a client whose session id is not in Sessions.
BAD_SESSION_MESSAGE = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
26 | ||
def get_session_id():
    """Return a new random session id as 32 lowercase hex characters.

    The id is the MD5 hex digest of 16 bytes read from os.urandom; MD5 is
    only used here to turn the random bytes into a fixed-width hex token.
    """
    # hashlib.md5 replaces the md5 module, which is deprecated since
    # Python 2.5 and absent from Python 3; the digest is identical.
    return hashlib.md5(os.urandom(16)).hexdigest()
29 | ||
class BufferOverflowException(Exception):
    """Exception type for an overflowing session buffer."""
32 | ||
class AJAXException(Exception):
    """Failure whose first argument is a message meant for the client."""
35 | ||
class IDGenerationException(Exception):
    """Raised when no unused session id could be generated."""
38 | ||
class LineTooLongException(Exception):
    """Raised when a line pushed by the client exceeds the length limit."""
41 | ||
# Pre-encoded empty event list, written to a subscriber whose poll timed out.
EMPTY_JSON_LIST = json.dumps([])
43 | ||
def cleanupSession(id):
    """Remove the session stored under *id* from Sessions, if present."""
    # pop() with a default never raises, matching the swallowed KeyError
    Sessions.pop(id, None)
49 | ||
class IRCSession:
    """Per-client session state shared by the transports in this module.

    Events destined for the browser are queued with event() and delivered
    as JSON batches to subscribed channels by flush(); lines coming from
    the browser are handed to the IRC client by push().

    The ``client`` attribute is not set here; it is assigned from outside
    once the IRC connection object has been created.
    """

    def __init__(self, id):
        self.id = id
        self.subscriptions = []      # channels waiting for the next batch
        self.buffer = []             # events queued since the last flush
        self.old_buffer = None       # (seq no, encoded data) of the last batch
        self.buflen = 0              # sum of len() of the queued events
        self.throttle = 0            # time.time() before which flushes wait
        self.schedule = None         # pending reactor call to flush(), if any
        self.closed = False
        self.cleanupschedule = None
        self.pubSeqNo = -1           # highest sequence number accepted by push()
        self.subSeqNo = 0            # number of batches flushed so far

    def subscribe(self, channel, seqNo=None):
        """Register *channel* to receive the next batch of events.

        If the subscription limit is reached the oldest channel is closed.
        When *seqNo* shows the client missed the most recent batch, that
        batch is replayed first; if it is older than that, an error reply is
        written and the channel is not subscribed.
        """
        if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
            self.subscriptions.pop(0).close()

        if seqNo is not None and seqNo < self.subSeqNo:
            # only the single most recent batch is kept for replay
            if self.old_buffer is None or seqNo != self.old_buffer[0]:
                channel.write(json.dumps([False, "Unable to reconnect -- sequence number too old."]), seqNo + 1)
                return

            # a false return means the channel was used up by the replay
            if not channel.write(self.old_buffer[1], self.old_buffer[0] + 1):
                return

        self.subscriptions.append(channel)
        # flush()'s only parameter is the "scheduled" flag; passing the
        # sequence number there made any non-zero seqNo act as
        # scheduled=True and drop the reference to a pending flush.
        self.flush()

    def unsubscribe(self, channel):
        """Drop *channel* from the subscriber list; unknown channels are ignored."""
        try:
            self.subscriptions.remove(channel)
        except ValueError:
            pass

    def timeout(self, channel):
        """Answer a waiting *channel* with an empty list unless a flush is pending."""
        if self.schedule:
            return

        self.unsubscribe(channel)
        channel.write(EMPTY_JSON_LIST, self.subSeqNo)

    def flush(self, scheduled=False):
        """Send the queued events to every subscribed channel.

        *scheduled* is True when the call comes from the reactor.  Direct
        calls never write immediately: they arrange a reactor call instead,
        so everything queued before that call fires goes out as one batch.
        Flushes are also delayed until the throttle time has passed.
        """
        if scheduled:
            self.schedule = None

        if not self.buffer or not self.subscriptions:
            return

        t = time.time()

        if t < self.throttle:
            if not self.schedule:
                self.schedule = reactor.callLater(self.throttle - t, self.flush, True)
            return
        else:
            # process the rest of the packet
            if not scheduled:
                if not self.schedule:
                    self.schedule = reactor.callLater(0, self.flush, True)
                return

        self.throttle = t + config.UPDATE_FREQ

        encdata = json.dumps(self.buffer)
        # remember this batch so subscribe() can replay it once
        self.old_buffer = (self.subSeqNo, encdata)
        self.subSeqNo += 1
        self.buffer = []
        self.buflen = 0

        subs = self.subscriptions
        self.subscriptions = newsubs = []

        # keep only the channels whose write() reports they remain usable
        for x in subs:
            if x.write(encdata, self.subSeqNo):
                newsubs.append(x)

        if self.closed and not newsubs:
            cleanupSession(self.id)

    def event(self, data):
        """Queue *data* for delivery and request a flush.

        If the queue would exceed config.MAXBUFLEN it is discarded and
        "Buffer overflow." is reported through the client's error().
        """
        newbuflen = self.buflen + len(data)
        if newbuflen > config.MAXBUFLEN:
            self.buffer = []
            # keep the length counter in step with the emptied buffer
            self.buflen = 0
            self.client.error("Buffer overflow.")
            return

        self.buffer.append(data)
        self.buflen = newbuflen
        self.flush()

    def push(self, data, seq_no=None):
        """Write *data* to the IRC client.

        Nothing is done once the session is closed.  When *seq_no* is given,
        data whose number is not above the last accepted one is dropped.

        Raises LineTooLongException if *data* exceeds config.MAXLINELEN.
        """
        if self.closed:
            return

        if len(data) > config.MAXLINELEN:
            raise LineTooLongException

        if seq_no is not None:
            if seq_no <= self.pubSeqNo:
                return
            self.pubSeqNo = seq_no
        self.client.write(data)

    def disconnect(self):
        """Mark the session closed and schedule its removal."""
        # keep the session hanging around for a few seconds so the
        # client has a chance to see what the issue was
        self.closed = True

        reactor.callLater(5, cleanupSession, self.id)
160 | ||
# DANGER! Breach of encapsulation!
def connect_notice(line):
    """Return a NOTICE event tuple addressed to AUTH that carries *line*."""
    text = "*** (qwebirc) %s" % line
    params = ("AUTH", text)
    return ("c", "NOTICE", "", params)
164 | ||
class RequestChannel(object):
    """Subscription channel that answers a single pending HTTP request."""

    def __init__(self, request):
        self.request = request

    def write(self, data, seqNo):
        """Send *data* as the response body and finish the request.

        The sequence number is sent in the "n" response header.  Always
        returns False.
        """
        req = self.request
        req.setHeader("n", str(seqNo))
        req.write(data)
        req.finish()
        return False

    def close(self):
        """Finish the request without writing anything."""
        self.request.finish()
177 | ||
class AJAXEngine(resource.Resource):
    """Web resource implementing the HTTP (long-poll) side of the protocol.

    POST requests to ``<prefix>/<command>`` are dispatched through COMMANDS:
    ``n`` creates a session, ``s`` subscribes for events and ``p`` pushes a
    line to IRC.  Handlers report client-visible failures by raising
    AJAXException, which render_POST turns into a JSON ``(False, message)``
    reply.
    """

    isLeaf = True

    def __init__(self, prefix):
        # initialise the base Resource before adding our own state
        resource.Resource.__init__(self)
        self.prefix = prefix
        self.__connect_hit = HitCounter()
        self.__total_hit = HitCounter()

    def render_POST(self, request):
        """Dispatch the request to the handler named after the prefix.

        Returns the handler's result, a JSON error reply if it raised
        AJAXException, or the string "404" for unknown paths.
        """
        path = request.path[len(self.prefix):]
        # startswith() also copes with an empty remainder, where path[0]
        # would raise IndexError
        if path.startswith("/"):
            handler = self.COMMANDS.get(path[1:])
            if handler is not None:
                try:
                    return handler(self, request)
                except AJAXException as e:
                    return json.dumps((False, e.args[0]))

        return "404"  ## TODO: tidy up

    def newConnection(self, request):
        """Create a session and start its IRC connection.

        Reads "nick" (required) and "password" (optional) from the request
        arguments.  Returns JSON ``(True, session id, TRANSPORTS)``.

        Raises AJAXException if no nickname was supplied and
        IDGenerationException if ten generated ids were all already in use.
        """
        login_optional(request)

        ip = request.getClientIP()

        nick = request.args.get("nick")
        if not nick:
            raise AJAXException("Nickname not supplied.")
        nick = ircclient.irc_decode(nick[0])

        password = request.args.get("password")
        if password is not None:
            password = ircclient.irc_decode(password[0])

        # retry a few times in case a generated id is already taken
        for _ in range(10):
            session_id = get_session_id()
            if not Sessions.get(session_id):
                break
        else:
            raise IDGenerationException()

        session = IRCSession(session_id)

        qticket = getSessionData(request).get("qticket")
        if qticket is None:
            perform = None
        else:
            # turn nick!user@host into nick@host for the PRIVMSG target
            service_mask = config.AUTH_SERVICE
            msg_mask = service_mask.split("!")[0] + "@" + service_mask.split("@", 1)[1]
            perform = ["PRIVMSG %s :TICKETAUTH %s" % (msg_mask, qticket)]

        ident, realname = config.IDENT, config.REALNAME
        if ident is config_options.IDENT_HEX or ident is None:  # latter is legacy
            ident = socket.inet_aton(ip).encode("hex")
        elif ident is config_options.IDENT_NICKNAME:
            ident = nick

        self.__connect_hit()

        def proceed(hostname):
            kwargs = dict(nick=nick, ident=ident, ip=ip, realname=realname, perform=perform, hostname=hostname)
            if password is not None:
                kwargs["password"] = password

            session.client = ircclient.createIRC(session, **kwargs)

        # a missing WEBIRC_MODE is treated the same as "hmac"
        if getattr(config, "WEBIRC_MODE", "hmac") == "hmac":
            proceed(None)
        else:
            def notice(text):
                session.event(connect_notice(text))

            notice("Looking up your hostname...")

            def callback(hostname):
                notice("Found your hostname.")
                proceed(hostname)

            def errback(failure):
                # fall back to the bare IP address as the hostname
                notice("Couldn't look up your hostname!")
                proceed(ip)

            qdns.lookupAndVerifyPTR(ip, timeout=[config.DNS_TIMEOUT]).addCallbacks(callback, errback)

        Sessions[session_id] = session

        return json.dumps((True, session_id, TRANSPORTS))

    def getSession(self, request):
        """Return the session named by the "s" request argument.

        Raises AJAXException if the argument is missing or unknown.
        """
        sessionid = request.args.get("s")
        if sessionid is None:
            raise AJAXException(BAD_SESSION_MESSAGE)

        session = Sessions.get(sessionid[0])
        if not session:
            raise AJAXException(BAD_SESSION_MESSAGE)
        return session

    def subscribe(self, request):
        """Attach the request to its session as a long-poll subscriber.

        The optional "n" argument is the client's sequence number.  The
        request is answered later, by a flush or by the timeout, so
        NOT_DONE_YET is returned.

        Raises AJAXException for an unknown session or a bad sequence number.
        """
        request.channel.setTimeout(None)

        channel = RequestChannel(request)
        session = self.getSession(request)
        notifier = request.notifyFinish()

        seq_no = request.args.get("n")
        try:
            if seq_no is not None:
                seq_no = int(seq_no[0])
                if seq_no < 0 or seq_no > MAX_SEQNO:
                    raise ValueError
        except ValueError:
            # must be AJAXException so render_POST reports it to the client
            raise AJAXException("Bad sequence number")

        session.subscribe(channel, seq_no)

        timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)

        def cancel_timeout(result):
            # runs when the request finishes, successfully or not
            try:
                timeout_entry.cancel()
            except error.AlreadyCalled:
                pass
            session.unsubscribe(channel)

        notifier.addCallbacks(cancel_timeout, cancel_timeout)
        return server.NOT_DONE_YET

    def push(self, request):
        """Forward the "c" request argument to the session's IRC client.

        The optional "n" argument is the client's sequence number.  Returns
        JSON ``(True, True)``.

        Raises AJAXException when the command is missing, the sequence
        number is bad, the session is unknown, or the push fails; on a
        failed push the session is disconnected first.
        """
        command = request.args.get("c")
        if command is None:
            raise AJAXException("No command specified.")
        self.__total_hit()

        seq_no = request.args.get("n")
        try:
            if seq_no is not None:
                seq_no = int(seq_no[0])
                if seq_no < 0 or seq_no > MAX_SEQNO:
                    raise ValueError
        except ValueError:
            # must be AJAXException so render_POST reports it to the client
            raise AJAXException("Bad sequence number %r" % seq_no)

        session = self.getSession(request)
        try:
            session.push(ircclient.irc_decode(command[0]), seq_no)
        except AttributeError:  # occurs when we haven't noticed an error
            session.disconnect()
            raise AJAXException("Connection closed by server; try reconnecting by reloading the page.")
        except Exception:  # catch all
            session.disconnect()
            traceback.print_exc(file=sys.stderr)
            raise AJAXException("Unknown error.")

        return json.dumps((True, True))

    def closeById(self, k):
        """Report "Closed by admin interface" to session *k*'s client, if it exists."""
        s = Sessions.get(k)
        if s is None:
            return
        s.client.client.error("Closed by admin interface")

    @property
    def adminEngine(self):
        """Data for the admin interface: open sessions and the hit counters."""
        return {
            "Sessions": [(str(v.client.client), AdminEngineAction("close", self.closeById, k)) for k, v in Sessions.items() if not v.closed],
            "Connections": [(self.__connect_hit,)],
            "Total hits": [(self.__total_hit,)],
        }

    # URL command letter -> handler, used by render_POST
    COMMANDS = dict(p=push, n=newConnection, s=subscribe)
345 | ||
if has_websocket:
    class WebSocketChannel(object):
        """Subscription channel that delivers batches over a websocket."""

        def __init__(self, channel):
            self.channel = channel

        def write(self, data, seqNo):
            """Send *data* as a "c" message prefixed by the sequence number.

            Always returns True.
            """
            self.channel.send("c", "%d,%s" % (seqNo, data))
            return True

        def close(self):
            self.channel.close()

    class WebSocketEngineProtocol(autobahn.websocket.WebSocketServerProtocol):
        """Websocket protocol: one "s" (subscribe) message, then "p" (push) messages.

        Each message is a one-character type followed by
        ``<sequence number>,<payload>``.  The connection is closed if no
        subscribe message arrives within five seconds of opening.
        """

        AWAITING_AUTH, AUTHED = 0, 1

        def __init__(self, *args, **kwargs):
            self.__state = self.AWAITING_AUTH
            self.__session = None
            self.__channel = None
            self.__timeout = None

        def onOpen(self):
            self.__timeout = reactor.callLater(5, self.close, "Authentication timeout")

        def onClose(self, wasClean, code, reason):
            self.__cancelTimeout()
            if self.__session:
                self.__session.unsubscribe(self.__channel)
                self.__session = None

        def onMessage(self, msg, binary):
            """Handle one incoming message according to the current state.

            Anything that is not a well-formed message of the type expected
            in the current state closes the connection.
            """
            # we don't bother checking the Origin header, as if you can auth then you've been able to pass the browser's
            # normal origin handling (POSTed the new connection request and managed to get the session id)
            state = self.__state
            message_type, message = msg[:1], msg[1:]
            if state == self.AWAITING_AUTH:
                if message_type == "s":  # subscribe
                    tokens = message.split(",", 1)
                    if len(tokens) != 2:
                        self.close("Bad tokens")
                        return

                    seq_no, message = tokens[0], tokens[1]
                    try:
                        seq_no = int(seq_no)
                        if seq_no < 0 or seq_no > MAX_SEQNO:
                            raise ValueError
                    except ValueError:
                        self.close("Bad value")
                        # stop here: without this the subscription went
                        # ahead with the unparsed sequence number
                        return

                    session = Sessions.get(message)
                    if not session:
                        self.close(BAD_SESSION_MESSAGE)
                        return

                    self.__cancelTimeout()
                    self.__session = session
                    self.send("s", "True")
                    self.__state = self.AUTHED
                    self.__channel = WebSocketChannel(self)
                    session.subscribe(self.__channel, seq_no)
                    return
            elif state == self.AUTHED:
                if message_type == "p":  # push
                    tokens = message.split(",", 1)
                    if len(tokens) != 2:
                        self.close("Bad tokens")
                        return

                    seq_no, message = tokens[0], tokens[1]
                    try:
                        seq_no = int(seq_no)
                        if seq_no < 0 or seq_no > MAX_SEQNO:
                            raise ValueError
                    except ValueError:
                        self.close("Bad value")
                        # stop here: without this the line was still pushed
                        return
                    # NOTE(review): the sequence number is validated but not
                    # passed to push(), unlike the HTTP push path -- confirm
                    # whether that is intentional.
                    self.__session.push(ircclient.irc_decode(message))
                    return

            self.close("Bad message type")

        def __cancelTimeout(self):
            if self.__timeout is not None:
                try:
                    self.__timeout.cancel()
                except error.AlreadyCalled:
                    pass
                self.__timeout = None

        def close(self, reason=None):
            """Send a close frame (4999 with *reason*, else 4998) and unsubscribe."""
            self.__cancelTimeout()
            if reason:
                self.sendClose(4999, reason)
            else:
                self.sendClose(4998)

            if self.__session:
                self.__session.unsubscribe(self.__channel)
                self.__session = None

        def send(self, message_type, message):
            self.sendMessage(message_type + message)

    class WebSocketResource(autobahn.resource.WebSocketResource):
        """WebSocketResource that disables the request channel timeout."""

        def render(self, request):
            request.channel.setTimeout(None)
            return autobahn.resource.WebSocketResource.render(self, request)

    def WebSocketEngine(path=None):
        """Build the websocket resource serving WebSocketEngineProtocol.

        The port in the factory URL comes from config.BASE_URL, defaulting
        to 80 for http and 443 for https.  *path* is not used.

        Raises Exception if the port cannot be determined.
        """
        parsed = urlparse.urlparse(config.BASE_URL)
        port = parsed.port
        if port is None:
            if parsed.scheme == "http":
                port = 80
            elif parsed.scheme == "https":
                port = 443
            else:
                raise Exception("Unable to determine port from BASE_URL: " + config.BASE_URL)

        factory = autobahn.websocket.WebSocketServerFactory("ws://localhost:%d" % port)
        factory.protocol = WebSocketEngineProtocol
        factory.setProtocolOptions(maxMessagePayloadSize=512, maxFramePayloadSize=512, tcpNoDelay=False)
        resource = WebSocketResource(factory)
        return resource